I/O多路复用
I/O模型
https://vcvvvc.github.io/post/server_actor/
I/O多路复用
Select、Poll、Epoll, 多路是指?多个业务方(句柄)并发下来的 IO 。 复用是指?复用这一个后台处理程序。
Select
select()函数主要解决的是accept()函数阻塞问题,而没有解决recv()和send()函数阻塞问题
#include <sys/select.h>
int select(int nfds, fd_set *readset, fd_set *writeset, fd_set *exceptset,struct timeval *timeout);
-
nfds参数指定被监听的文件描述符的总数。
-
readfds, writefds和exceptfds参数分别指向可读、可写和异常等事件对应的文件描述符集合。
#include <sys/select.h>
int FD_ZERO(int fd, fd_set *fdset); //一个 fd_set类型变量的所有位都设为 0
int FD_CLR(int fd, fd_set *fdset); //清除某个位时可以使用
int FD_SET(int fd, fd_set *fd_set); //设置变量的某个位置位
int FD_ISSET(int fd, fd_set *fdset); //测试某个位是否被置位
- timeout参数用来设置select函数的超时时间
struct timeval{
long tv_sec; /*秒 */
long tv_usec; /*微秒 */
}
select示例代码: select.cpp
Poll
#include <poll.h>
int poll(struct pollfd *ufds, unsigned int nfds, int timeout);
struct pollfd {
int fd; //文件描述符
short events; //要求查询的事件掩码
short revents; //返回的事件掩码
};
-
ufds是一个数组,即poll函数可以监视多个文件描述符。
-
nfds为监听事件集合fds的大小
-
timeout 为poll的超时时间,单位毫秒。timeout 为-1时,poll永远阻塞,直到有事件发生。timeout为0时,poll立即返回。
poll示例代码: poll.cpp
Epoll
epoll示例代码: epoll.cpp
1. 创建一个epoll的句柄
int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2. 将被监听的描述符添加到epoll句柄或从epool句柄中删除或者对监听事件进行修改
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); //op为注册事件
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
- EPOLL_CTL_ADD 注册新的fd到epfd中;
- EPOLL_CTL_MOD 修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL 从epfd中删除一个fd;
3. 等待事件触发,当超过timeout还没有事件触发时,就超时
int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);
等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中, 并且将注册在epfd上的socket fd的事件类型给清空
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
-
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
-
EPOLLOUT:表示对应的文件描述符可以写;
-
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
-
EPOLLERR:表示对应的文件描述符发生错误;
-
EPOLLHUP:表示对应的文件描述符被挂断;
-
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
-
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
触发模式
epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
- 水平触发(LT):默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件
//LevelTriggered(LT) //缺省工作方式,即默认的工作方式,支持blocksocket和no_blocksocket,错误率比较小。
- 边缘触发(ET): 当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。
//Edge Triggered(ET) //高速工作方式,错误率比较大,只支持no_block socket (非阻塞socket)
假设现在对方发送了2k的数据,而我们先读取了1k,然后这时调用了epoll_wait,如果是边沿触发ET,那么这个fd变成就绪状态就会从epoll 队列移除,
则epoll_wait 会一直阻塞,忽略尚未读取的1k数据; 而如果是水平触发LT,那么epoll_wait 还会检测到可读事件而返回,我们可以继续读取剩下的1k 数据。
总结: LT模式可能触发的次数更多, 一旦触发的次数多, 也就意味着效率会下降; 但这样也不能就说LT模式就比ET模式效率更低
因为ET的使用对编程人员提出了更高更精细的要求,一旦使用者编程水平不够, 那ET模式还不如LT模式。
ET模式仅当状态发生变化的时候才获得通知,这里所谓的状态的变化并不包括缓冲区中还有未处理的数据,
也就是说,如果要采用ET模式,需要一直read/write直到出错为止,很多人反映为什么采用ET模式只接收了一部分数据就再也得不到通知了,大多因为这样;
而LT模式是只要有数据没有处理就会一直通知下去的.
epoll IO多路复用模型实现机制
设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发? 在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。 epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:
-
调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
-
调用epoll_ctl向epoll对象中添加这100万个连接的套接字
-
调用epoll_wait收集发生的事件的连接
只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。
Linux内核具体的epoll机制实现思路
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关
/*
* This structure is stored inside the "private_data" member of the file
* structure and rapresent the main data sructure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the this structure access */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdllist;
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transfering ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
};
每一个epoll对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高效的识别出来(红黑树的插入时间效率是lgn,其中n为树的高度)。
而所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当相应的事件发生时会调用这个回调方法。这个回调方法在内核中叫ep_poll_callback,它会将发生的事件添加到rdlist双链表中。
在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
*/
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
//红黑树节点
struct rb_node rbn;
/* List header used to link this structure to the eventpoll ready list */
//双向链表节点
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;
/* The file descriptor information this item refers to */
//事件句柄信息
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
//指向其所属的eventpoll对象
struct
![Uploading EPOLL_663944.jpg . . .]
eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
//期待发生的事件类型
struct epoll_event event;
};
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
通过红黑树和双链表数据结构,并结合回调机制,造就了epoll的高效。
代码示例
#include <iostream>
#include <unistd.h>
#include <cstring>
#include <cassert>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <fcntl.h>
const int MAX_EVENT_NUMBER = 1024;
const int BUFFER_SIZE = 10;
int setnonblocking(int fd);
void addfd(int epollfd, int fd, bool enable_et);
void lt(epoll_event *events, int number, int epollfd, int listenfd);
void et(epoll_event *events, int number, int epollfd, int listenfd);
int main(int argc, char **argv)
{
int port = 20999;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd != -1);
int reuse = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
int ret = bind(listenfd, (struct sockaddr*) &addr, sizeof(addr));
assert(ret != -1);
ret = listen(listenfd, 10);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd > 0);
addfd(epollfd, listenfd, true);
while (true) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if (ret < 0) {
std::cout << "epoll failed " << std::endl;
break;
}
//lt(events, ret, epollfd, sockfd); //LT模式
et(events, ret, epollfd, listenfd); //ET模式
}
return 0;
}
//设置非阻塞文件描述符
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
//将描述符fd的EPOLLIN注册到epollfd提示的epoll内核事件中,参数enable_et指定是否启用ET模式
void addfd(int epollfd, int fd, bool enable_et)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if (enable_et) {
event.events |= EPOLLET;
}
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
//LT模式
void lt(epoll_event *events, int number, int epollfd, int listenfd)
{
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, false);
}
else if (events[i].events & EPOLLIN) { //只要socket读缓存中还有未读出的数据,就会被触发
std::cout << "event trigger once" << std::endl;
memset(buf, '\0', BUFFER_SIZE);
int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
if (ret <= 0) {
close(sockfd);
continue;
}
std::cout << "get: " << ret << " bytes of content: " << buf << std::endl;
}
else {
std::cout << "something else happened" << std::endl;
}
}
}
//ET模式
void et(epoll_event *events, int number, int epollfd, int listenfd)
{
char buf[BUFFER_SIZE];
for (int i = 0; i < number; i++) {
int sockfd = events[i].data.fd;
if (sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
}
else if (events[i].events & EPOLLIN) {
std::cout << "event trigger once" << std::endl;
int ret = 0;
//因为ET模式不会重复触发,所以我们要循环读取所有数据
while (true) {
memset(buf, '\0', BUFFER_SIZE);
ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
if (ret < 0) {
//对于非阻塞I/O,下面的条件成立时表示数据已全部读取完毕
if (errno == EAGAIN || errno == EWOULDBLOCK) {
std::cout << "read later!" << std::endl;
break;
}
close(sockfd);
break;
}
else if (ret == 0) {
close(sockfd);
}
else {
std::cout << "get " << ret << " bytes of content: " << buf << std::endl;
}
}
}
else {
std::cout << "something else happened" << std::endl;
}
}
}
三种概念
- 1)阻塞I/O
进程发起read后,如果kernel未准备好数据,进程就会被block,进入等待阶段;
等待kernel将数据准备好,才会返回数据,解除阻塞;
- 2)非阻塞I/O
进程发起read请求,kernel未准备好数据,直接返回一个error,进程无需阻塞等待;
进程会轮询发送read请求,如此往复;
一直到kernel准备好数据,才把数据拷贝并返回给进程,结束轮询;
- 3)I/O多路复用
进程调用select/poll/epoll,select/poll/epoll会将进程block起来;
kernel会‘监视’所有select负责的socket;
当任何一个socket准备好数据,select就会返回(也就是图中的return medable);
进程调用read(图中第二次 system call),将数据从kernel拷贝到用户进程;
总结:所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。
Re:
select
https://www.cnblogs.com/skyfsm/p/7079458.html
https://segmentfault.com/a/1190000019207061
poll
https://blog.csdn.net/oqqYuJi12345678/article/details/106313768
epoll
https://blog.csdn.net/lingfengtengfei/article/details/12398299
ot