这是近期阅读《linux高性能服务器编程》里面描述如何使用I/O复用将系统上定时器、信号、I/O事件进行I/O复用,提高运行效率,以及两种事件处理模式、两种并发编程模式、高级I/O函数,此外还有libevent如何创立,和多进程、多线程之间共享内存、资源锁使用,以及调试。
主要说明下
- I/O复用
- 两种高效事件处理模式
- 两种高效并发模式
- libevent
1.I/O复用
I/O复用使得程序能监听多个文件描述符,本身是阻塞的,不同的是将内核的监听转化到用户中,当多个文件描述符同时就绪时,只能以此处理每个文件描述符,因此是串行工作,如果要并发则需要配合多进程、多线程实现
1.2 select使用:TCP无阻塞connect
TCP中连接一个TCP时通常是通过阻塞使连接,是一种异步过程,因此消耗系统资源,int select(int nfds,fd_set *readfds,fd_set * writefds,fd_set* exceptfds,struct timeval*timeout)
- nfds:被监听文件描述符总数,通常为监听文件描述符最大值加一
- readfds、writefds、exceptfds分别指向可读、可写、异常等事件文件描述符集合
1 |
|
1.2 poll:聊天室程序
poll和select类似,也是在指定事件轮询一定数量的文件描述符,
int poll(struct pollfd * fds,nfds_t nfds,int timeoout)
- fds是一个结构体:
1
2
3
4
5struct pollfd{
int fd;
short events;//告诉需要监听事件 POLLIN数据可读、POLLOUT数据可写、POLLERR错误
short revents;//以发生监听的事件
};
客户端实现
1 |
|
服务器实现
1 |
|
1.3 epoll:TCP、UDP同时处理
epoll能返回就绪的文件描述符,不用通过查找获得
- epoll_creat
- epoll_ctl
- epoll_wait(立即处理与稍后处理)
1 |
|
2.两种高效的时间处理模式
实际上Reactor属于同步模式,而Proactor属于异步模式,Proactor通过回调函数的预函数来处理
Reactor模式
同步模型用于实现Reactor模式,要求主线程只负载监听文件描述上是否有事件发生,有的话立即通知工作线程,除此之外主线程不做任何实质性工作:
- 主线程往epoll内核事件表中注册socket上的读就绪事件
- 主线程调用epoll_wait等待socket上有数据可读
- 当socket上有数据可读时,将socket可读事件放入请求列表
- 睡眠在请求列表上的某个工作线程被唤醒,从socket上读取数据,并处理客户请求,并往epoll内核事件表中注册该socket写就绪事件
- 主线程调用epoll_wait通知socket可写
- 当socket上有数据可写时,将socket可写事件放入请求列表
同样唤醒一个请求列表上的线程,执行工作
proactor模式
Reactor是将所有I/O操作交给内核处理,而异步proactor不是:
主线程调用aio_read(aio_write)向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序
- 主线程继续处理其他逻辑
- 当socket上的数据被读入用户缓冲区后,内核将线应用程序发送一个信号,以通知应用程序该数据可读
- 应用程序预定义信号处理函数选择一个工作线程处理客户请求,以通知该应用程序数据已经可用
- 主线程继续处理其他逻辑
- 当用户缓冲区数据写入socket后,内核向应用程序发送信号,通知应用程序数据发送完毕
同样,应用程序通过预定义处理函数选择一个工作线程处理数据
3.两种高效的并发模式
如果程序是计算密集型,并发没有优势,如果是I/O密集型,由于I/O处理速度没有CPU快,程序阻塞I/O会使浪费大量CPU时间,并发中同步和异步与I/O中同步异步不同,“同步”是按照代码顺序执行,“异步”是指事件驱动型程序(中断、信号)
半同步/半异步模式
主线程负责监听socket,连接socket由工作线程完成
缺点:同一事件客户请求较多时,工作线程较少,客户端相应会越来越慢
领导者/追随者模式
通过多个工作线程轮流获得事件源集合,轮流负责监听、分发并处理事件的模式。在任意时间点都只有一个领导者,负责监听I/O事件。如果当前领导者检测到I/O事件,从线程池中推选新的领导者线程,然后自身处理I/O事件,然后新的领导者处理新的I/O事件。其中包含几个组件,用于切换时不会重新初始化I/O事件
有限状态机
有限状态机是内部逻辑单元一种高效的协调模式(处理协议消息数据),逐渐发展为行为树(待坑)
4.libevent
- 统一事件源:I/O事件、信号、定时事件
- 可移植性:不同复用模式
- 对并发编程支持
简单的说libevent核心是event loop,指不停在循环运行的事件;libevent通过分配和注册watch对不同类型事件进行监听,监听触发时执行相应操作
基本操作函数包括:event_init
、event_add
、event_base_dispatch
、evsignal_new
、event_free
、event_base_free
1 |
|
Bufferevent 结合socket的event方法调用
传统的libevent使用方法:bufferevent_socket_new
、bufferevent_socket_connect
、bufferevent_setcb
、bufferevent_enable
- 当需要放数据的时候,存入数据到buffer
- 等待socket可写
- 尽量向socket中写更多的data
- 如果还有data未写入,则再等待socket可写
客户端实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
int tcp_connect_server(const char* server_ip, int port);
void cmd_msg_cb(int fd, short events, void* arg);
void server_msg_cb(struct bufferevent* bev, void* arg);
void event_cb(struct bufferevent *bev, short event, void *arg);
int main(int argc, char** argv) {
if( argc < 3 ) {
//两个参数依次是服务器端的IP地址、端口号
printf("please input 2 parameter\n");
return -1;
}
struct event_base *base = event_base_new();
struct bufferevent* bev = bufferevent_socket_new(base, -1,
BEV_OPT_CLOSE_ON_FREE);
//监听终端输入事件
struct event* ev_cmd = event_new(base, STDIN_FILENO,
EV_READ | EV_PERSIST,
cmd_msg_cb, (void*)bev);
event_add(ev_cmd, NULL);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr) );
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
inet_aton(argv[1], &server_addr.sin_addr);
bufferevent_socket_connect(bev, (struct sockaddr *)&server_addr,
sizeof(server_addr));
bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, (void*)ev_cmd);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
event_base_dispatch(base);
printf("finished \n");
return 0;
}
//用户输入事件
void cmd_msg_cb(int fd, short events, void* arg) {
char msg[1024];
int ret = read(fd, msg, sizeof(msg));
if( ret < 0 ) {
perror("read fail ");
exit(1);
}
struct bufferevent* bev = (struct bufferevent*)arg;
//把终端的消息发送给服务器端
bufferevent_write(bev, msg, ret);
}
//网络消息事件
void server_msg_cb(struct bufferevent* bev, void* arg){
char msg[1024];
size_t len = bufferevent_read(bev, msg, sizeof(msg));
msg[len] = '\0';
printf("recv %s from server\n", msg);
}
//连接事件
void event_cb(struct bufferevent *bev, short event, void *arg){
if (event & BEV_EVENT_EOF)
printf("connection closed\n");
else if (event & BEV_EVENT_ERROR)
printf("some other error\n");
else if( event & BEV_EVENT_CONNECTED) {
printf("the client has connected to server\n");
return ;
}
//这将自动close套接字和free读写缓冲区
bufferevent_free(bev);
struct event *ev = (struct event*)arg;
event_free(ev);
}
服务器实现
1 |
|