1. 多進(jìn)程并發(fā)服務(wù)器
我們?cè)谏弦还?jié)寫的TCP服務(wù)器只能處理單連接,在代碼實(shí)現(xiàn)時(shí),多進(jìn)程并發(fā)服務(wù)器與非并發(fā)服務(wù)器在創(chuàng)建監(jiān)聽套接字、綁定、監(jiān)聽這幾個(gè)步驟是一樣的,但是在接收連接請(qǐng)求的時(shí)候,多進(jìn)程并發(fā)服務(wù)器是這樣實(shí)現(xiàn)的:父進(jìn)程負(fù)責(zé)接受連接請(qǐng)求,一旦連接成功,將會(huì)創(chuàng)建一個(gè)子進(jìn)程與客戶端通信。示意圖如下:
(1)什么是并發(fā)
單核CPU → 多進(jìn)程/線程并發(fā) → 時(shí)間片輪轉(zhuǎn)
并發(fā) → 某一個(gè)時(shí)間片/點(diǎn)所能處理的任務(wù)數(shù)
服務(wù)器并發(fā):服務(wù)器在某個(gè)時(shí)間點(diǎn)/片所能處理的連接數(shù)所能接收的client連接越多,并發(fā)量越大
(2)多進(jìn)程并發(fā)服務(wù)器需要注意的幾個(gè)要點(diǎn)
使用多進(jìn)程的方式來解決服務(wù)器處理多連接的問題,需要注意下面幾點(diǎn):
共享:讀時(shí)共享、寫時(shí)復(fù)制。有血緣關(guān)系的進(jìn)程間將會(huì)共享
文件描述符
內(nèi)存映射區(qū)mmap
父進(jìn)程扮演什么角色?
等待接受客戶端連接accept()
有連接的時(shí)候通過fork()創(chuàng)建一個(gè)子進(jìn)程。父進(jìn)程只負(fù)責(zé)等待客戶端連接,即通過accept()阻塞等待連接請(qǐng)求,一旦有連接請(qǐng)求,馬上通過fork()創(chuàng)建一個(gè)子進(jìn)程,子進(jìn)程通過共享父進(jìn)程的文件描述符來實(shí)現(xiàn)和client通信。
將用于通信的文件描述符關(guān)閉。accept()接受連接請(qǐng)求后會(huì)返回一個(gè)用于通信的文件描述符,而父進(jìn)程的職責(zé)是等待連接并fork()創(chuàng)建用于通信的子進(jìn)程,所以對(duì)于父進(jìn)程來說,用于通信的文件描述符是沒有用處的,關(guān)閉該文件描述符來節(jié)省開銷。我們知道,文件描述符是有上限的,最多1024個(gè)(0-1023),如果不關(guān)閉的話,每次fork()一個(gè)子進(jìn)程都要浪費(fèi)一個(gè)文件描述符,如果進(jìn)程多了,可能文件描述符就不夠用了。
子進(jìn)程扮演什么角色?
通信。通過共享的父進(jìn)程accept()返回的文件描述符來與客戶端通信。
將用于監(jiān)聽的文件描述符關(guān)閉。同樣是為了節(jié)省資源,子進(jìn)程被fork()出來后也會(huì)擁有一個(gè)用于監(jiān)聽的文件描述符(因?yàn)樽舆M(jìn)程是對(duì)父進(jìn)程的拷貝),但是子進(jìn)程的作用是與客戶端通信,所以用于監(jiān)聽的文件描述符對(duì)子進(jìn)程而言并無用處,關(guān)閉以節(jié)省資源。
創(chuàng)建的子進(jìn)程個(gè)數(shù)有限制嗎?
受硬件限制
文件描述符默認(rèn)上限1024
子進(jìn)程資源回收
wait/waitpid
使用信號(hào)回收
signal
sigaction
捕捉信號(hào)SIGCHLD
(3)讀時(shí)共享寫時(shí)復(fù)制詳解
首先看圖
如果父子進(jìn)程都只是讀數(shù)據(jù),那么他們都通過虛擬地址去訪問1號(hào)物理地址的內(nèi)容,如果此時(shí)父進(jìn)程修改了數(shù)據(jù)a=8,那么父進(jìn)程會(huì)先復(fù)制一份數(shù)據(jù)到2號(hào)內(nèi)存,然后修改2號(hào)內(nèi)存的數(shù)據(jù),父進(jìn)程再讀的時(shí)候就去2號(hào)內(nèi)存讀,而子進(jìn)程依然去1號(hào)內(nèi)存讀。如果子進(jìn)程也要修改這個(gè)全局變量,那么子進(jìn)程也會(huì)拷貝一份數(shù)據(jù)到內(nèi)存3,然后修改內(nèi)存3的數(shù)據(jù),子進(jìn)程訪問數(shù)據(jù)時(shí)會(huì)訪問內(nèi)存3的數(shù)據(jù)。(多個(gè)子進(jìn)程就會(huì)拷貝多份)
2. 多進(jìn)程并發(fā)服務(wù)器代碼實(shí)現(xiàn)
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <signal.h>
#include <sys/wait.h>
#include <errno.h>
// 進(jìn)程回收函數(shù)
void recyle(int num)
{
pid_t pid;
while( (pid = waitpid(-1, NULL, WNOHANG)) > 0 )
{
printf("child died , pid = %dn", pid);
}
}
int main(int argc, const char* argv[])
{
if(argc < 2)
{
printf("eg: ./a.out portn");
exit(1);
}
struct sockaddr_in serv_addr;
socklen_t serv_len = sizeof(serv_addr);
int port = atoi(argv[1]);
// 創(chuàng)建套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
// 初始化服務(wù)器 sockaddr_in
memset(&serv_addr, 0, serv_len);
serv_addr.sin_family = AF_INET; // 地址族
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 監(jiān)聽本機(jī)所有的IP
serv_addr.sin_port = htons(port); // 設(shè)置端口
// 綁定IP和端口
bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
// 設(shè)置同時(shí)監(jiān)聽的最大個(gè)數(shù)
listen(lfd, 36);
printf("Start accept ......n");
// 使用信號(hào)回收子進(jìn)程pcb //這個(gè)子進(jìn)程回收機(jī)制會(huì)被子進(jìn)程復(fù)制
struct sigaction act;
act.sa_handler = recyle;
act.sa_flags = 0;
sigemptyset(&act.sa_mask);
sigaction(SIGCHLD, &act, NULL);
struct sockaddr_in client_addr;
socklen_t cli_len = sizeof(client_addr);
while(1)
{
// 父進(jìn)程接收連接請(qǐng)求
// accept阻塞的時(shí)候被信號(hào)中斷, 處理信號(hào)對(duì)應(yīng)的操作之后(比如子進(jìn)程終止,收到信號(hào)后去回收子進(jìn)程)
// 回來之后不阻塞了, 直接返回-1, 這時(shí)候 errno==EINTR
int cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
//解決方法就是,在一個(gè)循環(huán)中判斷,如果accept阻塞過程中被信號(hào)打斷
//也就是返回值-1且errno == EINTR,那么再一次調(diào)用accept
//這樣accept會(huì)再次回到阻塞狀態(tài),并且返回值不是-1,也就不會(huì)進(jìn)入循環(huán)
//等到再次被信號(hào)打斷的時(shí)候才會(huì)再次進(jìn)入循環(huán)
/*這里的cfd雖然只定義了一個(gè),但是在每個(gè)子進(jìn)程中都會(huì)有一個(gè)拷貝,并且修改一個(gè)子進(jìn)程的cfd不會(huì)影響其它子進(jìn)程*/
while(cfd == -1 && errno == EINTR)
{
cfd = accept(lfd, (struct sockaddr*)&client_addr, &cli_len);
}
printf("connect sucessfuln");
// 創(chuàng)建子進(jìn)程
pid_t pid = fork();
if(pid == 0)
{
close(lfd);
// child process
// 通信
char ip[64];
while(1)
{
// client ip port
printf("client IP: %s, port: %dn",
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, ip, sizeof(ip)),
ntohs(client_addr.sin_port));
char buf[1024];
int len = read(cfd, buf, sizeof(buf));
if(len == -1)
{
perror("read error");
exit(1);
}
else if(len == 0)
{
printf("客戶端斷開了連接n");
close(cfd);
break;
}
else
{
printf("recv buf: %sn", buf);
write(cfd, buf, len);
}
}
// 干掉子進(jìn)程
return 0;
}
else if(pid > 0)
{
// parent process
close(cfd);
}
}
close(lfd);
return 0;
}
3. 多線程并發(fā)服務(wù)器
多線程并發(fā)服務(wù)器示意圖如下:
在多進(jìn)程模型中,fork得到的子進(jìn)程會(huì)復(fù)制父進(jìn)程的文件描述符cfd等信息,每個(gè)進(jìn)程的cfd都是自己的,操作互不影響。但是線程不同,現(xiàn)在只有主線程的cfd,多個(gè)線程間的信息是共享的,假如說傳遞給每個(gè)子線程的cfd都是同一個(gè),那么線程1修改該文件描述符指向的內(nèi)容會(huì)影響到線程2的通信,因?yàn)樗鼈児蚕磉@一個(gè)文件描述符。所以這里需要建立一個(gè)文件描述符數(shù)組,每個(gè)子線程對(duì)應(yīng)數(shù)組中的一個(gè)文件描述符。
另外連接主線程的client是哪一個(gè),也就是說哪個(gè)client對(duì)應(yīng)和哪個(gè)子線程通信,這也需要把和子線程通信的client的ip和port傳給和該client通信的子線程,這樣子線程才能知道通信的客戶端的ip和port。
于是我們需要?jiǎng)?chuàng)建一個(gè)結(jié)構(gòu)體數(shù)組,每個(gè)子線程對(duì)應(yīng)結(jié)構(gòu)體數(shù)組中的一個(gè)成員,而結(jié)構(gòu)體數(shù)組中的每個(gè)成員將作為參數(shù)傳遞給子進(jìn)程的回調(diào)函數(shù)。
歸根到底就是因?yàn)椋M(jìn)程是獨(dú)立的,線程是共享的。
線程共享下面的資源:
全局?jǐn)?shù)據(jù)區(qū)
堆區(qū)
一塊有效內(nèi)存的地址,比如說把線程1的一塊內(nèi)存的地址傳給線程2,那么線程2也可以操作這塊內(nèi)存。
4. 多線程并發(fā)服務(wù)器代碼實(shí)現(xiàn)
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ctype.h>
#include <pthread.h>
// 自定義數(shù)據(jù)結(jié)構(gòu) //把線程處理函數(shù)所需要的信息封裝進(jìn)來
typedef struct SockInfo
{
int fd; // 文件描述符
struct sockaddr_in addr; //ip地址結(jié)構(gòu)體
pthread_t id; //線程id
}SockInfo;
// 子線程處理函數(shù)
void* worker(void* arg)
{
char ip[64];
char buf[1024];
SockInfo* info = (SockInfo*)arg;
// 通信
while(1)
{
printf("Client IP: %s, port: %dn",
inet_ntop(AF_INET, &info->addr.sin_addr.s_addr, ip, sizeof(ip)),
ntohs(info->addr.sin_port));
int len = read(info->fd, buf, sizeof(buf));
if(len == -1)
{
perror("read error");
pthread_exit(NULL); //只退出子線程
//exit(1); //exit會(huì)把主線程也一塊退出
}
else if(len == 0)
{
printf("客戶端已經(jīng)斷開了連接n");
close(info->fd);
break;
}
else
{
printf("recv buf: %sn", buf);
write(info->fd, buf, len);
}
}
return NULL;
}
int main(int argc, const char* argv[])
{
if(argc < 2)
{
printf("eg: ./a.out portn");
exit(1);
}
struct sockaddr_in serv_addr;
socklen_t serv_len = sizeof(serv_addr);
int port = atoi(argv[1]);
// 創(chuàng)建套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
// 初始化服務(wù)器 sockaddr_in
memset(&serv_addr, 0, serv_len);
serv_addr.sin_family = AF_INET; // 地址族
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 監(jiān)聽本機(jī)所有的IP
serv_addr.sin_port = htons(port); // 設(shè)置端口
// 綁定IP和端口
bind(lfd, (struct sockaddr*)&serv_addr, serv_len);
// 設(shè)置同時(shí)監(jiān)聽的最大個(gè)數(shù)
listen(lfd, 36);
printf("Start accept ......n");
int i = 0;
SockInfo info[256]; //每個(gè)線程對(duì)應(yīng)數(shù)組的一個(gè)元素,最多256個(gè)線程
// 規(guī)定 fd == -1 說明這是一個(gè)無效文件描述符,也就是說這個(gè)文件描述符是空閑的,沒被占用
for(i=0; i<sizeof(info)/sizeof(info[0]); ++i)
{
info[i].fd = -1; //所有文件描述符全部初始化為-1
}
socklen_t cli_len = sizeof(struct sockaddr_in);
while(1)
{
// 選一個(gè)沒有被使用的, 最小的數(shù)組元素
//因?yàn)橛锌赡芪覀兪褂玫奈募枋龇麑?duì)應(yīng)數(shù)組下標(biāo)i已經(jīng)累加到了100,但是前面
//99個(gè)都已經(jīng)被釋放了(斷開連接了),我們最好選用一個(gè)當(dāng)前空閑的數(shù)組下標(biāo)最小
//的文件描述符,以合理利用資源
for(i=0; i<256; ++i)
{
if(info[i].fd == -1)
{
break; //這樣就能把數(shù)組下標(biāo)最小的fd找出來,并確保i指向它,直接break出去
}
}
if(i == 256) //整個(gè)數(shù)組都被用完了,直接break出while循環(huán)
{
break;
}
// 主線程 - 等待接受連接請(qǐng)求
info[i].fd = accept(lfd, (struct sockaddr*)&info[i].addr, &cli_len); //第二個(gè)參數(shù)是傳出參數(shù),
//傳出客戶端ip信息(struct sockaddr*)類型
// 創(chuàng)建子線程 - 通信
pthread_create(&info[i].id, NULL, worker, &info[i]);
// 設(shè)置線程分離 //這樣子線程終止的時(shí)候會(huì)自動(dòng)釋放,就不需要主線程去釋放了
pthread_detach(info[i].id);
}
close(lfd);
// 只退出主線程 //對(duì)子線程無影響,子線程可以繼續(xù)通信
pthread_exit(NULL);
return 0;
}
5. 擴(kuò)展:Socket API封裝
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <sys/socket.h>
void perr_exit(const char *s)
{
perror(s);
exit(-1);
}
//也可以在vim下按2K跳轉(zhuǎn)到man文檔中的accept函數(shù),因?yàn)閙an文檔跳轉(zhuǎn)不區(qū)分大小寫
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
{
int n;
again:
if ((n = accept(fd, sa, salenptr)) < 0)
{
//ECONNABORTED 發(fā)生在重傳(一定次數(shù))失敗后,強(qiáng)制關(guān)閉套接字
//EINTR 進(jìn)程被信號(hào)中斷 //如果accept函數(shù)在阻塞時(shí)被信號(hào)打斷,處理完信號(hào)
//返回時(shí)就不會(huì)在阻塞了,而是直接返回-1
if ((errno == ECONNABORTED) || (errno == EINTR))
{
goto again; //如果accept阻塞時(shí)被信號(hào)打斷了,需要在執(zhí)行一次accept繼續(xù)阻塞
}
else
{
perr_exit("accept error");
}
}
return n;
}
int Bind(int fd, const struct sockaddr *sa, socklen_t salen)
{
int n;
if ((n = bind(fd, sa, salen)) < 0)
{
perr_exit("bind error");
}
return n;
}
int Connect(int fd, const struct sockaddr *sa, socklen_t salen)
{
int n;
n = connect(fd, sa, salen);
if (n < 0)
{
perr_exit("connect error");
}
return n;
}
int Listen(int fd, int backlog)
{
int n;
if ((n = listen(fd, backlog)) < 0)
{
perr_exit("listen error");
}
return n;
}
int Socket(int family, int type, int protocol)
{
int n;
if ((n = socket(family, type, protocol)) < 0)
{
perr_exit("socket error");
}
return n;
}
ssize_t Read(int fd, void *ptr, size_t nbytes)
{
ssize_t n;
again:
if ( (n = read(fd, ptr, nbytes)) == -1)
{
if (errno == EINTR)
goto again; //如果read被信號(hào)中斷了,應(yīng)該讓它繼續(xù)去read等待讀數(shù)據(jù) (read阻塞時(shí))
else
return -1;
}
return n;
}
ssize_t Write(int fd, const void *ptr, size_t nbytes)
{
ssize_t n;
again:
if ((n = write(fd, ptr, nbytes)) == -1)
{
if (errno == EINTR)
goto again;
else
return -1;
}
return n;
}
int Close(int fd)
{
int n;
if ((n = close(fd)) == -1)
perr_exit("close error");
return n;
}
/*參三: 應(yīng)該讀取的字節(jié)數(shù)*/ //一直讀到n字節(jié)數(shù)才會(huì)返回,否則阻塞等待
//socket 4096 readn(cfd, buf, 4096) nleft = 4096-1500
ssize_t Readn(int fd, void *vptr, size_t n)
{
size_t nleft; //usigned int 剩余未讀取的字節(jié)數(shù)
ssize_t nread; //int 實(shí)際讀到的字節(jié)數(shù)
char *ptr;
ptr = vptr;
nleft = n; //n 未讀取字節(jié)數(shù)
while (nleft > 0)
{
if ((nread = read(fd, ptr, nleft)) < 0)
{
if (errno == EINTR)
{
nread = 0;
}
else
{
return -1;
}
}
else if (nread == 0)
{
break;
}
nleft -= nread; //nleft = nleft - nread
ptr += nread;
}
return n - nleft;
}
ssize_t Writen(int fd, const void *vptr, size_t n)
{
size_t nleft;
ssize_t nwritten;
const char *ptr;
ptr = vptr;
nleft = n;
while (nleft > 0)
{
if ( (nwritten = write(fd, ptr, nleft)) <= 0)
{
if (nwritten < 0 && errno == EINTR)
nwritten = 0;
else
return -1;
}
nleft -= nwritten;
ptr += nwritten;
}
return n;
}
static ssize_t my_read(int fd, char *ptr) //靜態(tài)函數(shù)保證了讀完第一個(gè)100字節(jié)才去讀下一個(gè)100字節(jié),而不是每次調(diào)用都讀100字節(jié)
{
static int read_cnt; //改變量存在靜態(tài)數(shù)據(jù)區(qū),下次調(diào)用my_read函數(shù)的時(shí)候,read_cnt會(huì)保留上次的值
static char *read_ptr;
static char read_buf[100];
//因?yàn)檫@里的變量都是static的,所以并非每次調(diào)用my_read都會(huì)讀100字節(jié),而是讀完100字節(jié)再去讀下一個(gè)100字節(jié)
if (read_cnt <= 0) {
again:
if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) //"hellon"
{
if (errno == EINTR)
goto again;
return -1;
}
else if (read_cnt == 0)
return 0;
read_ptr = read_buf;
}
read_cnt--; //在上次調(diào)用結(jié)束的值基礎(chǔ)上--,保證了讀完100字節(jié)再去讀下一個(gè)100字節(jié)
*ptr = *read_ptr++;
return 1;
}
/*readline --- fgets*/
//傳出參數(shù) vptr
ssize_t Readline(int fd, void *vptr, size_t maxlen)
{
ssize_t n, rc;
char c, *ptr;
ptr = vptr;
for (n = 1; n < maxlen; n++)
{
if ((rc = my_read(fd, &c)) == 1) //ptr[] = hellon
{
*ptr++ = c;
if (c == 'n') //先讀100個(gè)字節(jié),依次遍歷,遇到 'n' 說明一行讀完了
break;
}
else if (rc == 0)
{
*ptr = 0;
return n-1;
}
else
return -1;
}
*ptr = 0;
return n;
}