IO多路复用
dd

IO多路复用

IO多路复用是一种同步IO模型,实现一个线程可以同时监视多个文件描述符;一旦某个文件描述符就绪,就能够通知应用程序进行相应的读写操作

多路是指多个网络连接,复用指的是同一个线程

Linux提供了select、poll、epoll三种接口函数实现IO多路复用

select&poll是将文件描述符存放到文件描述符集合中,调用select/poll将文件描述符集合拷贝到内核中,内核会遍历文件描述符集合看是否有事件发生,如果有就将对应的文件描述符标记为可读或可写,然后再将整个文件描述符集合拷贝到用户态中,在用户态进行遍历对发生了事件的文件描述符进行处理

epoll是使用一个文件描述符管理多个文件描述符,将用户感兴趣的文件描述符添加到内核中,由内核监听并返回发生了事件的文件描述符

为什么需要IO多路复用

在没有IO多路复用之前有BIO和NIO两种实现方式,但是都有一些问题

BIO同步阻塞

  • 服务端采用单线程,当accept一个请求后调用recv或send被阻塞时,没办法处理其他的请求,必须等待上一个recv或send调用完才可以处理(没办法实现并发,在同一时刻只能处理一个请求或者被阻塞)
  • 如果服务端采用多线程或多进程,在accept一个请求后创建一个相应的线程或进程负责这一个请求,可以实现并发,但是随着请求数量的增加会增大系统的开销(线程进程切换、进程资源的消耗、线程安全等)

NIO同步非阻塞

  • 服务端accept一个请求后加入fds集合,每次轮询一次fds集合recv或send数据,没有数据就立即返回,不会阻塞,但是每次都需要去轮询fds,会消耗性能

IO多路复用

服务端采用单线程通过IO多路复用将需要监听的文件描述符集合交给内核监听,由内核返回发生了事件的文件描述符,能够在同一时刻监听多个文件描述符

主要是IO多路复用能够在一个线程中实现并发操作(同时监听多个文件描述符)

API说明

select

1
2
3
#include <sys/select.h>
#include <sys/time.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

返回:若有就绪描述符则为其个数,超时为0,出错-1并设置errno

参数:除了nfds,其他都通过指针传递,以便于内核可以修改通知应用程序

  • nfds:指定被监听的文件描述符个数,通常为监听的所有文件描述符中的最大值加1
  • readfds\writefds\exceptfds分别对应可读、可写和异常事件文件描述符集合,当调用select时就通过这三个参数传入进程感兴趣的文件描述符,交给内核监听。内核通过修改它们通知应用程序有哪些文件描述符就绪
  • timeout:内核会修改这个参数表示自己等待了多长的时间,但是这个参数时不能够完全信任的,因为调用失败后timeout的值是不确定的
    • 0:立刻返回
    • NULL:一直阻塞直到有文件描述符就绪
    • 其他:指定阻塞等待多长时间
1
2
3
4
5
#include <sys/select.h>
FD_ZERO(fd_set *fdset); /* 清除fdset所有标志位 */
FD_SET(int fd, fd_set fdset); /* 设置fdset标志位fd */
FD_CLR(int fd, fd_set fdset); /* 清除fdset标志位fd */
int FD_ISSET(int fd, fd_set *fdset); /* 测试fdset的位fd是否被设置 */

fd_set结构体包含一个整型数组,数组中每一个元素的每一位代表一个文件描述符,相当于一个位图,fd_set能够容纳的文件描述符数量由FD_SETSIZE指定(限制了select能够监听的文件描述符数量)

poll

1
2
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

返回:若有就绪描述符则为其数目,超时为0,出错-1

参数:

  • nfds:被监听的文件描述符集合fds的大小
  • timeout:
    • -1:一直阻塞直到某个时间发生
    • 0:马上返回
    • 其他:指定poll的超时值
1
2
3
4
5
6
struct pollfd
{
int fd; /* 文件描述符 */
short events; /* 注册的事件 */
short revents; /* 实际发生的事件,有内核填充 */
};

poll支持的事件类型:

image

epoll

1
2
#include <sys/epoll.h>
int epoll_create(int size);

返回:成功返回创建的内核事件表对应的描述符,出错-1

size参数现在并不起作用,只是给内核一个提示,告诉它内核表需要多大,该函数返回的文件描述符将用作其他所有epoll函数的第一个参数,以指定要访问的内核事件表

1
2
#include <sys/epoll.h>
int epoll_ctl(int opfd, int op, int fd, struct epoll_event *event);

返回:成功返回0,出错-1

fd参数是要操作的文件描述符,op指定操作类型,操作类型有3种

  • EPOLL_CTL_ADD:往事件表中注册fd上的事件
  • EPOLL_CTL_MOD:修改fd上的注册事件
  • EPOLL_CTL_DEL:删除fd上的注册事件

event指定事件类型,它是epoll_event结构指针类型。epoll支持的事件类型和poll基本相同,表示epoll事件类型的宏是在poll对应的宏加上”E”,比如epoll的数据可读事件是EPOLLIN,但epoll有两个额外的事件类型-EPOLLET和EPOLLONESHOT

1
2
3
4
5
struct epoll_event
{
__uint32_t events; /* epoll事件 */
epoll_data_t data; /* 用户数据 */
};

data用于存储用户数据

1
2
3
4
5
6
7
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
}epoll_data_t;

epoll_data_t是一个联合体,其4个成员最多使用的是fd,它指定事件所从属的目标文件描述符,ptr成员可用来指定fd相关的用户数据,但由于opoll_data_t是一个联合体,我们不能同时使用fd和ptr,如果要将文件描述符和用户数据关联起来,以实现快速的数据访问,则只能使用其他手段,比如放弃使用fd成员,而在ptr指针指向的用户数据中包含fd

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

返回:成功返回就绪的文件描述符个数,出错-1

timeout参数的含义与poll接口的timeout参数相同,maxevents参数指定最多监听多少个事件,它必须大于0

epoll_wait如果检测到事件,就将所有就绪的事件从内核事件表(由epfd指定)中复制到events指定的数组中

总结

最好都搭配非阻塞IO:

  • 多路复用返回的事件不一定是可读写的,可能有发现数据已经达到通知应用程序,但是由于发现错误的校验和被丢弃,此时就没有数据可读

  • 防止程序发生阻塞

select&poll

区别:

  • select使用固定长度的位图表示文件描述符集合,所支持的文件描述符受到内核FD_SETSIZE限制,默认是1024;poll没有文件描述符数量的限制,是采用动态数组存储
  • select函数在会修改文件描述符集合的内容,没办法复用需要重新设置;poll是通过revents对文件描述符集合进行修改,不需要重新设置
  • select用户关心不同的事件需要添加到不同的文件描述符集合中;poll不需要,是在结构体中直接设置
  • select支持跨平台;poll不支持

缺点:

  • 需要进行两次遍历与两次拷贝
  • 无法动态添加文件描述符,需要等前一个select/poll返回后添加
  • 客户端多个连接但是少数活跃的话会导致效率比较低

epoll

优点:

  • 减少数据拷贝:在内核中使用红黑树对文件描述符进行管理,不需要每次都将整个文件描述符拷贝到内核中,只需要通过epoll_ctl将要变更的文件描述符交给内核
  • 提高检测效率:基于事件机制,有事件发生时通过回调函数添加到就绪链表中,再将就绪链表返回给用户,不需要轮询整个集合

缺点:

连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调去将发生事件的文件描述符添加到就绪链表中(红黑树中的查询)

两种触发模式

LT水平触发(默认)

当检测到有事件发生会将事件通知应用程序,应用程序如果不处理该事件则下次再次调用时,会再次通知应用程序

ET边缘触发

当检测到有事件发生会将此事件通知应用程序,应用程序必须立即处理该事件,如果不处理,下次调用时,不会再次通知应用程序该事件的发生

一般和非阻塞IO配合使用:

  • IO事件只会通知一次,应用程序不知道能够读写多少数据,所以在收到通知后应该尽可能读写数据以免错失读写机会

  • 循环执行非阻塞IO操作知道返回错误没有数据可读写

选择

  • 大流量下使用ET性能会更好,不会频繁调用epoll_wait
  • LT不会丢失数据而且可以通过使用大缓冲机制一次read就读取数据,ET需要一直读取知道返回EAGAIN(至少需要两次read)
    • 当没有数据可读的时候非阻塞IO就会返回EAGAGIN告诉应用程序没有数据可以读了,需要稍后再试
  • LT可以照顾多个连接的公平性,不会因为某个连接数据过大影响其他连接处理消息

API使用

select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include "selectServer.h"
#include <iostream>
#include <sys/select.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <unistd.h>
#include <vector>

using namespace std;

int getMaxNumOfVector(vector<int> &fds) {
int res = 0;
for (int i = 0; i < fds.size(); i++) {
res = max(res,fds[i]);
}
return res;
}

vector<int> flipVector(vector<int> &fds) {
vector<int> newFds;
for (int i = 0; i < fds.size(); i++) {
if (fds[i] != -1) {
newFds.push_back(fds[i]);
}
}
return newFds;
}

int main() {
int listenFd, connFd;
struct sockaddr_in server;
// 绑定服务器并监听
listenFd = socket(AF_INET,SOCK_STREAM,0);
memset(&server,0,sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(8888);
server.sin_addr.s_addr = INADDR_ANY;

bind(listenFd,(struct sockaddr*)&server,sizeof(server));
listen(listenFd,5);

// 初始化参数
fd_set readFd;
fd_set writeFd;
fd_set exceptFd;
char buff[1024];

FD_ZERO(&readFd);
FD_ZERO(&writeFd);
FD_ZERO(&exceptFd);

vector<int> fds;
fds.push_back(STDIN_FILENO);
fds.push_back(listenFd);

bool running = true;

while (running) {
memset(buff,0,sizeof(buff));

// 每次调用select都需要重新初始化readFd和exceptFd中的文件描述符集合
for (int i = 0; i < fds.size(); i++) {
FD_SET(fds[i],&readFd);
if ((fds[i] != STDIN_FILENO) && (fds[i] != listenFd)) {
FD_SET(fds[i],&exceptFd);
}
}

int eventNum = select(getMaxNumOfVector(fds) + 1, &readFd, &writeFd, &exceptFd, NULL);

if (eventNum < 0) {
cerr << "select error" << endl;
break;
}

for (int i = 0; i < fds.size(); i++) {
if (fds[i] == STDIN_FILENO) {
if (FD_ISSET(STDIN_FILENO, &readFd)) {
cin >> buff;
if (strcmp(buff,"quit") == 0) {
running = false;
break;
} else {
cout << buff << endl;
}
}
} else if (fds[i] == listenFd) {
if (FD_ISSET(listenFd, &readFd)) {
connFd = accept(listenFd,NULL,NULL);
if (connFd < 0) {
running = false;
break;
}
fds.push_back(connFd);
cout << "向fds添加" << connFd << ",fds.size:" << fds.size() << endl;
}
} else {
if (FD_ISSET(fds[i],&readFd)) {
int len = recv(fds[i],buff,sizeof(buff)-1,0);
if (len < 0) {
cerr << "recv error" << endl;
} else if (len == 0) {
cout << "从fds删除" << fds[i] << endl;
close(fds[i]);
// 断开连接
fds[i] = -1;
} else {
buff[len] = '\0';
cout << fds[i] << "recv:" << buff << endl;
}
} else if (FD_ISSET(fds[i], &writeFd)) {

} else if (FD_ISSET(fds[i],&exceptFd)) {

}
}
}
fds = flipVector(fds);
}

// 关闭文件描述符
for (int i = 0; i < fds.size(); i++) {
close(fds[i]);
}

return 0;
}

poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "pollServer.h"
#include <iostream>
#include <vector>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>

using namespace std;

struct pollfd *getPollFd(vector<pollfd> &fds, int *ppoll_size)
{
struct pollfd *poll = (struct pollfd *) malloc(fds.size() * sizeof(struct pollfd));
for (int i = 0; i < fds.size(); i++) {
poll[i].fd = fds[i].fd;
poll[i].events = fds[i].events;
}
*ppoll_size = fds.size();
return poll;
}

vector<pollfd> flipVector(vector<pollfd> &fds) {
vector<pollfd> fdsnew;
for (int i = 0; i < fds.size(); i++) {
if (fds[i].fd != -1) {
fdsnew.push_back(fds[i]);
}
}
return fdsnew;
}

int main() {
// 绑定服务器
int listenFd, connFd;
struct sockaddr_in server;

listenFd = socket(AF_INET,SOCK_STREAM,0);
memset(&server,0, sizeof(server));
server.sin_addr.s_addr = INADDR_ANY;
server.sin_port = htons(8888);
server.sin_family = AF_INET;

bind(listenFd,(struct sockaddr*)&server, sizeof(server));
listen(listenFd,5);

struct pollfd poll_fd;
vector<struct pollfd> fds;

poll_fd.fd = STDIN_FILENO;
poll_fd.events = POLLIN;
fds.push_back(poll_fd);

char buff[1024];

struct pollfd *ppoll = nullptr;
int poll_size = 0;
ppoll = getPollFd(fds,&poll_size);

bool running = true;

while (running) {
int oldSize = fds.size();
memset(buff,0, sizeof(buff));

int eventNum = poll(ppoll,poll_size,-1);

if (eventNum < 0) {
cerr << "poll error" << endl;
break;
}

int fds_size = fds.size();
for (int i = 0; i < fds_size; i++) {
if (ppoll[i].fd == STDIN_FILENO) {
if (ppoll[i].revents & POLLIN) {
cin >> buff;
if (strcmp(buff,"quit") == 0) {
running = false;
break;
} else {
cout << buff << endl;
}
}
} else if (ppoll[i].fd == listenFd) {
if (ppoll[i].revents & POLLIN) {
connFd = accept(listenFd,NULL,NULL);
if (connFd < 0) {
running = false;
break;
}
poll_fd.fd = connFd;
poll_fd.events = POLLIN;
fds.push_back(poll_fd);
cout << "向fds添加" << connFd << endl;
}
} else {
if (ppoll[i].revents & POLLIN) {
int len = recv(ppoll[i].fd,buff, sizeof(buff)-1,0);
if (len < 0) {
cerr << "recv error" << endl;
break;
} else if (len == 0) {
cout << "从fds删除" << fds[i].fd << endl;
close(fds[i].fd);
fds[i].events = 0;
fds[i].fd = -1;
} else {
buff[len] = '\0';
cout << fds[i].fd << "recv:" << buff << endl;
}
}
}
}
fds = flipVector(fds);
if (oldSize != fds.size()) {
free(ppoll);
ppoll = getPollFd(fds,&poll_size);
}
}

for (int i = 0; i < fds.size(); i++) {
if (fds[i].fd != -1) {
close(fds[i].fd);
}
}
return 0;

}

epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include "epollServer.h"
#include <iostream>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <string.h>

using namespace std;

void addfd(int epollfd, int fd)
{
epoll_event event;

event.data.fd = fd;
event.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
}

void delfd(int epollfd, int fd)
{
epoll_event event;

event.data.fd = fd;
event.events = EPOLLIN;
epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &event);
}

int main(int argc, char **argv)
{
int listenfd, connfd;
struct sockaddr_in servaddr;

listenfd = socket(AF_INET, SOCK_STREAM, 0);

memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8080);
servaddr.sin_addr.s_addr = INADDR_ANY;

bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
listen(listenfd, 5);

int epollfd = epoll_create(32);
if (epollfd < 0) {
cerr << "epoll_create error" << endl;
exit(-1);
}

addfd(epollfd, STDIN_FILENO);
addfd(epollfd, listenfd);

epoll_event events[32];

char buff[1024];
bool running = true;
while (running) {
buff[0] = '\0';

int event_num = epoll_wait(epollfd, events, 32, -1);
if (event_num < 0) {
cerr << "epoll_wait error" << endl;
break;
}

for (int i = 0; i < event_num; i++) {
int fd = events[i].data.fd;
int event = events[i].events;

if (fd == STDIN_FILENO) {
// 从STDIN_FILENO中读取数据
if (event & EPOLLIN) {
cin >> buff;
if (strcmp(buff, "quit") == 0) {
running = false;
break;
}
else {
cout << buff << endl;
}
}
}
else if (fd == listenfd) {
if (event & EPOLLIN) {
connfd = accept(listenfd, NULL, NULL);
if (connfd < 0) {
running = false;
break;
}

addfd(epollfd, connfd);
cout << "往epoll添加 " << connfd << endl;
}
}
else {
if (event & EPOLLIN) {
int len = recv(fd, buff, sizeof(buff) - 1, 0);
if (len < 0) {
cerr << "recv error" << endl;
}
else if (len == 0) {
cout << "从epoll删除 " << fd << endl;
// 客户端断开了连接
delfd(epollfd, fd);
}
else {
buff[len] = '\0';
cout << fd << " recv: " << buff << endl;
}
}
}
}
}

// 关闭文件描述符
close(listenfd);

return 0;
}