嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

周梦康 发表于 2016-04-05 22338 次浏览 标签 : Linux

epollpollselect深入浅出的理解,作为了一个伪架构师吹牛的必备技能包。很多文章,大多以纯文字的介绍,感觉好似空中楼阁,说得再形象,却没有代码配合,再好的戏也出不来。下面是自己的一些理解,如有勘误,还望大家批评指正。

可以先通过这个帖子预热下https://mengkang.net/559.html


可能是最接地气的 I/O 多路复用小结

阻塞型的网络编程接口

这个模式是大家所熟悉了解的,从创建(socket)、绑定(bind)、监听(listen)到循环接受(accept)。

当从一个客户端 socket 描述符读取数据时,发生阻塞,就导致了无法处理其他客户端的请求了。就好比有一个人在咨询客服问题,那么其他人打电话进来都是占线的,只能等待。

下面的代码仅仅作为演示,为了突出重点思路,一些错误处理没有添加。(对比 select 的代码,我会加上)

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
  
#define SERV_PORT 8031
#define BUFSIZE 1024
 
int main(void)
{
    int lfd, cfd;
    struct sockaddr_in serv_addr,clin_addr;
    socklen_t clin_len;
    char recvbuf[BUFSIZE];
    int len;
  
    lfd = socket(AF_INET,SOCK_STREAM,0);
      
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);
     
    bind(lfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr));
      
    listen(lfd, 128);
 
    while(1){
        clin_len = sizeof(clin_addr);
        cfd = accept(lfd, (struct sockaddr *)&clin_addr, &clin_len);
        while(len = read(cfd,recvbuf,BUFSIZE)){
            write(STDOUT_FILENO,recvbuf,len);//把客户端输入的内容输出在终端
            // 只有当客户端输入 stop 就停止当前客户端的连接
            if (strncasecmp(recvbuf,"stop",4) == 0){
                close(cfd);
                break;
            }
        }
    }
 
    close(lfd); 
    return 0;
}

编译运行之后,开启两个终端使用命令nc 10.211.55.4 8031(假如服务器的 ip 为 10.211.55.4)。如果首先连上的客户端一直不输入stop加回车,那么第二个客户端输入任何内容都不会被客户端接收。如下图所示

输入abc的是先连接上的,在其输入stop之前,后面连接上的客户端输入123并不会被服务端收到。也就是说一直阻塞在第一个客户端那里。当第一个客户端输入stop之后,服务端才收到第二个客户端的发送过来的数据。

Select 模型

为了解决并发请求的处理,防止个别请求阻塞所有所有客户端都需要等待的情况,select 多路复用模型诞生了。但是实际它还是在同一时刻只能服务于一个客户端,只不过当一个客户端没有数据传输过来时,服务端可以转而服务于其他有数据过来的客户端。

你可能要说那和最上面的有什么区别啊,但是最原始的方式,如果要想服务第二个客户端,服务端必须关闭第一个客户端的描述符,而select模型中连接上的客户端在暂时没有数据传输过来时都不会断开。也就完成了服务端能接受多个客户端并发请求的任务了。

就好比商城的官方 Web IM 客服,一个客服可以同时连接上多个客户的咨询,而客服只需要在客户咨询问题的间歇间切换即可,而不是当有第一个客户申请客服服务之后,其他人则不能打开聊天窗口咨询了。(纯文字的描述说多了反而更晕,先理解到这里,下面看完实际的代码再反过来体会,中西药结合疗效才好)

其重点是select函数的调用。

int select(int maxfdp1,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)

maxfdp1表示该进程中描述符的总数。

fd_set则是配合select模型的重点数据结构,用来存放描述符的集合。

timeout表示select返回需要等待的时间。

从最简单的例子入手,我们这里仅仅设置readset,写描述符集合和异常描述符集合先不管。timeout我们传NULL,则表示select将一直阻塞等待readset中有数据可读才返回(这一点很重要)。

多客户端请求服务端,服务端与各客户端保持长连接并且能接收到各客户端数据大体思路如下:

  1. 初始化readset,并且将服务端监听的描述符添加到readset中去。

  2. 然后select阻塞等待readset集合中是否有描述符可读。

  3. 如果是服务端描述符可读,那么表示有新客户端连接上。通过accept接收客户端的数据,并且将客户端描述符添加到一个数组client中,以便二次遍历的时候使用。

  4. 执行第二次循环,此时通过for循环把client中的有效的描述符都添加到readset中去。

  5. select再次阻塞等待readset集合中是否有描述符可读。

  6. 如果此时已经连接上的某个客户端描述符有数据可读,则进行数据读取。

下面是实现代码,我带上了详细的注释,而这些注释呢,也是我自己学习过程中遇到的问题

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/select.h>
#include <sys/time.h>
#include <string.h>

#define SERV_PORT     8031
#define BUFSIZE       1024
#define FD_SET_SIZE   128

int main(void) {
    int lfd, cfd, maxfd, scokfd, retval;
    struct sockaddr_in serv_addr, clin_addr;

    socklen_t clin_len; // 地址信息结构体大小

    char recvbuf[BUFSIZE];
    int len;

    fd_set read_set, read_set_init;

    int client[FD_SET_SIZE];
    int i;
    int maxi = -1;


    if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("套接字描述符创建失败");
        exit(1);
    }

    int opt = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);

    if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
        perror("绑定失败");
        exit(1);
    }

    if (listen(lfd, FD_SET_SIZE) == -1) {
        perror("监听失败");
        exit(1);
    }

    maxfd = lfd;

    for (i = 0; i < FD_SET_SIZE; ++i) {
        client[i] = -1;
    }

    FD_ZERO(&read_set_init);
    FD_SET(lfd, &read_set_init);

    while (1) {
        // 每次循环开始时,都初始化 read_set
        read_set = read_set_init;
        
        // 因为上一步 read_set 已经重置,所以需要已连接上的客户端 fd (由上次循环后产生)重新添加进 read_set
        for (i = 0; i < FD_SET_SIZE; ++i) {
            if (client[i] > 0) {
                FD_SET(client[i], &read_set);
            }
        }

        printf("select 等待\n");
        // 这里会阻塞,直到 read_set 中某一个 fd 有数据可读才返回,注意 read_set 中除了客户端 fd 还有服务端监听的 fd
        retval = select(maxfd + 1, &read_set, NULL, NULL, NULL);
        if (retval == -1) {
            perror("select 错误\n");
        } else if (retval == 0) {
            printf("超时\n");
            continue;
        }
        printf("select 返回\n");

        //------------------------------------------------------------------------------------------------
        // 用 FD_ISSET 来判断 lfd (服务端监听的fd)是否可读。只有当新的客户端连接时,lfd 才可读 
        if (FD_ISSET(lfd, &read_set)) {
            clin_len = sizeof(clin_addr);
            if ((cfd = accept(lfd, (struct sockaddr *) &clin_addr, &clin_len)) == -1) {
                perror("接收错误\n");
                continue;
            }

            for (i = 0; i < FD_SET_SIZE; ++i) {
                if (client[i] < 0) {
                    // 把客户端 fd 放入 client 数组
                    client[i] = cfd;
                    printf("接收client[%d]一个请求来自于: %s:%d\n", i, inet_ntoa(clin_addr.sin_addr), ntohs(clin_addr.sin_port));
                    break;
                }
            }
            
            // 最大的描述符值也要重新计算
            maxfd = (cfd > maxfd) ? cfd : maxfd;
            // maxi 用于下面遍历所有有效客户端 fd 使用,以免遍历整个 client 数组
            maxi = (i >= maxi) ? ++i : maxi;
        }
        //------------------------------------------------------------------------------------------------

        for (i = 0; i < maxi; ++i) {
            if (client[i] < 0) {
                continue;
            }
            
            // 如果客户端 fd 中有数据可读,则进行读取
            if (FD_ISSET(client[i], &read_set)) {
                // 注意:这里没有使用 while 循环读取,如果使用 while 循环读取,则有阻塞在一个客户端了。
                // 可能你会想到如果一次读取不完怎么办?
                // 读取不完时,在循环到 select 时 由于未读完的 fd 还有数据可读,那么立即返回,然后到这里继续读取,原来的 while 循环读取直接提到最外层的 while(1) + select 来判断是否有数据继续可读
                len = read(client[i], recvbuf, BUFSIZE);
                if (len > 0) {
                    write(STDOUT_FILENO, recvbuf, len);
                }else if (len == 0){
                    // 如果在客户端 ctrl+z
                    close(client[i]);
                    printf("clinet[%d] 连接关闭\n", i);
                    FD_CLR(client[i], &read_set);
                    client[i] = -1;
                    break;
                }
            }
        }

    }

    close(lfd);

    return 0;
}

编译并实验

补:上面的代码中对一个客户端一次性输入的内容大于缓存大小的时候,需要多次select,不太好。可以将客户端 fd 设置为非阻塞模式,然后读取的时候 while 循环。

在上例的代码100行后添加

int old_option = fcntl(cfd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(cfd, F_SETFL, new_option);

在119后换成 while 读取

if (FD_ISSET(client[i], &read_set)) {
    while (1) {
        len = read(client[i], recvbuf, BUFSIZE);
        if (len > 0) {
            write(STDOUT_FILENO, recvbuf, len);
        } else if (len == 0) {
            close(client[i]);
            printf("clinet[%d] 连接关闭\n", i);
            FD_CLR(client[i], &read_set);
            client[i] = -1;
            break;
        }else if(len < 0){
            break;
        }
    }
}

最后完整代码地址https://github.com/zhoumengkang/notes/blob/master/unix/socket/select/nonblock.c

我们发现上例代码中有一个比较低效的地方,只要有一个客户端有数据传输过来,服务端需要遍历整个client数组,并且将所有已经连接上的客户端都在readset中判断一次,以确定是否有数据可读。

更多的 select 的不足之处可以参考这篇文章,http://www.cnblogs.com/Anker/p/3265058.html 

poll的实现和select非常相似,只是描述fd集合的方式不同,poll使用pollfd结构而不是selectfd_set结构,其他的都差不多。

epoll 模型

epoll 作为 select 的升级版,其中很多地方还是很类似的,可以类比着学习,epoll 的核心是

int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

使用epoll 来实现服务端同时接受多客户端长连接数据时,的大体步骤如下:

  1. 使用epoll_create创建一个 epoll 的句柄,下例中我们命名为epollfd

  2. 使用epoll_ctl把服务端监听的描述符添加到epollfd指定的 epoll 内核事件表中,监听服务器端监听的描述符是否可读。

  3. 使用epoll_wait阻塞等待注册的服务端监听的描述符可读事件的发生。

  4. 当有新的客户端连接上服务端时,服务端监听的描述符可读,则epoll_wait返回,然后通过accept获取客户端描述符。

  5. 使用epoll_ctl把客户端描述符添加到epollfd指定的 epoll 内核事件表中,监听服务器端监听的描述符是否可读。

  6. 当客户端描述符有数据可读时,则触发epoll_wait返回,然后执行读取。

完整代码如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/time.h>
#include <string.h>

#define SERV_PORT           8031
#define MAX_EVENT_NUMBER    1024
#define BUFFER_SIZE         10


/* 将文件描述符 fd 上的 EPOLLIN 注册到 epollfd 指示的 epoll 内核事件表中 */
void addfd(int epollfd, int fd) {
    struct epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
}

void et(struct epoll_event *events, int number, int epollfd, int listenfd) {
    char buf[BUFFER_SIZE];
    for (int i = 0; i < number; ++i) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t length = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr *) &client_address, &length);
            printf("接收一个请求来自于: %s:%d\n", inet_ntoa(client_address.sin_addr), ntohs(client_address.sin_port));

            addfd(epollfd, connfd);
        } else if (events[i].events & EPOLLIN) {
            /* 这段代码不会被重复触发,所以我们循环读取数据,以确保把 socket 缓存中的所有数据读取*/
            while (1) {
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE - 1, 0);
                if (ret < 0) {
                    /* 对非阻塞 I/O ,下面的条件成立表示数据已经全部读取完毕。此后 epoll 就能再次触发 sockfd 上的 EPOLLIN 事件,以驱动下一次读操作 */
                    if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                } else if (ret == 0) {
                    printf("断开一个连接\n");
                    close(sockfd);
                } else {
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        }
    }
}


int main(void) {
    int lfd, epollfd,ret;
    struct sockaddr_in serv_addr;

    if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("套接字描述符创建失败");
        exit(1);
    }

    int opt = 1;
    setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(SERV_PORT);

    if (bind(lfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
        perror("绑定失败");
        exit(1);
    }

    if (listen(lfd, 5) == -1) {
        perror("监听失败");
        exit(1);
    }

    struct epoll_event events[MAX_EVENT_NUMBER];
    if ((epollfd = epoll_create(5)) == -1) {
        perror("创建失败");
        exit(1);
    }

    // 把服务器端 lfd 添加到 epollfd 指定的 epoll 内核事件表中,添加一个 lfd 可读的事件
    addfd(epollfd, lfd);
    while (1) {
        // 阻塞等待新客户端的连接或者客户端的数据写入,返回需要处理的事件数目
        if ((ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1)) < 0) {
            perror("epoll_wait失败");
            exit(1);
        }

        et(events, ret, epollfd, lfd);
    }

    close(lfd);
    return 0;
}

小结

I/O 复用是最常用的 I/O 通知机制,程序阻塞于 I/O 复用系统调用(比如select,poll,epoll_wait),但可以同时监听多个 I/O 事件。对 I/O 本身的读写操作是非阻塞的(当然也是需要通过fcntl来设置的)。

嗨,老铁,欢迎来到我的博客!

如果觉得我的内容还不错的话,可以关注下我在 segmentfault.com 上的直播。我主要从事 PHP 和 Java 方面的开发,《深入 PHP 内核》作者之一。

[视频直播] PHP 进阶之路 - 亿级 pv 网站架构的技术细节与套路 直播中我将毫无保留的分享我这六年的全部工作经验和踩坑的故事,以及会穿插着一些面试中的 考点难点加分点

评论列表

回复 路人甲 2016-04-07 14:39:48
牛逼
回复 路人甲 2016-04-09 17:11:42
学习了,之前老是似懂非懂的,实际执行下代码感觉清晰多了
回复 康哥 2016-04-10 11:29:24
最初只能是利用多进程或者多线程处理并发请求。但是只能是一个进程或者一个线程来处理请求。有了多路复用,则使得一个进程或者一个线程就能处理大量客户端请求了。
回复 Lynn 2016-04-21 22:06:43
深入浅出
回复 康哥 2016-04-22 11:07:09
回复Lynn: 写写,觉得写得还不是太好,这里可以把阻塞I/O和非阻塞I/O理解清楚。
回复 路人甲 2016-12-30 06:08:50
文中的图片挂了...