I/O模型

警告
本文最后更新于 2023-02-28,文中内容可能已过时。

在Linux系统中,socket(套接字)函数是用于创建一个通信端点(socket)的,其原型如下:

1
int socket(int domain, int type, int protocol);

其中:

  • domain:表示创建的套接字的工作域,如 IPv4、IPv6 等;
  • type:表示创建的套接字的类型,如 SOCK_STREAM(面向连接的TCP套接字)、SOCK_DGRAM(无连接的UDP套接字)等;
  • protocol:表示套接字使用的协议,如 IPPROTO_TCP(TCP协议)、IPPROTO_UDP(UDP协议)等。

socket 函数会返回一个新的文件描述符,该描述符代表创建的套接字,可以使用该文件描述符进行读写操作或者关闭该套接字。

函数调用成功后,常常需要将套接字与某个具体的地址/端口绑定(使用 bind 函数),再使用 listen 函数开始监听连接请求(对于 TCP 协议的套接字),或者不需要绑定而直接使用 connect 函数进行连接(对于 TCP 协议的客户端),或者直接使用 sendto 函数发送数据(对于 UDP 协议的套接字)。

总的来说,socket 函数是创建套接字的第一步,并为后续的网络编程活动奠定了基础。

在Linux系统中,bind(绑定)函数用于将一个套接字(socket)与一个地址(IP地址+端口号)绑定在一起。其原型如下:

1
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd:需要绑定的套接字的文件描述符;
  • addr:指向要绑定的地址结构体(struct sockaddr 类型)的指针;
  • addrlen:绑定地址结构体的大小。

在调用 bind 函数之前,需要确定要绑定的 IP 地址和端口号。地址结构体可以是 struct sockaddr_instruct sockaddr_in6,分别对应 IPv4 和 IPv6 地址。具体使用哪种结构体取决于套接字使用的协议,常用的如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
struct sockaddr_in {
    sa_family_t sin_family;   // address family: AF_INET
    in_port_t sin_port;       // port in network byte order
    struct in_addr sin_addr;  // internet address
};

struct sockaddr_in6 {
    sa_family_t sin6_family;   // address family: AF_INET6
    in_port_t sin6_port;       // port in network byte order
    struct in6_addr sin6_addr; // IPv6 address
};

绑定后,可以使用该套接字进行通信。如果需要开启服务器模式,可以绑定地址和端口后,使用 listen 函数开始监听请求。

总的来说,bind 函数是在组织和配置网络通信环境时非常有用的工具函数。它使得套接字可以与特定地址和端口绑定在一起,以便其他应用程序或计算机可以找到并通过这个套接字与这个应用程序或计算机进行通信。

在 Linux 中,listen函数是用于将指定的套接字(socket)转化为被动套接字(passive socket),以便对来自客户端的连接请求进行接受。它使得服务器可以开始监听连接,并且等待客户端的连接请求。

函数原型如下:

1
int listen(int sockfd, int backlog);

其中:

  • sockfd:指定需要转化为被动套接字的套接字描述符,它可以使用 socket() 函数创建。
  • backlog:指定等待接受的最大连接数。即在该套接字等待连接时所接受请求队列的最大长度。

通过调用 listen() 函数,服务器可以开始等待连接。此时,操作系统将创建一个请求队列,并将请求放入队列中,等待服务器处理。

对于每个请求,listen() 函数所接受的基本操作步骤如下:

  1. 等待客户端的请求。
  2. 请求到达时创建一个新的已连接套接字(connected socket)。
  3. 返回到服务器,处理新的已连接套接字。

需要注意的是,listen 函数并没有实际启动监听功能,它只是标记该套接字为被动套接字,告诉操作系统这个套接字应该用于接受连接请求。如果接受了连接请求,listen函数返回 0,否则返回 -1。在异常情况下,errno 变量会被设置为相应的错误值。

在使用 listen函数时,需要在 socket 和 bind 函数之后调用。同时,需要注意指定的套接字要符合相应协议的要求,否则可能会出现无法连接或者无法接受连接等问题。

在 Linux 中,accept 函数是用于从处于监听状态的套接字中接收来自客户端的连接请求,从而得到一个新的对端套接字,可以使用该新的套接字来和客户端进行通信。

1
int accept(int sockfd, struct sockaddr *restrict addr, socklen_t *restrict addrlen);
  • sockfd:所监听的套接字的文件描述符,一般是使用 socket和 bind 函数设置的服务器套接字;
  • addr:指向一个结构体的指针,用于存储连接进来客户端的地址。如果不需要获取该客户端地址信息,可以将该指针设置为 NULL
  • addrlen:指向一个 socklen_t 类型的整数,存储了 addr 所指向的地址结构体的大小。在调用之前,需要把这个值设置为 sizeof(struct sockaddr)

调用 accept() 函数时,如果已有客户端发起连接请求,函数将会返回一个新的文件描述符,该文件描述符与该客户端建立了连接,可以用于后续的通信。如果没有连接请求,该函数将一直阻塞等待,直到接收到连接请求或者发生错误。

需要注意的是,accept函数一般在服务器处理新连接的循环中一直运行,所以应当确保能够及时处理每一个连接的请求,避免连接队列满,导致新的连接请求无法处理。

总的来说,accept函数是在组织和配置网络通信环境时非常有用的工具函数。它使得服务器能够接受连接请求,从而可以与客户端进行通信。

在 Linux 中,connect函数用于创建连接到指定 IP 地址和端口号的套接字(socket)。

1
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd:需要连接的套接字的文件描述符;
  • addr:指向要连接的地址结构体(struct sockaddr 类型)的指针;
  • addrlen:要连接的地址结构体的大小。

在调用 connect() 函数之前,需要确定需要连接的 IP 地址和端口号。地址结构体可以是 struct sockaddr_instruct sockaddr_in6,分别对应 IPv4 和 IPv6 地址。具体使用哪种结构体取决于套接字使用的协议,常用的如下:

调用 connect() 函数将通过特定的 IP 地址和端口号尝试连接指定套接字。如果连接成功,将返回 0;如果连接失败,将返回 -1,并将 errno 设置为相应的错误码。网络连接的尝试将会一直阻塞,知道连接成功或者发生错误。

需要注意的是,使用 connect() 建立连接的套接字是客户端套接字,而不是服务端套接字。使用该套接字发送和接收数据,可以与服务端进行通信。通信结束后,需要关闭该套接字关闭连接。

总的来说,connect() 函数是连接到指定 IP 地址和端口号的关键函数,它使得应用程序能够与特定的服务器或者其他应用程序之间建立连接,并传输数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// tcp_server_listen 函数,通过指定特定的端口监听 TCP 连接请求
int tcp_server_listen(int port) {
  int listenfd; // 定义监听套接字

  listenfd = socket(AF_INET, SOCK_STREAM, 0); // 创建一个 IPv4 套接字

  struct sockaddr_in server_addr; // 定义服务器 socket 地址结构体
  memset(&server_addr, 0, sizeof(server_addr)); // 初始化为 0
  server_addr.sin_family = AF_INET;             // 使用 IPv4
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 绑定到任意可用的地址
  server_addr.sin_port = htons(port);              // 绑定到特定的端口

  // 设置套接字选项 SO_REUSEADDR,用于调试目的,可以快速重用被占用的端口
  int on = 1;
  if ((setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) == -1) {
    error(1, errno, "setsockopt failedd");
  }

  // 将套接字与 IP 地址和端口绑定在一起
  int rt1 =
      bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr));
  if (rt1 < 0) {
    error(1, errno, "bind failed");
  }

  // 开始监听该套接字
  int rt2 = listen(listenfd, LISTENQ);
  if (rt2 < 0) {
    error(1, errno, "listen failedd");
  }

  // 忽略 SIGPIPE 信号,避免在通信过程中出错
  signal(SIGPIPE, SIG_IGN);

  // 返回监听套接字
  return listenfd;
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// tcp_client 函数,用于创建一个 TCP 客户端连接,返回连接套接字描述符
int tcp_client(char *address, int port) {
  int socket_fd; // 定义套接字

  // 创建一个 IPv4 套接字
  socket_fd = socket(AF_INET, SOCK_STREAM, 0);

  // 定义需要连接的服务器 socket 地址结构体
  struct sockaddr_in server_addr;
  memset(&server_addr, 0, sizeof(server_addr));       // 清空结构体
  server_addr.sin_family = AF_INET;                   // 使用 IPv4
  server_addr.sin_port = htons(port);                 // 指定端口
  inet_pton(AF_INET, address, &server_addr.sin_addr); // 使用指定 IP 地址

  socklen_t server_len = sizeof(server_addr);

  // 尝试连接到远程主机
  int connect_rt =
      connect(socket_fd, (struct sockaddr *)&server_addr, server_len);
  if (connect_rt < 0) {                 // 如果连接失败
    error(1, errno, "connect failed "); // 输出错误信息并退出程序
  }

  // 返回连接套接字描述符
  return socket_fd;
}
  • 阻塞 IO(bloking IO)
  • 非阻塞 IO(non-blocking IO)
  • 多路复用 IO(multiplexing IO)
  • 信号驱动式 IO(signal-driven IO)
  • 异步 IO(asynchronous IO)

对于套接字上的输入操作

  1. 常涉及等待数据到达网络。当数据包到达时,它被复制到内核中的缓冲区中。(下图中的 wait for data 阶段)
  2. 将此数据从内核缓冲区复制到我们的应用程序缓冲区。(下图中的 copy data from kernel to user 阶段)

https://image.linux88.com/2023/02/28/df282afa06c754eb82edde6a883f6987.png

  1. 应用程序通过调用系统调用(如 read 和 write)来读写数据,如果数据未准备好,则系统调用会一直阻塞,
  2. 直到数据准备就绪才返回。

上述使用多线程的服务器模型看起来完美地解决了为多个客户机提供问答服务的要求,但实际上情况并非如此简单。如果同时要响应成百上千个连接请求,无论是多线程还是多进程,都会严重消耗系统资源,降低系统的响应效率。此外,线程和进程本身也更容易陷入死锁状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define SERV_PORT 43211
#define MAXLINE 4096
#define LISTENQ 1024

void error_exit(const char *msg) {
  perror(msg);
  exit(EXIT_FAILURE);
}

void *handle_connection(void *arg) {
  int connfd = *(int *)arg;
  char buff[MAXLINE];
  int n;

  while ((n = recv(connfd, buff, MAXLINE, 0)) > 0) {
    buff[n] = '\0';
    printf("recv msg from client: %s", buff);
    if (send(connfd, buff, n, 0) < 0) {
      error_exit("send failed");
    }
  }

  if (n < 0) {
    error_exit("recv failed");
  }

  close(connfd);
  return NULL;
}

int main(int argc, char const *argv[]) {
  int listenfd, connfd, n;
  struct sockaddr_in server_addr;
  char buff[MAXLINE];

  if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
    error_exit("create socket error");
  }

  memset(&server_addr, 0, sizeof(server_addr));
  server_addr.sin_family = AF_INET;
  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
  server_addr.sin_port = htons(SERV_PORT);

  int opt = 1;
  if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
    error_exit("setsockopt error");
  }

  if (bind(listenfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) <
      0) {
    error_exit("bind failed");
  }

  if (listen(listenfd, LISTENQ) < 0) {
    error_exit("listen failed");
  }

  signal(SIGPIPE, SIG_IGN);

  printf("waiting for client's request, server listen port: %d\n", SERV_PORT);

  while (1) {
    struct sockaddr_in client_addr;
    socklen_t client_len = sizeof(client_addr);
    int connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);
    if (connfd < 0) {
      perror("accept failed");
      continue;
    }

    pthread_t tid;
    if (pthread_create(&tid, NULL, handle_connection, &connfd) != 0) {
      perror("pthread_create failed");
      close(connfd);
      continue;
    }

    if (pthread_detach(tid) != 0) {
      perror("pthread_detach failed");
      close(connfd);
      continue;
    }
  }

  return 0;
}

线程池和连接池都是一种池化技术,用于提高并发性能和资源利用率,但它们针对的资源类型不同,有以下区别:

  • 线程池(Thread Pool):维护一组预先创建的线程,当有任务需要执行时,从池中取出一个线程来处理任务,任务执行完毕后将线程归还给池。线程池主要用于控制并发量,防止创建过多的线程造成资源浪费和系统负载过高。
  • 连接池(Connection Pool):维护一组预先创建的网络连接或数据库连接,当需要访问网络或数据库时,从池中取出一个连接进行操作,操作完成后将连接归还给池。连接池主要用于减少每次操作时建立连接和断开连接的开销,提高访问效率和资源利用率。

Go 的sync.Pool属于线程池类型,它可以用于存储和复用那些在高并发情况下需要频繁分配和回收的临时对象,从而减少 GC 的压力和提高性能。在 Go 中,连接池一般使用第三方库进行实现,例如database/sql中的连接池。

池有一个限制,也就是资源数量的上限。如果外界的请求数量超过了池中资源的数量,那么使用池的系统可能会比没有池的系统响应更慢。因此,在使用池的时候需要考虑预计的请求规模,并相应地调整池的大小以满足需求。

  1. 应用程序通过调用系统调用(如 read 和 write)来读写数据,如果数据未准备好,则系统调用会立即返回一个错误码,应用程序可以继续执行其他任务,定期轮询检查数据是否准备好,
  2. 直到数据准备就绪才读写。

https://image.linux88.com/2023/02/28/129f556498c480cfe73cf45f89738cc5.png

非阻塞 IO 的流程:

  1. 当一个用户程序向内核发出读取数据的指令时,如果数据尚未准备好,内核不会让该程序进入等待状态,而是立即返回一个错误(EWOULDBLOCK)
    1. 因此,用户程序不必等待就能够快速地获取结果。
    2. 如果用户程序发现返回值是一个错误,它就知道数据还没有准备好,因此可以再次发送读取数据的指令。
  2. 当内核中的数据准备好之后,再次收到用户程序的读取指令,内核就会立即将数据复制到用户内存中,并返回一个正确的结果。

因此,在非阻塞 IO 中,用户程序需要不断地主动向内核询问数据是否准备好

在非阻塞 IO 中,当调用 recv()接口时,它会立即返回一个值,代表了不同的含义。例如,在本例中,

  • 如果 recv()返回一个大于 0 的值,表示数据已经接收完毕,返回值即为接收到的字节数;
  • 如果 recv()返回 0,表示连接已经正常关闭;
  • 如果 recv()返回-1,并且 errno 等于 EAGAIN,表示接收操作还没有完成;
  • 如果 recv()返回-1,并且 errno 不等于 EAGAIN,表示接收操作遇到了系统错误。

非阻塞 I/O 配合 I/O 多路复用,是高性能网络编程中的常见技术。

1
2
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );

这个函数调用的作用是将文件描述符 fd 所对应的文件设置为非阻塞模式。

在非阻塞模式下,当执行 I/O 操作时,不会阻塞进程的执行,而是立即返回。如果操作无法立即完成,会返回一个特殊的错误码,表示操作需要稍后重试。这个函数调用中的第二个参数 F_SETFL 表示设置文件的属性,第三个参数 O_NONBLOCK 表示要设置的属性值,即文件要被设置为非阻塞模式。

通常情况下,设置文件为非阻塞模式的原因是为了实现异步 I/O 操作。在异步 I/O 操作中,进程可以继续执行其他任务,而不需要等待 I/O 操作完成。当 I/O 操作完成时,内核会向进程发送信号或者调用回调函数,通知进程操作已经完成,进程可以继续处理数据。非阻塞模式也常用于网络编程中,可以使程序在等待网络数据时不会被阻塞。

  1. 在 I/O 多路复用模型中,应用程序可以同时监听多个文件描述符的 I/O 事件,可以将多个 I/O 操作集中到一个线程中处理。应用程序通过系统调用select、poll、epoll将自己阻塞,等待多个文件描述符中的一个或多个变为可读或可写。

  2. 当有一个或多个文件描述符变为可读或可写时,select、poll、epoll会返回,应用程序就可以通过 I/O 操作读取或写入数据了。这个过程是同步的,即应用程序会阻塞在 select 或 poll 系统调用上,等待文件描述符变为可读或可写。

https://image.linux88.com/2023/02/28/0de614b72616755f399cf65c33118767.png

多路复用 IO 是一种在单个线程内同时处理多个 socket 的 IO 请求的方法。当用户进程调用 select 函数时,进程会被阻塞,同时内核会监视所有由 select 函数负责的 socket。当任何一个 socket 中的数据准备好了,select 就会返回,这时用户进程再调用 read 操作,将数据从内核拷贝到用户进程。

相比于阻塞 IO,使用 select 的最大优势是可以在一个线程内同时处理多个 socket 的 IO 请求。用户可以注册多个 socket,然后不断调用 select 函数读取被激活的 socket,以此达到同时处理多个 IO 请求的目的。而在阻塞 IO 中,必须通过多线程的方式才能达到这个目的。

在多路复用模型中,每个 socket 一般都被设置成非阻塞的。虽然整个用户进程在调用 select 函数时被阻塞,但是它与非阻塞 IO 类似,不会被单个 socket 的 IO 请求所阻塞。因此,使用 select 的优势并不在于能够更快地处理单个连接,而是能够处理更多的连接。当然,如果连接数不高,使用 select/epoll 的 web server 并不一定比使用多线程+阻塞 IO 的 web server 性能更好,甚至可能延迟更大。

select 函数是一种比较常用的I/O多路复用机制,适用于Linux、Unix、Windows等操作系统。主要用于同时监控多个IO事件,当有数据需要读取或写入时,select函数才会返回。下面是select函数的特点:

  1. 支持监控多个文件描述符,可以同时监控读、写和异常事件。
  2. 能够设置超时时间,可以等待一定时间后自动返回。
  3. 适用于不同的操作系统,可移植性较高。
  4. 使用比较简单,方便于快速实现。
1
2
3
4
5
int select(int nfds, fd_set *restrict readfds,fd_set *restrict writefds, fd_set *restrict exceptfds, struct timeval *restrict timeout);
void FD_CLR(int fd, fd_set *set);
int  FD_ISSET(int fd, fd_set *set);
void FD_SET(int fd, fd_set *set);
void FD_ZERO(fd_set *set);

各个参数含义:

  1. nfds:指定被监听的文件描述符个数,即所有文件描述符中最大值加1。
  2. readfds:用于监听可读事件的文件描述符集合。
  3. writefds:用于监听可写事件的文件描述符集合。
  4. exceptfds:用于监听异常事件的文件描述符集合。
  5. timeout:指定select的超时时间,可以使 select 阻塞等待数据的到来,也可以使 select 超时返回。

在使用select函数时,需要使用fd_set结构体变量(类似于集合)来存储需要监控的文件描述符,通过FD_SET()和FD_CLR()这两个宏来在fd_set结构体中添加或删除对应的文件描述符。

在调用select函数时,需要将需要监控的文件描述符集合传递给select函数,select函数会阻塞等待文件描述符上发生的事件,直到有事件发生时,select函数将返回,并通过修改相应的文件描述符集合来通知应用程序。

  1. FD_ZERO(fd_set *set):将 fd_set 清空,所有元素都设置成 0。
  2. FD_SET(int fd, fd_set *set):将指定的文件描述符 fd 加入到 fd_set 中,a[fd] 设置成 1。
  3. FD_CLR(int fd, fd_set *set):将指定的文件描述符 fd 从 fd_set 中剔除,a[fd] 设置成 0。
  4. FD_ISSET(int fd, fd_set *set):判断指定的文件描述符 fd 是否在 fd_set 中,判断出对应套接字的元素 a[fd] 是 0 还是 1。

timeout 选项:

1
2
3
4
struct timeval {
    time_t      tv_sec;         /* seconds */
    suseconds_t tv_usec;        /* microseconds */
};
  • 第一个可能是设置成空 (NULL),表示如果没有 I/O 事件发生,则 select 一直等待下去。
  • 第二个可能是设置一个非零的值,这个表示等待固定的一段时间后从 select 阻塞调用中返回。
  • 第三个可能是将 tv_sec 和 tv_usec 都设置成 0,表示根本不等待,检测完毕立即返回。这种情况使用得比较少。

select 函数也有一些缺点,比如监控的文件描述符数目上限比较小,且每次调用都需要将文件描述符集合传递给select,会导致性能较低等。因此,epoll机制逐渐成为了一种更高效的I/O多路复用技术。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#define MAX_CLIENT_NUM 10

int main(int argc, char *argv[]) {
  int listenfd, connfd;           // 定义监听和客户端套接字
  struct sockaddr_in client_addr; // 定义客户端 socket 地址结构体
  socklen_t client_len = sizeof(client_addr); // 客户端地址结构体的长度
  fd_set curr_fds, read_fds;     // 定义select的文件描述符集合
  int client_fd[MAX_CLIENT_NUM]; // 客户端连接套接字数组
  int curr_client_num = 0;       // 当前连接的客户端数
  int max_fd, i, ret; // 最大文件描述符、循环计数器、读写操作返回值
  char buf[MAXLINE]; // 读写数据缓冲区

  // 创建 TCP 服务器监听套接字
  listenfd = tcp_server_listen(SERV_PORT);
  printf("server listen port: %d\n", SERV_PORT);

  // 初始化当前文件描述符集合,并将监听套接字加入
  FD_ZERO(&curr_fds);
  FD_SET(listenfd, &curr_fds);
  max_fd = listenfd;

  // 创建一个死循环进行 select 监听
  while (1) {
    // 将当前文件描述符集合 curr_fds 拷贝到 read_fds 中,是为了防止在 select
    // 监听期间,当前文件描述符集合 curr_fds 被修改,造成混乱。read_fds
    // 只是一个备份而已,不会影响主要的文件描述符集合。
    read_fds = curr_fds;

    /*
      为什么需要 max_fd + 1?
      select函数需要一个整数类型的参数,表示监视的最大文件描述符,
      这个参数的值应该是所有已注册的文件描述符中最大的一个加一,也就是
      max_fd + 1。因为文件描述符在 Linux 系统中是一个非负整数,而 select
      函数又需要一个范围内可用的文件描述符的集合,因此需要计算出最大文件描述符值
      max_fd + 1,以便设置文件描述符集合的大小。
    */
    // 监听所有已注册的文件描述符
    if (select(max_fd + 1, &read_fds, NULL, NULL, NULL) < 0) {
      error(1, errno, "select failed ");
    }

    // 遍历所有已注册的文件描述符,处理相应事件
    for (i = 0; i <= max_fd; ++i) {
      if (FD_ISSET(i, &read_fds)) {
        // 如果是监听套接字就绪,则有客户端连接请求
        if (i == listenfd) {
          connfd =
              accept(listenfd, (struct sockaddr *)&client_addr, &client_len);
          if (connfd < 0) {
            error(1, errno, "accept failed ");
          }
          // 将连接套接字加入到当前文件描述符集合中
          if (curr_client_num < MAX_CLIENT_NUM) {
            FD_SET(connfd, &curr_fds);
            client_fd[curr_client_num++] = connfd;
            if (connfd > max_fd) {
              max_fd = connfd;
            }
            printf("New client connected: %s:%d (fd %d)\n",
                   inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port),
                   connfd);
          } else {
            printf("Max client number reached, rejected new connection.\n");
            close(connfd);
          }
        }
        // 否则,有已连接套接字的数据到了,进行读写操作
        else {
          ret = read(i, buf, sizeof(buf));
          if (ret <= 0) {
            // 从客户端断开连接,将其从当前文件描述符集合中删除
            printf("Client disconnected: %d\n", i);
            FD_CLR(i, &curr_fds);
            close(i);
            for (int j = 0; j < curr_client_num; ++j) {
              if (client_fd[j] == i) {
                for (int k = j + 1; k < curr_client_num; ++k) {
                  client_fd[k - 1] = client_fd[k];
                }
                curr_client_num--;
                break;
              }
            }
          } else {
            printf("Received message from client %d: %s", i, buf);
            if (write(i, buf, strlen(buf)) < 0) {
              printf("Failed to send message to client %d\n", i);
            }
          }
        }
      }
    }
  }

  close(listenfd);
  return 0;
}

在 Linux 中,poll 函数用于等待多个文件描述符(socket、管道、设备等)中的一个或多个变为可读、可写、出错等状态,从而使得程序可以同时监听多个文件描述符而不需要使用阻塞 I/O。poll 函数是 I/O 多路复用机制中的一种。

1
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  • fds:一个指针,指向包含了多个文件描述符的数组。对于每一个要监听的文件描述符,都需要创建一个 struct pollfd(fd, events, revents) 结构体,并将其放入这个数组中;
    • fd:要监听的文件描述符;
    • events:要监听的事件:
      • 可读:
        • POLLIN:There is data to read.
        • POLLPRI: There is urgent data to read.
        • POLLRDNORM:Normal data may be read.
        • POLLRDBAND:Priority data may be read.
      • 可写:
        • POLLOU:Writing now will not block.
        • POLLWRNORM:Writing now will not block.
        • POLLWRBAND:Priority data may be written.
    • revents:返回时指定的事件:
      • POLLERR:一些错误发送。
      • POLLHUP:描述符挂起。
      • POLLNVAL:请求的事件无效。
  • nfds:需要等待的文件描述符数目,即数组的长度;
  • timeout:等待时的超时时间,单位为毫秒。
    • 如果为 -1,表示一直等待直到事件发生或者出现错误;
    • 如果为 0,表示 poll() 函数立即返回,不等待任何事件;
    • 如果大于 0,表示等待指定的毫秒数,超时后 poll() 函数将返回 0。

poll 是一种非阻塞 I/O 模型,用于等待多个文件描述符上的事件。通过调用 poll 函数,可以指定一组文件描述符及其关注的事件,然后等待这些事件中的任何一个发生。不同于 select 函数,poll 函数允许调用者在一个数组中指定文件描述符,因此无需使用额外的数据结构来维护描述符集合。

poll 函数的返回值取决于发生的事件个数

  • 如果没有发生任何事件,返回值为 0。
  • 如果有事件发生,返回值为到达事件的文件描述符的数量。
  • 如果有错误发生,则返回值为 -1。

我们还可以使用 pollfd 结构体的 fd 成员将某个文件描述符从事件检测中删除,只需设置 fd 为负值即可;若 poll 函数的返回值对应的 pollfd 结构体的 revents 成员的值为 0,则表示该文件描述符未发生任何事件。

select 函数相比,poll 函数可以控制 pollfd 数组的大小,这意味着可以突破 select 函数最大描述符的限制。但是,使用 poll 函数需要分配 pollfd 数组并通知 poll 函数该数组的大小。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#define INIT_SIZE 128

int main(int argc, char **argv) {
  int listenfd, connfd;           // 定义监听和客户端套接字
  struct sockaddr_in client_addr; // 定义客户端 socket 地址结构体
  socklen_t client_len = sizeof(client_addr); // 客户端地址结构体的长度
  int ready_number;
  ssize_t n;
  char buf[MAXLINE];

  listenfd = tcp_server_listen(SERV_PORT);

  // 初始化pollfd数组,这个数组的第一个元素是listenfd,其余的用来记录将要连接的connfd
  struct pollfd event_set[INIT_SIZE];
  event_set[0].fd = listenfd;
  event_set[0].events =
      POLLRDNORM; // 期望系统内核检测监听套接字上的连接建立完成事件

  printf("server listen port: %d\n", SERV_PORT);
  // 将某个文件描述符从事件检测中删除,只需设置 fd 为负值即可
  // 用-1表示这个数组位置还没有被占用,
  int i;
  for (i = 1; i < INIT_SIZE; i++) {
    event_set[i].fd = -1;
  }

  for (;;) {
    if ((ready_number = poll(event_set, INIT_SIZE, -1)) < 0) {
      error(1, errno, "poll failed ");
    }

    if (event_set[0].revents & POLLRDNORM) {

      connfd = accept(listenfd, (struct sockaddr *)&client_addr, &client_len);

      // 找到一个可以记录该连接套接字的位置
      for (i = 1; i < INIT_SIZE; i++) {
        if (event_set[i].fd < 0) {
          event_set[i].fd = connfd;
          event_set[i].events = POLLRDNORM;
          break;
        }
      }

      if (i == INIT_SIZE) {
        error(1, errno, "can not hold so many clients");
      }

      if (--ready_number <= 0)
        continue;
    }

    for (i = 1; i < INIT_SIZE; i++) {
      int socket_fd;
      if ((socket_fd = event_set[i].fd) < 0)
        continue;
      if (event_set[i].revents & (POLLRDNORM | POLLERR)) {
        if ((n = read(socket_fd, buf, MAXLINE)) > 0) {
          if (write(socket_fd, buf, n) < 0) {
            error(1, errno, "write error");
          }
        } else if (n == 0 || errno == ECONNRESET) {
          close(socket_fd);
          event_set[i].fd = -1;
        } else {
          error(1, errno, "read error");
        }

        if (--ready_number <= 0)
          break;
      }
    }
  }
}

The Linux Programming Interface 这本书中,对比了 select,poll,epoll 的 CPU 耗时。

1
2
3
4
5
# operations  |  poll  |  select   | epoll
10            |   0.61 |    0.73   | 0.41
100           |   2.9  |    3.0    | 0.42
1000          |   35   |    35     | 0.53
10000         |   990  |    930    | 0.66

使用 epoll 的基本流程如下:

  1. 创建一个 epoll 实例:

    1
    
    int epoll_create(int size);
    

    epoll_create() 会返回一个 epoll 实例的句柄,size 参数指定将被监听的 FD 上限,实际大小可以通过 /proc/sys/fs/epoll/max_user_watches 来查看和修改。

  2. 将要监听的文件描述符注册到 epoll 实例中:

    1
    
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    
    • epfd 是 epoll 实例的句柄。
    • op 可以是 EPOLL_CTL_ADD(添加), EPOLL_CTL_MOD(修改) 或 EPOLL_CTL_DEL(删除)。
    • fd 是需要监听的文件描述符。
    • event 指向 epoll_event 结构体,它包含了需要监听的文件描述符以及事件的类型。
  3. 等待文件描述符(socket)的就绪事件:

    1
    
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
    
    • epfd 是 epoll 实例的句柄。
    • events 是事件结构体的地址,内核在告知该 epoll 实例中有事件到达时将会把就绪事件通知到 events 中。
    • maxevents 是当事件就绪时,内核同时通知的最大事件数。当没有事件到达时,epoll_wait() 函数会一直阻塞等待,直到有事件到达,或者超时时间 timeout 到达。
    • timeout 为 0 时表示立即返回,为 -1 时表示无限等待,其他值表示等待的毫秒数。
  4. 处理就绪事件:

    通过遍历返回的 events,可以检查每个文件描述符(socket)上的事件,并确定应该采取的操作(如读取数据等)。

  5. 取消对某个文件描述符的监控:

    1
    
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    

    epfd 是 epoll 实例的句柄,op 设置为 EPOLL_CTL_DELfd 表示要取消监控的文件描述符。

  6. 关闭 epoll 实例:

    1
    
    int close(int fd);
    

    fd 是 epoll 实例的句柄,通过该函数将 epoll 实例销毁。

下面是 epoll_event 的结构体:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
typedef union epoll_data
{
  void *ptr;
  int fd;
  uint32_t u32;
  uint64_t u64;
} epoll_data_t;

struct epoll_event
{
  uint32_t events;	/* Epoll events */
  epoll_data_t data;	/* User data variable */
} __EPOLL_PACKED;
  • EPOLLIN :文件描述符可读
  • EPOLLPRI :高优先级数据可读
  • EPOLLOUT :文件描述符可写
  • EPOLLRDNORM :文件描述符有数据可读
  • EPOLLRDBAND :优先级带数据可读
  • EPOLLWRNORM :文件描述符可写
  • EPOLLWRBAND :优先级带数据可写
  • EPOLLMSG :消息可读
  • EPOLLERR :关注的文件描述符出错
  • EPOLLHUP :关注的文件描述符被挂起
  • EPOLLRDHUP :与远程端口的连接被挂起或关闭
  • EPOLLEXCLUSIVE :保证同一时刻只有一个进程或线程能访问这个事件的文件描述符
  • EPOLLWAKEUP :设置了该标志,epoll_wait() 在没有事件到达时不会阻塞
  • EPOLLONESHOT :事件一次性,触发后必须重新加入事件队列才能继续监控该文件描述符的事件
  • EPOLLET :边缘触发方式,只有在文件描述符状态改变时才触发事件。在数据流到达时会触发一次事件,并且不再触发,除非有新数据到达。

edge-triggeredlevel-triggered 都是 epoll 可监听的事件类型。

  • level-triggered:当文件描述符可读或可写时,每次 epoll_wait() 函数返回时都会报告文件描述符就绪状态,直到该文件描述符上的事件被处理完毕之后,epoll_wait() 函数再次返回,如果这个文件描述符又有事件发生,会再次通知到程序。在这种模式下,文件描述符只有在状态发生变化时(比如有数据到达)才会通知。程序需要不断读取数据,直到 read 函数返回失败(读到 EOF 或收到错误)。
  • edge-triggered:只在文件描述符状态变化时通知一次,直到下一次发生状态变化前都不会再通知。这种模式下,当文件描述符状态发生变化时,会通知程序一次,并不会继续通知,直到程序读取了大于等于文件描述符上的字节数后,才会再次通知。基于这种模式,程序需要在就绪事件被通知后,立即读取文件描述符中的数据,否则可能错过接下来的事件。

通常情况下,level-triggered 模式比较容易理解和使用,可以持续监听某个 socket 的可读可写事件,并实时读取数据,具有更好的稳定性和可读性;而 edge-triggered 模式需要格外小心,因为被忽略的事件可能会导致代码不稳定。

在使用 edge-triggered 模式时,需要注意以下几点:

  1. 需要立即处理并读取完整个文件描述符上的数据,以避免漏掉数据。
  2. 需要考虑所有异常情况,比如 EAGAINEWOULDBLOCK 等。
  3. 需要小心地管理非阻塞读写过程中的状态,以确保在 epoll_wait() 重置状态之前不会调用多次读取或写入操作。

一般我们认为,边缘触发的效率比条件触发的效率要高,这一点也是 epoll 的杀手锏之一。

  1. 在信号驱动式 I/O 模型中,应用程序通过系统调用sigaction设置信号处理函数,并通过系统调用fcntl设置文件描述符的属性,使其在 I/O 操作完成时向应用程序发送一个 SIGIO 信号。
  2. 应用程序通过信号处理函数处理该信号,从而进行 I/O 操作。这个过程是异步的,即应用程序不需要阻塞在 I/O 操作上,而是可以继续执行其他操作,当 I/O 操作完成后,内核会向应用程序发送 SIGIO 信号,应用程序通过信号处理函数处理该信号,完成 I/O 操作。

https://image.linux88.com/2023/02/28/a702fd212fe4d9353e5d296bfe9945c9.png

  1. 在异步 I/O 模型中,应用程序通过系统调用aio_readaio_write等异步 I/O 函数发起 I/O 操作,然后可以继续执行其他操作。
  2. 当 I/O 操作完成后,内核会向应用程序发送一个信号或回调应用程序注册的函数,应用程序通过信号处理函数或回调函数处理 I/O 操作的完成。这个过程是异步的,即应用程序不需要阻塞在 I/O 操作上,而是可以继执行,而不必等待 I/O 操作的完成,因为 I/O 操作是由操作系统内核完成的。

https://image.linux88.com/2023/02/28/321d0ab274f7cff3c4ff00d18f050b89.png

https://image.linux88.com/2023/02/28/24cdca5c63dd09fc46b04ae90cb16ae0.png

根据上图可知前 4 种 I/O 模型的第一阶段是不一样的,第二阶段是相同的都是 blocked。

而当 I/O 操作完成时,操作系统内核会通知应用程序,让应用程序知道 I/O 已经完成,并将 I/O 结果传递给应用程序。

POSIX 对同步 I/O 和异步 I/O 的定义

  • 同步 I/O 操作导致请求进程被阻塞,直到该 I/O 操作完成。
  • 异步 I/O 操作不会导致请求进程被阻塞。

根据上面的定义前 4 种 I/O 模型为同步 I/O 模型,只有异步 I/O 与 POSIX 定义的异步 I/O 相匹配。


极客时间<网络编程实战>代码