linux高性能编程-笔记

这是近期阅读《linux高性能服务器编程》里面描述如何使用I/O复用将系统上定时器、信号、I/O事件进行I/O复用,提高运行效率,以及两种事件处理模式、两种并发编程模式、高级I/O函数,此外还有libevent如何创立,和多进程、多线程之间共享内存、资源锁使用,以及调试。

书籍思维导图

主要说明下

  • I/O复用
  • 两种高效事件处理模式
  • 两种高效并发模式
  • libevent

1.I/O复用

I/O复用使得程序能监听多个文件描述符,本身是阻塞的,不同的是将内核的监听转化到用户中,当多个文件描述符同时就绪时,只能以此处理每个文件描述符,因此是串行工作,如果要并发则需要配合多进程、多线程实现

区别

1.2 select使用:TCP无阻塞connect

TCP中连接一个TCP时通常是通过阻塞使连接,是一种异步过程,因此消耗系统资源,int select(int nfds,fd_set *readfds,fd_set * writefds,fd_set* exceptfds,struct timeval*timeout)

  • nfds:被监听文件描述符总数,通常为监听文件描述符最大值加一
  • readfds、writefds、exceptfds分别指向可读、可写、异常等事件文件描述符集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <time.h>
#include <sys/ioctl.h>

#define BUFFR_SIZZE 1023
//设置文件描述符为非阻塞
int setnoblocking(int fd){
int old_option=fcntl(fd,F_GETFL);
int new_option=old_option|O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
//非阻塞连接
int unblock_connect(const char *ip,int port,int time){
int ret=0;
struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);

int sockfd=socket(PF_INET,SOCK_STREAM,0);
int fdopt=setnoblocking(sockfd);
ret=connect(sockfd,(struct sockaddr*)&address,sizeof(address));
if(ret==0){
printf("connect with server immediately\n");
fcntl(sockfd,F_SETFL,fdopt);
return sockfd;
}else if(errno!=EINPROGRESS){
printf("unlock connect not support\n");
return -1;
}
fd_set readfds;
fd_set writefds;
struct timeval timeout;
FD_ZERO(&readfds);
FD_SET(sockfd, &writefds);//当有连接时TCP端被应答,因此是写有状态三次握手

timeout.tv_sec=time;
timeout.tv_usec=0;

ret=select(sockfd+1,NULL,&writefds,NULL,&timeout);//I/O复用
if(ret<=0){
printf("connect tie out\n");
close(sockfd);
return -1;
}
if(!FD_ISSET(sockfd,&writefds)){
printf("no event on sockfd found\n");
close(sockfd);
return -1;
}
int error=0;
socklen_t length=sizeof(error);//清除sockfd上的错误
if(getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &length)<0){
printf("get socket option failed\n");
close(sockfd);
return -1;
}
if(error!=0){
printf("connection failed after select with the error:%d\n",error);
close(sockfd);
return -1;
}
printf("connection ready after select with th socket:%d\n",sockfd);
fcntl(sockfd, F_SETFL,fdopt);
return sockfd;
}

int main(int argc, char const *argv[])
{
if(argc<=2){
printf("usage: %s address port",argv[0]);
return 1;
}
const char *ip=argv[1];
int port=atoi(argv[2]);

int sockfd=unblock_connect(ip,port,10);
if(sockfd<0)return 1;
close(sockfd);
return 0;
}

1.2 poll:聊天室程序

poll和select类似,也是在指定事件轮询一定数量的文件描述符,int poll(struct pollfd * fds,nfds_t nfds,int timeoout)

  • fds是一个结构体:
    1
    2
    3
    4
    5
    struct pollfd{
    int fd;
    short events;//告诉需要监听事件 POLLIN数据可读、POLLOUT数据可写、POLLERR错误
    short revents;//以发生监听的事件
    };

客户端实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>

#define BUFFER_SIZE 64

int main(int argc, char const *argv[])
{
if(argc<=2){
printf("usage: %s address port",argv[0]);
return 1;
}
const char *ip=argv[1];
int port=atoi(argv[2]);

struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);
int sockfd=socket(PF_INET,SOCK_STREAM,0);
assert(sockfd>=0);
if(connect(sockfd, (struct sockaddr*)&address, sizeof(address))<0){
printf("connection failed\n");
close(sockfd);
return -1;
}
pollfd fds[2];//注册事件
fds[0].fd=0;//终端输入
fds[0].events=POLLIN;//有写入事件
fds[0].revents=0;
fds[1].fd=sockfd;//TCP事件
fds[1].events=POLLIN|POLLRDHUP;//关闭事件
fds[1].revents=0;

char read_buff[BUFFER_SIZE];
int pipefd[2];
int ret=pipe(pipefd);
assert(ret!=-1);
while(1){
ret=poll(fds,2,-1);
if(ret<0){
printf("poll failed\n");
break;
}
if(fds[1].revents&POLLRDHUP){
printf("server close the connection\n");
break;
}else if(fds[1].revents&POLLIN){
memset(read_buff,0x0,BUFFER_SIZE);
recv(fds[1].fd,read_buff,BUFFER_SIZE-1,0);
printf("%s\n", read_buff);
}
if(fds[0].revents&POLLIN){//终端输入使用splice零拷贝定向至发送区(不用send)
ret=splice(0,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
ret=splice(pipefd[0],NULL,sockfd,NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);
}
}
close(sockfd);
return 0;
}

服务器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>

#define USER_LIMIT 5 //最大客户端
#define BUFFER_SIZE 64
#define FD_LIMIT 65535

struct client_data{
sockaddr_in address;
char * write_buf;
char buff[BUFFER_SIZE];
};
int setnoblocking(int fd){
int old_option=fcntl(fd,F_GETFL);
int new_option=old_option|O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}

int main(int argc, char const *argv[])
{
if(argc<=2){
printf("usage: %s address port",argv[0]);
return 1;
}
const char *ip=argv[1];
int port=atoi(argv[2]);

struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);
int listenfd=socket(PF_INET,SOCK_STREAM,0);
assert(listenfd>=0);
int ret=0;
ret=bind(listenfd,(struct sockaddr*)(&address),sizeof(struct sockaddr));
assert(ret!=-1);
ret=listen(listenfd,USER_LIMIT);
assert(ret!=-1);
client_data* users=new client_data[FD_LIMIT];//空间换区时间,不用每次加入用户时为期分配输入内容缓存,可用哈希表替代减少申请的内存量

pollfd fds[USER_LIMIT+1];
int users_counter=0;
for(int i=1;i<=USER_LIMIT;i++){
fds[i].fd=-1;
fds[i].events=0;
}
fds[0].fd=listenfd;//监听事件
fds[0].events=POLLIN|POLLERR;
fds[0].revents=0;
while(1){
ret=poll(fds,users_counter+1,-1);
if(ret<0){
printf("poll failed\n");
break;
}
for(int i=0;i<users_counter+1;++i){
if(fds[i].fd==listenfd&&fds[i].revents&POLLIN){//有新客户
struct sockaddr_in client_address;
socklen_t client_addrlength=sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
if(connfd<0){
printf("errno in %d\n", errno);
continue;
}
if(users_counter>=USER_LIMIT){
const char *info="too mant users\n";
printf("%s\n",info);
send(connfd,info,strlen(info),0);
close(connfd);
continue;
}
users_counter++;
users[connfd].address=client_address;
setnoblocking(connfd);//新用户分配
fds[users_counter].fd=connfd;
fds[users_counter].events=POLLIN|POLLRDHUP|POLLERR;\\用户可能有消息、断开
fds[users_counter].revents=0;
printf("comes a new user,now room have %d users\n",users_counter);
}else if(fds[i].revents&POLLERR){
printf("get a error from %d\n",fds[i].fd);
continue;
}else if(fds[i].revents& POLLRDHUP){\\断开用户
users[fds[i].fd]=users[fds[users_counter].fd];
close(fds[i].fd);
fds[i]=fds[users_counter];
i--;
users_counter--;
printf("a client left\n");
}else if(fds[i].revents&POLLIN){
int connfd=fds[i].fd;
memset(users[connfd].buff,0x0,BUFFER_SIZE);
ret=recv(connfd,users[connfd].buff,BUFFER_SIZE-1,0);
printf("get %d bytes of client data from %d\n",ret,connfd);
if(ret<0){
if(errno!=EAGAIN){
close(connfd);
users[fds[i].fd]=users[fds[users_counter].fd];
fds[i]=fds[users_counter];
i--;
users_counter--;
}
}else {
for(int j=1;j<=users_counter;j++){
if(fds[j].fd==connfd)continue;
fds[j].events|=~POLLIN;
fds[j].events|=POLLOUT;
users[fds[j].fd].write_buf=users[connfd].buff\\每个用户通知有新消息,通过指针获得
}
}
}else if(fds[i].revents&POLLOUT){
int connfd=fds[i].fd;
if(!users[connfd].write_buf)continue;
ret=send(connfd,users[connfd].write_buf,strlen(users[connfd].write_buf),0);//向用户发送消息
users[connfd].write_buf=NULL;
fds[i].events|=~POLLOUT;
fds[i].events|=POLLIN;
}

}
}
delete []users;
close(listenfd);
return 0;
}

1.3 epoll:TCP、UDP同时处理

epoll能返回就绪的文件描述符,不用通过查找获得

  • epoll_creat
  • epoll_ctl
  • epoll_wait(立即处理与稍后处理)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define USER_LIMIT 5
#define TCP_BUFFER_SIZE 512
#define UDP_BUFFER_SIZE 1024
//设置文件描述符非阻塞
int setnoblocking(int fd){
int old_option=fcntl(fd,F_GETFL);
int new_option=old_option|O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
//添加文件描述符
void addfd(int epollfd,int fd){
epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN|EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnoblocking(fd);
}

int main(int argc, char const *argv[])
{
if(argc<=2){
printf("usage: %s address port",argv[0]);
return 1;
}
const char *ip=argv[1];
int port=atoi(argv[2]);

//TCP
struct sockaddr_in address;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);
int listenfd=socket(PF_INET,SOCK_STREAM,0);
assert(listenfd>=0);
int ret=0;
ret=bind(listenfd,(struct sockaddr*)(&address),sizeof(struct sockaddr));
assert(ret!=-1);
ret=listen(listenfd,USER_LIMIT);
assert(ret!=-1);
//UDP
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
inet_pton(AF_INET,ip,&address.sin_addr);
address.sin_port=htons(port);
int udpfd=socket(PF_INET,SOCK_DGRAM,0);
assert(udpfd>=0);
ret=bind(udpfd,(struct sockaddr*)(&address),sizeof(struct sockaddr));
assert(ret!=-1);

epoll_event events[MAX_EVENT_NUMBER];
int epollfd=epoll_create(2+USER_LIMIT);//待连接用户的文件描述符和TCP、UDP连接文件描述符
assert(epollfd!=-1);
addfd(epollfd, listenfd);
addfd(epollfd, udpfd);
while(1){
int number=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
if(number<0){
printf("epoll failure\n");
break;
}
for(int i=0;i<number;i++){
int sockfd=events[i].data.fd;
if(sockfd==listenfd){//TCP连接
struct sockaddr_in client_address;
socklen_t client_length =sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_length);
addfd(epollfd, connfd);//将新用户文件描述符添加
}else if(sockfd==udpfd){//UDP连接
char buff[UDP_BUFFER_SIZE];
memset(buff,0x0,UDP_BUFFER_SIZE);
struct sockaddr_in client_address;
socklen_t client_length=sizeof(client_address);
ret=recvfrom(udpfd,buff,UDP_BUFFER_SIZE-1,0,(struct sockaddr*)&client_address,&client_length);
if(ret>0){
sendto(udpfd,buff,UDP_BUFFER_SIZE-1,0,(struct sockaddr*)&client_address,client_length);
}
}else if(events[i].events&EPOLLIN){//用户消息
char buff[TCP_BUFFER_SIZE];
while(1){
memset(buff,0x0,TCP_BUFFER_SIZE);
ret=recv(sockfd,buff,TCP_BUFFER_SIZE-1,0);
if(ret<0){
if((errno==EAGAIN)||(errno==EWOULDBLOCK))break;
close(sockfd);
break;
}
else if(ret==0)close(sockfd);
else send(sockfd,buff,ret,0);
}
}else {
printf("other thing happend?\n");
}
}
}
close(listenfd);
return 0;
}

2.两种高效的时间处理模式

实际上Reactor属于同步模式,而Proactor属于异步模式,Proactor通过回调函数的预函数来处理

Reactor模式

同步模型用于实现Reactor模式,要求主线程只负载监听文件描述上是否有事件发生,有的话立即通知工作线程,除此之外主线程不做任何实质性工作:

  1. 主线程往epoll内核事件表中注册socket上的读就绪事件
  2. 主线程调用epoll_wait等待socket上有数据可读
  3. 当socket上有数据可读时,将socket可读事件放入请求列表
  4. 睡眠在请求列表上的某个工作线程被唤醒,从socket上读取数据,并处理客户请求,并往epoll内核事件表中注册该socket写就绪事件
  5. 主线程调用epoll_wait通知socket可写
  6. 当socket上有数据可写时,将socket可写事件放入请求列表
  7. 同样唤醒一个请求列表上的线程,执行工作

    Reactor

    proactor模式

    Reactor是将所有I/O操作交给内核处理,而异步proactor不是:

  8. 主线程调用aio_read(aio_write)向内核注册socket上的读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序

  9. 主线程继续处理其他逻辑
  10. 当socket上的数据被读入用户缓冲区后,内核将线应用程序发送一个信号,以通知应用程序该数据可读
  11. 应用程序预定义信号处理函数选择一个工作线程处理客户请求,以通知该应用程序数据已经可用
  12. 主线程继续处理其他逻辑
  13. 当用户缓冲区数据写入socket后,内核向应用程序发送信号,通知应用程序数据发送完毕
  14. 同样,应用程序通过预定义处理函数选择一个工作线程处理数据

    Proactor

3.两种高效的并发模式

如果程序是计算密集型,并发没有优势,如果是I/O密集型,由于I/O处理速度没有CPU快,程序阻塞I/O会使浪费大量CPU时间,并发中同步和异步与I/O中同步异步不同,“同步”是按照代码顺序执行,“异步”是指事件驱动型程序(中断、信号)

半同步/半异步模式

主线程负责监听socket,连接socket由工作线程完成
缺点:同一事件客户请求较多时,工作线程较少,客户端相应会越来越慢

领导者/追随者模式

通过多个工作线程轮流获得事件源集合,轮流负责监听、分发并处理事件的模式。在任意时间点都只有一个领导者,负责监听I/O事件。如果当前领导者检测到I/O事件,从线程池中推选新的领导者线程,然后自身处理I/O事件,然后新的领导者处理新的I/O事件。其中包含几个组件,用于切换时不会重新初始化I/O事件

有限状态机

有限状态机是内部逻辑单元一种高效的协调模式(处理协议消息数据),逐渐发展为行为树(待坑)

4.libevent

git源码

  • 统一事件源:I/O事件、信号、定时事件
  • 可移植性:不同复用模式
  • 对并发编程支持
    简单的说libevent核心是event loop,指不停在循环运行的事件;libevent通过分配和注册watch对不同类型事件进行监听,监听触发时执行相应操作

基本操作函数包括:event_initevent_addevent_base_dispatchevsignal_newevent_freeevent_base_free

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#include <sys/signal.h>
#include <stdint.h>
#include <event.h>

void signal_cb(int fd,short event,void *argc){//回调函数
struct event_base *base=(event_base *)argc;
struct timeval delay={2,0};
printf("Caught an interrupt signal");
event_base_loopexit(base,&delay);
}

void timeout_cb(int fd,short event,void *argc){//回调函数
printf("timeout....\n");
}

int main(int argc, char const *argv[])
{
struct event_base *base=event_init();
struct event*signal_event=evsignal_new(base,SIGINT,signal_cb,base);
event_add(signal_event, NULL);

timeval tv={1,0};
struct event *timeout_event=evtimer_new(base,timeout_cb,NULL);
event_add(timeout_event, &tv);//添加事件

event_base_dispatch(base);//开始监听事件
event_free(timeout_event);
event_free(signal_event);
event_base_free(base);

return 0;
}

Bufferevent 结合socket的event方法调用

传统的libevent使用方法:bufferevent_socket_newbufferevent_socket_connectbufferevent_setcbbufferevent_enable

  1. 当需要放数据的时候,存入数据到buffer
  2. 等待socket可写
  3. 尽量向socket中写更多的data
  4. 如果还有data未写入,则再等待socket可写

    客户端实现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    #include<sys/types.h>  
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<arpa/inet.h>
    #include<errno.h>
    #include<unistd.h>

    #include<stdio.h>
    #include<string.h>
    #include<stdlib.h>

    #include<event.h>
    #include<event2/bufferevent.h>
    #include<event2/buffer.h>
    #include<event2/util.h>

    int tcp_connect_server(const char* server_ip, int port);
    void cmd_msg_cb(int fd, short events, void* arg);
    void server_msg_cb(struct bufferevent* bev, void* arg);
    void event_cb(struct bufferevent *bev, short event, void *arg);

    int main(int argc, char** argv) {
    if( argc < 3 ) {
    //两个参数依次是服务器端的IP地址、端口号
    printf("please input 2 parameter\n");
    return -1;
    }
    struct event_base *base = event_base_new();
    struct bufferevent* bev = bufferevent_socket_new(base, -1,
    BEV_OPT_CLOSE_ON_FREE);
    //监听终端输入事件
    struct event* ev_cmd = event_new(base, STDIN_FILENO,
    EV_READ | EV_PERSIST,
    cmd_msg_cb, (void*)bev);
    event_add(ev_cmd, NULL);

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr) );
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(atoi(argv[2]));
    inet_aton(argv[1], &server_addr.sin_addr);
    bufferevent_socket_connect(bev, (struct sockaddr *)&server_addr,
    sizeof(server_addr));
    bufferevent_setcb(bev, server_msg_cb, NULL, event_cb, (void*)ev_cmd);
    bufferevent_enable(bev, EV_READ | EV_PERSIST);
    event_base_dispatch(base);
    printf("finished \n");
    return 0;
    }
    //用户输入事件
    void cmd_msg_cb(int fd, short events, void* arg) {
    char msg[1024];
    int ret = read(fd, msg, sizeof(msg));
    if( ret < 0 ) {
    perror("read fail ");
    exit(1);
    }
    struct bufferevent* bev = (struct bufferevent*)arg;
    //把终端的消息发送给服务器端
    bufferevent_write(bev, msg, ret);
    }
    //网络消息事件
    void server_msg_cb(struct bufferevent* bev, void* arg){
    char msg[1024];
    size_t len = bufferevent_read(bev, msg, sizeof(msg));
    msg[len] = '\0';
    printf("recv %s from server\n", msg);
    }
    //连接事件
    void event_cb(struct bufferevent *bev, short event, void *arg){
    if (event & BEV_EVENT_EOF)
    printf("connection closed\n");
    else if (event & BEV_EVENT_ERROR)
    printf("some other error\n");
    else if( event & BEV_EVENT_CONNECTED) {
    printf("the client has connected to server\n");
    return ;
    }
    //这将自动close套接字和free读写缓冲区
    bufferevent_free(bev);
    struct event *ev = (struct event*)arg;
    event_free(ev);
    }

服务器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#include<netinet/in.h>    
#include<sys/socket.h>
#include<unistd.h>

#include<stdio.h>
#include<string.h>

#include<event.h>
#include<listener.h>
#include<bufferevent.h>
#include<thread.h>

void listener_cb(evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg);
void socket_read_cb(bufferevent *bev, void *arg);
void socket_event_cb(bufferevent *bev, short events, void *arg);

int main() {
//evthread_use_pthreads();//enable threads
struct sockaddr_in sin;
memset(&sin, 0, sizeof(struct sockaddr_in));
sin.sin_family = AF_INET;
sin.sin_port = htons(9999);
event_base *base = event_base_new();
evconnlistener *listener
= evconnlistener_new_bind(base, listener_cb, base,
LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,
10, (struct sockaddr*)&sin,
sizeof(struct sockaddr_in));
event_base_dispatch(base);

evconnlistener_free(listener);
event_base_free(base);

return 0;
}
//一个新客户端连接上服务器了
//当此函数被调用时,libevent已经帮我们accept了这个客户端。该客户端的
//文件描述符为fd
void listener_cb(evconnlistener *listener, evutil_socket_t fd,
struct sockaddr *sock, int socklen, void *arg)
{
printf("accept a client %d\n", fd);
event_base *base = (event_base*)arg;
//为这个客户端分配一个bufferevent
bufferevent *bev = bufferevent_socket_new(base, fd,
BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, socket_read_cb, NULL, socket_event_cb, NULL);
bufferevent_enable(bev, EV_READ | EV_PERSIST);
}
//网络事件
void socket_read_cb(bufferevent *bev, void *arg)
{
char msg[4096];

size_t len = bufferevent_read(bev, msg, sizeof(msg)-1 );

msg[len] = '\0';
printf("server read the data %s\n", msg);

char reply[] = "I has read your data";
bufferevent_write(bev, reply, strlen(reply) );
}
//网络连接事件
void socket_event_cb(bufferevent *bev, short events, void *arg)
{
if (events & BEV_EVENT_EOF)
printf("connection closed\n");
else if (events & BEV_EVENT_ERROR)
printf("some other error\n");

//这将自动close套接字和free读写缓冲区
bufferevent_free(bev);
}

Evbuffer 缓冲化I/O事件