简介

#include <sys/epoll.h>

epoll与select

Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目
效率提升,epoll对于句柄事件的选择不是遍历的,是事件响应的,就是句柄上事件来就马上选择出来,不需要遍历整个句柄链表,因此效率非常高,内核将句柄用红黑树保存的,IO效率不随FD数目增加而线性下降。
内存拷贝, select让内核把 FD 消息通知给用户空间的时候使用了内存拷贝的方式,开销较大,但是Epoll 在这点上使用了共享内存的方式,这个内存拷贝也省略了。
相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。
并且,在linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。


触发模式

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

  • 水平触发(LT):默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件

//LevelTriggered(LT) //缺省工作方式,即默认的工作方式,支持blocksocket和no_blocksocket,错误率比较小。

  • 边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。

//Edge Triggered(ET) //高速工作方式,错误率比较大,只支持no_block socket (非阻塞socket)

假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发ET,那么这个fd变成就绪状态就会从epoll 队列移除,
则epoll_wait 会一直阻塞,忽略尚未读取的1k数据; 而如果是水平触发LT,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。
总结: LT模式可能触发的次数更多, 一旦触发的次数多, 也就意味着效率会下降; 但这样也不能就说LT模式就比ET模式效率更低
因为ET的使用对编程人员提出了更高更精细的要求,一旦使用者编程水平不够, 那ET模式还不如LT模式。

ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,
也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;
而LT模式是只要有数据没有处理就会一直通知下去的.

1. 创建一个epoll的句柄

int epoll_create(int size);

创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。

这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

2. 将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //op为注册事件

epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。

  • EPOLL_CTL_ADD 注册新的fd到epfd中;
  • EPOLL_CTL_MOD 修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL 从epfd中删除一个fd;

3. 等待事件触发,当超过timeout还没有事件触发时,就超时

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中, 并且将注册在epfd上的socket fd的事件类型给清空

参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。


typedef union epoll_data {
   void    *ptr;
   int      fd;
   uint32_t u32;
   uint64_t u64;
} epoll_data_t;

struct epoll_event {
   uint32_t     events;    /* Epoll events */
   epoll_data_t data;      /* User data variable */
};

events可以是以下几个宏的集合:

  • EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

  • EPOLLOUT:表示对应的文件描述符可以写;

  • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

  • EPOLLERR:表示对应的文件描述符发生错误;

  • EPOLLHUP:表示对应的文件描述符被挂断;

  • EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

  • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里


epoll IO多路复用模型实现机制

设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发? 在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。 epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:

  • 调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)

  • 调用epoll_ctl向epoll对象中添加这100万个连接的套接字

  • 调用epoll_wait收集发生的事件的连接

只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。


Linux内核具体的epoll机制实现思路

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关

/*
 * This structure is stored inside the "private_data" member of the file
 * structure and rapresent the main data sructure for the eventpoll
 * interface.
 */
struct eventpoll {
    /* Protect the this structure access */
    spinlock_t lock;

    /*
     * This mutex is used to ensure that files are not removed
     * while epoll is using them. This is held during the event
     * collection loop, the file cleanup path, the epoll file exit
     * code and the ctl operations.
     */
    struct mutex mtx;

    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;

    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors */
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdllist;
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    /* RB tree root used to store monitored fd structs */
    struct rb_root rbr;

    /*
     * This is a single linked list that chains all the "struct epitem" that
     * happened while transfering ready events to userspace w/out
     * holding ->lock.
     */
    struct epitem *ovflist;

    /* The user that created the eventpoll descriptor */
    struct user_struct *user;
};

每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。

而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。

在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:

/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 */
struct epitem {
    /* RB tree node used to link this structure to the eventpoll RB tree */
//红黑树节点
    struct rb_node rbn;

    /* List header used to link this structure to the eventpoll ready list */
//双向链表节点
    struct list_head rdllink;

    /*
     * Works together "struct eventpoll"->ovflist in keeping the
     * single linked chain of items.
     */
    struct epitem *next;

    /* The file descriptor information this item refers to */
//事件句柄信息
    struct epoll_filefd ffd;

    /* Number of active wait queue attached to poll operations */
    int nwait;

    /* List containing poll wait queues */
    struct list_head pwqlist;

    /* The "container" of this item */
//指向其所属的eventpoll对象
    struct
![Uploading EPOLL_663944.jpg . . .]
eventpoll *ep;

    /* List header used to link this item to the "struct file" items list */
    struct list_head fllink;

    /* The structure that describe the interested events and the source fd */
 //期待发生的事件类型
    struct epoll_event event;
};

当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

wait

通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。


代码示例

#include <iostream>
#include <unistd.h>
#include <cstring>
#include <cassert>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <fcntl.h>

const int MAX_EVENT_NUMBER = 1024;
const int BUFFER_SIZE = 10;

int setnonblocking(int fd);
void addfd(int epollfd, int fd, bool enable_et);
void lt(epoll_event *events, int number, int epollfd, int listenfd);
void et(epoll_event *events, int number, int epollfd, int listenfd);

int main(int argc, char **argv)
{
    int port = 20999;

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    int reuse = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

    int ret = bind(listenfd, (struct sockaddr*) &addr, sizeof(addr));
    assert(ret != -1);
    ret = listen(listenfd, 10);
    assert(ret != -1);

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd > 0);

    addfd(epollfd, listenfd, true);
    while (true) {
        int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if (ret < 0) {
            std::cout << "epoll failed " << std::endl;
            break;
        }

        //lt(events, ret, epollfd, sockfd);		//LT模式
        et(events, ret, epollfd, listenfd);		//ET模式
    }

    return 0;
}

//设置非阻塞文件描述符
int setnonblocking(int fd)
{
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

//将描述符fd的EPOLLIN注册到epollfd提示的epoll内核事件中,参数enable_et指定是否启用ET模式
void addfd(int epollfd, int fd, bool enable_et)
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;

    if (enable_et) {
        event.events |= EPOLLET;
    }

    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);

    setnonblocking(fd);
}

//LT模式
void lt(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];

    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);

            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            addfd(epollfd, connfd, false);
        }
        else if (events[i].events & EPOLLIN) {		//只要socket读缓存中还有未读出的数据,就会被触发
            std::cout << "event trigger once" << std::endl;
            memset(buf, '\0', BUFFER_SIZE);

            int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
            if (ret <= 0) {
                close(sockfd);
                continue;
            }

            std::cout << "get: " << ret << " bytes of content: " << buf << std::endl;
        }
        else {
            std::cout << "something else happened" << std::endl;
        }
    }
}

//ET模式
void et(epoll_event *events, int number, int epollfd, int listenfd)
{
    char buf[BUFFER_SIZE];

    for (int i = 0; i < number; i++) {
        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);

            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            addfd(epollfd, connfd, true);
        }
        else if (events[i].events & EPOLLIN) {
            std::cout << "event trigger once" << std::endl;
            int ret = 0;

            //因为ET模式不会重复触发,所以我们要循环读取所有数据
            while (true) {
                memset(buf, '\0', BUFFER_SIZE);

                ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                if (ret < 0) {
                    //对于非阻塞I/O,下面的条件成立时表示数据已全部读取完毕
                    if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        std::cout << "read later!" << std::endl;
                        break;
                    }

                    close(sockfd);
                    break;
                }
                else if (ret == 0) {
                    close(sockfd);
                }
                else {
                    std::cout << "get " << ret << " bytes of content: " << buf << std::endl;
                }
            }
        }
        else {
            std::cout << "something else happened" << std::endl;
        }
    }

}

Re:

https://www.jianshu.com/p/718c24af400f

https://www.bbsmax.com/A/l1dymR3Gde/

https://www.jianshu.com/p/397449cadc9a

https://zhuanlan.zhihu.com/p/165162146