epoll进阶
事件模型
EPOLL事件有两种模型
- Level Triggered (LT) 只要有数据都会触发。epoll的默认工作模式是水平触发。
- Edge Triggered (ET) 只有数据到来才触发,不管缓存区中是否还有数据。
思考如下步骤
- 假定我们已经把一个用来从管道中读取数据的文件描述符(rfd)添加到epoll描述符。
- 管道的另一端写入了2KB的数据
- 调用
epoll_wait
,并且它会返回rfd
,说明它已经准备好读取操作 - 读取1KB的数据
- 调用
epoll_wait
……
在这个过程中,有两种工作模式:
ET模式
ET模式即Edge Triggered工作模式。
如果我们在第1步将rfd添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
LT模式
LT模式即Level Triggered工作模式。
与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
比较
LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET(edge-triggered):==ET是高速工作方式,只支持no-block socket==。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。
实例一:基于管道epoll ET触发模式
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>
#define MAXLINE 10
int main(int argc, char *argv[]) {
int efd, i;
int pfd[2];
pid_t pid;
char buf[MAXLINE], ch = 'a';
pipe(pfd);
pid = fork();
if (pid == 0) {
close(pfd[0]);
while (1) {
for (i = 0; i < MAXLINE / 2; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
write(pfd[1], buf, sizeof(buf));
sleep(2);
}
close(pfd[1]);
} else if (pid > 0) {
struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
close(pfd[1]);
efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
event.data.fd = pfd[0];
epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event);
while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == pfd[0]) {
len = read(pfd[0], buf, MAXLINE / 2);
write(STDOUT_FILENO, buf, len);
}
}
close(pfd[0]);
close(efd);
} else {
perror("fork");
exit(-1);
}
return 0;
}
实例二:基于网络C/S模型的epoll ET触发模式
server.c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define MAXLINE 10
#define SERV_PORT 8080
int main(void) {
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
listen(listenfd, 20);
struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
while (1) {
res = epoll_wait(efd, resevent, 10, -1);
printf("res %d\n", res);
if (resevent[0].data.fd == connfd) {
len = read(connfd, buf, MAXLINE / 2);
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}
client.c
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#define MAXLINE 10
#define SERV_PORT 8080
int main(int argc, char *argv[]) {
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';
sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);
connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
while (1) {
for (i = 0; i < MAXLINE / 2; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}
实例三:基于网络C/S非阻塞模型的epoll ET触发模式
server.c
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#define MAXLINE 10
#define SERV_PORT 8080
int main(void) {
struct sockaddr_in servaddr, cliaddr;
socklen_t cliaddr_len;
int listenfd, connfd;
char buf[MAXLINE];
char str[INET_ADDRSTRLEN];
int i, efd, flag;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(SERV_PORT);
bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
listen(listenfd, 20);
struct epoll_event event;
struct epoll_event resevent[10];
int res, len;
efd = epoll_create(10);
/* event.events = EPOLLIN; */
event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */
printf("Accepting connections ...\n");
cliaddr_len = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &cliaddr_len);
printf("received from %s at PORT %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),
ntohs(cliaddr.sin_port));
flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
event.data.fd = connfd;
epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
while (1) {
printf("epoll_wait begin\n");
res = epoll_wait(efd, resevent, 10, -1);
printf("epoll_wait end res %d\n", res);
if (resevent[0].data.fd == connfd) {
while ((len = read(connfd, buf, MAXLINE / 2)) > 0)
write(STDOUT_FILENO, buf, len);
}
}
return 0;
}
client.c
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#define MAXLINE 10
#define SERV_PORT 8080
int main(int argc, char *argv[]) {
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, i;
char ch = 'a';
sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);
connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr));
while (1) {
for (i = 0; i < MAXLINE / 2; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
for (; i < MAXLINE; i++)
buf[i] = ch;
buf[i - 1] = '\n';
ch++;
write(sockfd, buf, sizeof(buf));
sleep(10);
}
Close(sockfd);
return 0;
}
epoll反应堆思想
epoll还有一种更高级的使用方法,那就是借鉴封装的思想,简单的说就是当某个事情发生了,自动的去处理这个事情。这样的思想对我们的编码来说就是设置回调,将文件描述符,对应的事件,和事件产生时的处理函数封装到一起,这样当某个文件描述符的事件发生了,回调函数会自动被触发,这就是所谓的反应堆思想。
从我们之前对epoll的使用上如何去支持反应堆呢?需要重新再认识一下struct epoll_event
中的epoll_data_t
结构体:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
我们之前使用的是共用体上的fd域,如果是要实现封装思想,光有fd是不够的,所以转换思路,看第一个域ptr,是一个泛型指针,指针可以指向一块内存区域,这块区域可以代表一个结构体,既然是结构体,那么我们就可以自定义了,将我们非常需要的文件描述符,事件类型,回调函数都封装在结构体上(我们在信号一章就见识过struct sigaction
上有掩码和回调函数等信息),这样当我们要监控的文件描述符对应的事件发生之后,我们去调用回调函数就可以了,这样就可以将文件描述符对应事件的处理代码梳理的非常清晰。