epoll use
简介
#include <sys/epoll.h>
epoll与select
Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目
效率提升,epoll对于句柄事件的选择不是遍历的,是事件响应的,就是句柄上事件来就马上选择出来,不需要遍历整个句柄链表,因此效率非常高,内核将句柄用红黑树保存的,IO效率不随FD数目增加而线性下降。
内存拷贝, select让内核把 FD 消息通知给用户空间的时候使用了内存拷贝的方式,开销较大,但是Epoll 在这点上使用了共享内存的方式,这个内存拷贝也省略了。
相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。
并且,在linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
触发模式
epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
- 水平触发(LT):默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件
//LevelTriggered(LT) //缺省工作方式,即默认的工作方式,支持blocksocket和no_blocksocket,错误率比较小。
- 边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。
//Edge Triggered(ET) //高速工作方式,错误率比较大,只支持no_block socket (非阻塞socket)
假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发ET,那么这个fd变成就绪状态就会从epoll 队列移除,
则epoll_wait 会一直阻塞,忽略尚未读取的1k数据; 而如果是水平触发LT,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。
总结: LT模式可能触发的次数更多, 一旦触发的次数多, 也就意味着效率会下降; 但这样也不能就说LT模式就比ET模式效率更低
因为ET的使用对编程人员提出了更高更精细的要求,一旦使用者编程水平不够, 那ET模式还不如LT模式。
ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,
也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;
而LT模式是只要有数据没有处理就会一直通知下去的.
1. 创建一个epoll的句柄
int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2. 将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //op为注册事件
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
- EPOLL_CTL_ADD 注册新的fd到epfd中;
- EPOLL_CTL_MOD 修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL 从epfd中删除一个fd;
3. 等待事件触发,当超过timeout还没有事件触发时,就超时
int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);
等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中, 并且将注册在epfd上的socket fd的事件类型给清空
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
-
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
-
EPOLLOUT:表示对应的文件描述符可以写;
-
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
-
EPOLLERR:表示对应的文件描述符发生错误;
-
EPOLLHUP:表示对应的文件描述符被挂断;
-
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
-
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll IO多路复用模型实现机制
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发? 在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。 epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:
-
调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
-
调用epoll_ctl向epoll对象中添加这100万个连接的套接字
-
调用epoll_wait收集发生的事件的连接
只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
Linux内核具体的epoll机制实现思路
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关
/*
* This structure is stored inside the "private_data" member of the file
* structure and rapresent the main data sructure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the this structure access */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdllist;
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transfering ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
*/
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
//红黑树节点
struct rb_node rbn;
/* List header used to link this structure to the eventpoll ready list */
//双向链表节点
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
//事件句柄信息
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
//指向其所属的eventpoll对象
struct
![Uploading EPOLL_663944.jpg . . .]
eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
//期待发生的事件类型
struct epoll_event event;
};
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。
代码示例
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <cassert>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <fcntl.h>
const int MAX_EVENT_NUMBER = 1024;
const int BUFFER_SIZE = 10;
int setnonblocking(int fd);
void addfd(int epollfd, int fd, bool enable_et);
void lt(epoll_event *events, int number, int epollfd, int listenfd);
void et(epoll_event *events, int number, int epollfd, int listenfd);
int main(int argc, char **argv)
{
int port = 20999;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd != -1);
int reuse = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
int ret = bind(listenfd, (struct sockaddr*) &addr, sizeof(addr));
assert(ret != -1);
ret = listen(listenfd, 10);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd > 0);
addfd(epollfd, listenfd, true);
while (true) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0) {
std::cout << "epoll failed " << std::endl;
break;
}
//lt(events, ret, epollfd, sockfd); //LT模式
et(events, ret, epollfd, listenfd); //ET模式
}
return 0;
}
//设置非阻塞文件描述符
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
//将描述符fd的EPOLLIN注册到epollfd提示的epoll内核事件中,参数enable_et指定是否启用ET模式
void addfd(int epollfd, int fd, bool enable_et)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if (enable_et) {
event.events |= EPOLLET;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
//LT模式
void lt(epoll_event *events, int number, int epollfd, int listenfd)
{
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, false);
}
else if (events[i].events & EPOLLIN) { //只要socket读缓存中还有未读出的数据,就会被触发
std::cout << "event trigger once" << std::endl;
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
if (ret <= 0) {
close(sockfd);
continue;
}
std::cout << "get: " << ret << " bytes of content: " << buf << std::endl;
}
else {
std::cout << "something else happened" << std::endl;
}
}
}
//ET模式
void et(epoll_event *events, int number, int epollfd, int listenfd)
{
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
}
else if (events[i].events & EPOLLIN) {
std::cout << "event trigger once" << std::endl;
int ret = 0;
//因为ET模式不会重复触发,所以我们要循环读取所有数据
while (true) {
memset(buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
if (ret < 0) {
//对于非阻塞I/O,下面的条件成立时表示数据已全部读取完毕
if (errno == EAGAIN || errno == EWOULDBLOCK) {
std::cout << "read later!" << std::endl;
break;
}
close(sockfd);
break;
}
else if (ret == 0) {
close(sockfd);
}
else {
std::cout << "get " << ret << " bytes of content: " << buf << std::endl;
}
}
}
else {
std::cout << "something else happened" << std::endl;
}
}
}
Re:
https://www.jianshu.com/p/718c24af400f
https://www.bbsmax.com/A/l1dymR3Gde/