深入浅出 I/O 多路复用与 Proactor/Reactor:从 C10K 到 io_uring 的演进之路

深入浅出 I/O 多路复用与 Proactor/Reactor

一篇讲透 I/O 多路复用、Reactor/Proactor 模式、epoll 和 io_uring 的完整指南


目录

  1. 为什么需要多路复用?
  2. I/O 多路复用技术详解
  3. Reactor 与 Proactor 模式
  4. 发展历程:从 select 到 io_uring
  5. epoll 完全指南
  6. io_uring:Linux 的新一代异步 I/O
  7. 高性能网络库实践
  8. 性能对比与选型建议
  9. 常见问题解答

一、为什么需要多路复用?

1.1 C10K 问题

2000 年前后,随着互联网爆发,服务器需要同时处理上万个客户端连接。传统的”一连接一线程/进程”模式遭遇严重瓶颈:

  • 内存消耗:每个线程栈 ~8MB,10,000 个线程需要 80GB 内存
  • CPU 开销:频繁的上下文切换,CPU 大量时间花在调度而非业务处理
  • 系统限制:OS 对进程/线程数量、文件描述符数量均有上限

这就是著名的 C10K 问题(同时处理 10,000 个连接)。

1.2 I/O 模型对比(Unix/Linux)

I/O 模型 阻塞阶段 系统调用 特点 并发能力
阻塞 I/O 整个操作 read/write 简单直观,一连接一线程
非阻塞 I/O 无阻塞,需轮询 read/write (O_NONBLOCK) CPU 空转浪费 一般
I/O 多路复用 select/epoll 等待 select/poll/epoll 单线程监控多连接
信号驱动 I/O 数据准备阶段 sigaction + read 回调方式,实际使用少 一般
异步 I/O (AIO) 无阻塞 aio_read/aio_write (io_uring) 彻底异步,内核完成所有操作 最强

核心结论:I/O 多路复用是解决 C10K 问题的关键技术,而 io_uring 代表了未来的方向。


二、I/O 多路复用技术详解

2.1 什么是多路复用?

定义:通过一种机制,让单个进程/线程能够同时监视多个文件描述符(网络连接、文件、管道等),一旦某个描述符就绪(可读/可写/异常),内核主动通知应用程序进行处理。

通俗理解:就像餐厅里的服务员(一个线程)同时服务多张桌子(多个连接)——不需要为每张桌子配专属服务员,哪个桌子喊服务员就去处理哪个。

2.2 select——最早的多路复用

1
2
3
4
5
6
7
8
9
10
11
12
// select 的核心问题:需要遍历所有 fd
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(socket_fd, &readfds);
int ret = select(max_fd + 1, &readfds, nullptr, nullptr, nullptr);

// O(n) 扫描,连接越多越慢
for (int i = 0; i <= max_fd; i++) {
if (FD_ISSET(i, &readfds)) {
handle(i);
}
}

select 的缺陷:

  • ❌ 文件描述符数量上限(FD_SETSIZE,通常 1024)
  • ❌ 每次调用都需将 fd 集合从用户态拷贝到内核态
  • ❌ O(n) 线性扫描,性能随连接数增长线性下降
  • ❌ fd 集合会被内核修改,每次需重新初始化

2.3 poll——改进但仍不完美

1
2
3
4
5
6
7
8
9
10
11
// poll 使用数组存储,去除了 1024 上限
struct pollfd fds[MAX_CONN];
fds[0].fd = listen_fd;
fds[0].events = POLLIN;

int ret = poll(fds, nfds, timeout);
for (int i = 0; i < nfds; i++) {
if (fds[i].revents & POLLIN) {
handle(fds[i].fd);
}
}

poll 的改进与不足:

  • ✅ 去除了文件描述符数量上限
  • ✅ 使用 events/revents 分离,无需每次重新初始化
  • ❌ 仍需 O(n) 扫描
  • ❌ 每次调用都需拷贝整个数组到内核

2.4 epoll——Linux 的革命性突破

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// epoll 的核心:事件驱动,只返回就绪的 fd
int epfd = epoll_create(1); // 创建 epoll 实例

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev); // 注册感兴趣的事件

struct epoll_event events[MAX_EVENTS];
while (1) {
int n = epoll_wait(epfd, events, MAX_EVENTS, -1); // 等待事件
for (int i = 0; i < n; i++) { // 只处理 n 个就绪事件
handle(events[i].data.fd, events[i].events);
}
}

epoll 三大优势:

  1. 事件驱动:内核只返回就绪事件,无需扫描所有连接(O(1) 复杂度)
  2. 内存效率:内核与用户空间通过 mmap 共享内存,减少数据拷贝
  3. 海量连接:支持数万甚至数十万连接(理论上受系统内存限制)

两种触发模式:

模式 说明 特点 使用建议
水平触发 (LT) 只要状态满足就持续通知 默认模式,不易出错 适合初学者,兼容性好
边缘触发 (ET) 只在状态变化时通知一次 效率更高,必须配合非阻塞 I/O 适合高性能场景,需读完所有数据

边缘触发示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ET 模式必须循环读取直到 EAGAIN
void handle_read_et(int fd) {
char buf[4096];
while (1) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n > 0) {
process(buf, n);
} else if (n == -1) {
if (errno == EAGAIN) break; // 数据读完了
else { perror("read error"); break; }
} else {
close(fd); break; // 连接关闭
}
}
}

2.5 各操作系统实现对比

操作系统 多路复用实现 说明
Linux epoll 最成熟、性能最好
FreeBSD / macOS kqueue 功能类似 epoll,支持更多事件类型
Solaris /dev/poll 类似 epoll
Windows IOCP 真正的异步 I/O(不是多路复用)

三、Reactor 与 Proactor 模式

3.1 Reactor 模式(基于 I/O 多路复用)

Reactor 模式是使用 I/O 多路复用的经典设计模式,广泛应用于高性能网络服务器。

核心思想:

  • 单线程/多线程监听事件
  • 事件发生时回调相应的处理器
  • 应用程序负责实际的 I/O 操作

Reactor 的三种变体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 1. 单 Reactor 单线程(Redis 6.0 之前)
while (1) {
epoll_wait(...);
for (每个就绪事件) {
if (是 accept 事件) accept();
else if (是 read 事件) read() -> process() -> write();
}
}

// 2. 单 Reactor 多线程(Memcached)
while (1) {
epoll_wait(...);
for (每个就绪事件) {
if (是 read 事件) {
read(); // Reactor 线程读取
thread_pool.submit(process); // 业务逻辑交给线程池
}
}
}

// 3. 多 Reactor 多线程(主从模式,Nginx、Netty)
// main_reactor: 专门处理 accept
// sub_reactors: 处理 read/write(每个绑定一个 CPU 核心)

主从 Reactor 架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
     ┌─────────────────────┐
│ Main Reactor │
│ (监听 accept) │
└──────────┬──────────┘
│ accept

┌──────────────────────────┐
│ 负载均衡分发 │
└──────┬──────┬──────┬─────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
SubReactor│ SubReactor│ SubReactor│
│ Core 0 │ │ Core 1 │ │ Core N │
└──────────┘ └──────────┘ └──────────┘

3.2 Proactor 模式(基于异步 I/O)

Proactor 模式完全由内核完成 I/O 操作,应用程序只负责处理结果。

核心思想:

  • 发起异步 I/O 操作(如 io_uring)
  • 内核完成 I/O 后将数据放入用户缓冲区
  • 内核通知应用程序处理结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Proactor 模式伪代码
struct io_uring ring;
io_uring_queue_init(256, &ring, 0);

// 提交异步读请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
io_uring_prep_read(sqe, fd, buffer, size, offset);
io_uring_sqe_set_data(sqe, my_callback_data);
io_uring_submit(&ring);

// 等待完成事件
struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);
// 数据已经在 buffer 中,无需应用程序再调用 read
callback(cqe->user_data, cqe->res);
io_uring_cqe_seen(&ring, cqe);

3.3 Reactor vs Proactor 对比

对比维度 Reactor (epoll) Proactor (io_uring)
I/O 操作执行者 应用程序 内核
回调时机 I/O 就绪 I/O 完成
编程复杂度 较高(状态机) 较低(同步风格)
内存拷贝次数 多一次(内核→用户) 直接到用户缓冲区
内核支持 epoll (2002) io_uring (2019)
典型应用 Nginx, Redis, Memcached 新一代存储/网络服务

四、发展历程:从 select 到 io_uring

4.1 完整时间线

1
2
3
4
5
6
7
8
9
10
11
12
13
1983  ── select 诞生(BSD Unix)
1990s ── poll 诞生(System V)
2000 ── kqueue 诞生(FreeBSD)
2001 ── /dev/poll 诞生(Solaris)
2002 ── 🔥 epoll 合并到 Linux 2.6 内核
2002 ── Nginx 诞生(基于 epoll 的 Reactor 模式)
2003 ── libevent 诞生(封装 epoll/kqueue/select)
2007 ── libuv 诞生(Node.js 事件循环核心)
2014 ── seastar 框架诞生(基于 dpdk + 共享无锁)
2019 ── 🔥 io_uring 合并到 Linux 5.1 内核
2020 ── liburing 发布(io_uring 的易用封装)
2022 ── Redis 7.0 引入 io_uring 支持
2024 ── io_uring 成为高性能网络编程新标准

4.2 技术演进脉络

阶段 技术栈 并发能力 编程复杂度 代表产品
1990s select/poll + 多进程 低 (~10³) 简单 Apache HTTP Server
2002 epoll + Reactor 高 (10⁴~10⁵) 中等 Nginx
2010 epoll + 多线程 Reactor 很高 (10⁵) 复杂 Memcached, Redis
2019 io_uring + Proactor 极高 (10⁶) 较低 新一代存储/网络服务

4.3 一个有趣的循环

1
2
3
4
5
6
7
8
9
同步阻塞 I/O(简单,性能差)

为了性能,引入非阻塞 I/O(复杂)

Reactor 模式(基于多路复用)

Proactor 模式(基于异步 I/O

回到同步风格代码(简单,性能强)

本质变化:io_uring 让开发者可以用同步风格编写高性能代码,内核帮你做完了所有事情。


五、epoll 完全指南

5.1 epoll API 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include <sys/epoll.h>

// 1. 创建 epoll 实例
int epoll_create(int size); // 旧版,size 已忽略但需 >0
int epoll_create1(int flags); // 新版,flags = EPOLL_CLOEXEC

// 2. 管理感兴趣的事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// op:
// EPOLL_CTL_ADD - 添加
// EPOLL_CTL_MOD - 修改
// EPOLL_CTL_DEL - 删除

// 3. 等待事件
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
// timeout: -1 永久阻塞 | 0 立即返回 | >0 超时毫秒

5.2 epoll_event 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
struct epoll_event {
uint32_t events; // 事件掩码
epoll_data_t data; // 用户数据
};

typedef union epoll_data {
void *ptr; // 用户自定义指针
int fd; // 文件描述符
uint32_t u32;
uint64_t u64;
} epoll_data_t;

// 常用事件掩码
// EPOLLIN - 可读(包括对端关闭连接)
// EPOLLOUT - 可写
// EPOLLRDHUP - 对端关闭连接(需内核 2.6.17+)
// EPOLLET - 边缘触发
// EPOLLONESHOT - 只触发一次,需重新注册
// EPOLLERR - 错误
// EPOLLHUP - 挂起

5.3 完整服务器示例(Reactor 模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <iostream>
#include <unordered_map>
#include <vector>

class EpollServer {
public:
EpollServer(int port) : port_(port) {}

void start() {
// 1. 创建监听 socket
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);
set_nonblocking(listen_fd_);

// 2. 设置地址重用
int opt = 1;
setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

// 3. 绑定
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port_);
bind(listen_fd_, (struct sockaddr*)&addr, sizeof(addr));

// 4. 监听
listen(listen_fd_, 1024);

// 5. 创建 epoll
epoll_fd_ = epoll_create1(0);

// 6. 注册监听 socket
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd_;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &ev);

std::cout << "Server started on port " << port_ << std::endl;

// 7. 事件循环
std::vector<struct epoll_event> events(1024);
while (running_) {
int n = epoll_wait(epoll_fd_, events.data(), events.size(), -1);
for (int i = 0; i < n; i++) {
int fd = events[i].data.fd;
if (fd == listen_fd_) handle_accept();
else if (events[i].events & EPOLLIN) handle_read(fd);
else if (events[i].events & EPOLLOUT) handle_write(fd);
else if (events[i].events & (EPOLLERR | EPOLLHUP)) handle_error(fd);
}
}
}

private:
void set_nonblocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

void handle_accept() {
while (true) {
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
int client_fd = accept(listen_fd_, (struct sockaddr*)&client_addr, &addr_len);
if (client_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("accept"); break;
}
set_nonblocking(client_fd);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 边缘触发
ev.data.fd = client_fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &ev);
std::cout << "New connection: " << client_fd << std::endl;
}
}

void handle_read(int fd) {
char buf[4096];
std::string data;
while (true) {
ssize_t n = read(fd, buf, sizeof(buf));
if (n > 0) data.append(buf, n);
else if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
handle_error(fd); return;
} else { handle_close(fd); return; }
}
std::string response = "Echo: " + data;
write_buffers_[fd] = response;
// 改为关注写事件
struct epoll_event ev;
ev.events = EPOLLOUT | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
}

void handle_write(int fd) {
auto it = write_buffers_.find(fd);
if (it == write_buffers_.end()) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
return;
}
const std::string& data = it->second;
size_t sent = 0;
while (sent < data.size()) {
ssize_t n = write(fd, data.c_str() + sent, data.size() - sent);
if (n > 0) sent += n;
else if (n == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
handle_error(fd); return;
}
}
if (sent >= data.size()) {
write_buffers_.erase(it);
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd, &ev);
}
}

void handle_error(int fd) { std::cerr << "Error on fd " << fd << std::endl; handle_close(fd); }
void handle_close(int fd) {
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
write_buffers_.erase(fd);
close(fd);
}

int port_, listen_fd_, epoll_fd_;
bool running_ = true;
std::unordered_map<int, std::string> write_buffers_;
};

int main() {
EpollServer server(8080);
server.start();
return 0;
}

5.4 多线程 Reactor(主从模式)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class SubReactor {
public:
SubReactor(int id) : id_(id) { epoll_fd_ = epoll_create1(0); }

void add_fd(int fd) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev);
}

void run() {
std::vector<struct epoll_event> events(1024);
while (running_) {
int n = epoll_wait(epoll_fd_, events.data(), events.size(), -1);
for (int i = 0; i < n; i++)
handle_event(events[i].data.fd, events[i].events);
}
}

private:
int id_, epoll_fd_;
std::atomic<bool> running_{true};
};

class MainReactor {
public:
MainReactor(int port, int sub_count) : port_(port), sub_count_(sub_count) {}

void start() {
listen_fd_ = create_listen_socket(port_);

// 启动子 Reactor
for (int i = 0; i < sub_count_; i++) {
sub_reactors_.emplace_back(std::make_unique<SubReactor>(i));
std::thread(&SubReactor::run, sub_reactors_.back().get()).detach();
}

// 主 Reactor 只负责 accept + 轮询分发
int epfd = epoll_create1(0);
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_fd_;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd_, &ev);

int next = 0;
std::vector<struct epoll_event> events(1024);
while (true) {
int n = epoll_wait(epfd, events.data(), events.size(), -1);
for (int i = 0; i < n; i++) {
if (events[i].data.fd == listen_fd_) {
int cfd = accept(listen_fd_, nullptr, nullptr);
if (cfd > 0) {
sub_reactors_[next]->add_fd(cfd);
next = (next + 1) % sub_count_;
}
}
}
}
}

private:
int port_, sub_count_, listen_fd_;
std::vector<std::unique_ptr<SubReactor>> sub_reactors_;
};

六、io_uring:Linux 的新一代异步 I/O

6.1 为什么需要 io_uring?

传统 AIO 的缺陷:

  • ❌ 仅支持 O_DIRECT 文件 I/O,不支持网络 socket
  • ❌ 需要单独的信号/回调处理
  • ❌ 性能不佳,用户态与内核态交互频繁

io_uring 的设计目标:

  • ✅ 网络和文件 I/O 的统一接口
  • ✅ 尽可能减少系统调用
  • ✅ 真正的异步,无需用户态轮询
  • ✅ 支持批量提交

6.2 io_uring 核心架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────┐
│ 用户态 │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ SQ (提交队列) │ │ CQ (完成队列) │ │
│ │ ring buffer │ │ ring buffer │ │
│ └──────┬───────┘ └──────▲───────┘ │
│ │ io_uring_enter() │ │
└─────────┼───────────────────────┼───────────────┘
│ │
▼ │
┌─────────────────────────────────────────────────┐
│ 内核态 │
│ ┌─────────────────────────────────────────┐ │
│ │ 异步处理 I/O 请求,完成后写入 CQ │ │
│ └─────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘

关键特性:

  • SQPOLL 模式:内核线程轮询提交队列,无需 io_uring_enter 系统调用
  • IORING_SETUP_IOPOLL:对于 SSD/NVMe,内核主动轮询设备完成
  • Fixed Files/Buffers:预注册文件描述符和缓冲区,减少重复映射

6.3 liburing 服务器完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <liburing.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <iostream>
#include <vector>

// 编译: g++ -o server server.cpp -luring

class IoUringServer {
public:
IoUringServer(int port, int queue_depth = 256)
: port_(port), queue_depth_(queue_depth) {}

void start() {
// 1. 初始化 io_uring
struct io_uring_params params;
memset(&params, 0, sizeof(params));
// 可选:启用 SQPOLL 模式(零系统调用)
// params.flags = IORING_SETUP_SQPOLL;
// params.sq_thread_idle = 2000;

if (io_uring_queue_init_params(queue_depth_, &ring_, &params) < 0) {
perror("io_uring_queue_init_params"); return;
}

// 2. 创建监听 socket
listen_fd_ = create_listen_socket();

// 3. 投递 accept 请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
io_uring_prep_accept(sqe, listen_fd_, nullptr, nullptr, 0);
io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(OP_ACCEPT));
io_uring_submit(&ring_);

std::cout << "io_uring server started on port " << port_ << std::endl;

// 4. 事件循环
struct io_uring_cqe *cqe;
while (running_) {
if (io_uring_wait_cqe(&ring_, &cqe) < 0) { perror("wait_cqe"); continue; }

int op = reinterpret_cast<intptr_t>(io_uring_cqe_get_data(cqe));
int res = cqe->res;

if (res < 0) {
std::cerr << "Operation failed: " << -res << std::endl;
io_uring_cqe_seen(&ring_, cqe); continue;
}

if (op == OP_ACCEPT) handle_accept(res);
else if (op == OP_READ) handle_read(cqe);
else if (op == OP_WRITE) handle_write(cqe);

io_uring_cqe_seen(&ring_, cqe);
}
}

private:
struct Connection {
int fd;
std::vector<char> buffer;
std::string write_buffer;
};

enum { OP_ACCEPT = 1, OP_READ = 2, OP_WRITE = 3 };

int create_listen_socket() {
int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(port_);
bind(fd, (struct sockaddr*)&addr, sizeof(addr));
listen(fd, 1024);
return fd;
}

void handle_accept(int client_fd) {
auto* conn = new Connection();
conn->fd = client_fd;
conn->buffer.resize(4096);

// 投递读请求
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
io_uring_prep_read(sqe, client_fd, conn->buffer.data(), conn->buffer.size(), 0);
io_uring_sqe_set_data(sqe, encode(OP_READ, conn));
io_uring_submit(&ring_);

// 继续接受新连接
sqe = io_uring_get_sqe(&ring_);
io_uring_prep_accept(sqe, listen_fd_, nullptr, nullptr, 0);
io_uring_sqe_set_data(sqe, reinterpret_cast<void*>(OP_ACCEPT));
io_uring_submit(&ring_);
}

void handle_read(struct io_uring_cqe* cqe) {
auto* conn = decode_conn(cqe);
ssize_t bytes = cqe->res;
if (bytes <= 0) { close(conn->fd); delete conn; return; }

conn->write_buffer = "Echo: " + std::string(conn->buffer.data(), bytes);

struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
io_uring_prep_write(sqe, conn->fd, conn->write_buffer.c_str(),
conn->write_buffer.size(), 0);
io_uring_sqe_set_data(sqe, encode(OP_WRITE, conn));
io_uring_submit(&ring_);
}

void handle_write(struct io_uring_cqe* cqe) {
auto* conn = decode_conn(cqe);
if (cqe->res < 0) { close(conn->fd); delete conn; return; }

// 写完,继续读
conn->buffer.assign(4096, 0);
struct io_uring_sqe *sqe = io_uring_get_sqe(&ring_);
io_uring_prep_read(sqe, conn->fd, conn->buffer.data(), conn->buffer.size(), 0);
io_uring_sqe_set_data(sqe, encode(OP_READ, conn));
io_uring_submit(&ring_);
}

// 将 op + 指针打包进 user_data
void* encode(int op, Connection* conn) {
return reinterpret_cast<void*>(
(static_cast<intptr_t>(op) << 32) | reinterpret_cast<intptr_t>(conn));
}
Connection* decode_conn(struct io_uring_cqe* cqe) {
return reinterpret_cast<Connection*>(
reinterpret_cast<intptr_t>(io_uring_cqe_get_data(cqe)) & 0xFFFFFFFF);
}

int port_, queue_depth_, listen_fd_;
struct io_uring ring_;
bool running_ = true;
};

6.4 性能优化技巧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 1. 固定缓冲区(Fixed Buffers)——减少内存映射开销
struct iovec iovs[256];
io_uring_register_buffers(&ring, iovs, 256);
io_uring_prep_read_fixed(sqe, fd, buf, size, offset, buf_index);

// 2. 固定文件描述符(Fixed Files)——减少 fget/fput 开销
int fixed_fds[1024];
io_uring_register_files(&ring, fixed_fds, 1024);

// 3. 批量提交——减少 io_uring_enter 调用次数
for (int i = 0; i < 100; i++) {
sqe = io_uring_get_sqe(&ring);
io_uring_prep_read(sqe, ...);
}
io_uring_submit(&ring); // 一次系统调用提交 100 个请求

// 4. SQPOLL 模式——内核线程轮询,接近零系统调用
struct io_uring_params params{};
params.flags = IORING_SETUP_SQPOLL;
params.sq_thread_idle = 2000; // 空闲 2 秒后停止轮询
io_uring_queue_init_params(256, &ring, &params);

6.5 io_uring 适用场景

场景 推荐程度 说明
高性能网络服务器 ⭐⭐⭐⭐⭐ 即将替代 epoll
存储引擎(RocksDB) ⭐⭐⭐⭐⭐ 异步文件 I/O
数据库 ⭐⭐⭐⭐⭐ MySQL 8.0+ 支持
消息队列 ⭐⭐⭐⭐ 持久化日志写入
普通 Web 服务 ⭐⭐⭐ epoll 已足够
低并发场景 ⭐⭐ 过度设计

七、高性能网络库实践

7.1 常见 C++ 网络库对比

I/O 模型 特点 适用场景
libevent Reactor (epoll/kqueue) 轻量级,历史悠久 跨平台需求
libev Reactor 比 libevent 更快,API 更简洁 高性能事件循环
libuv Reactor Node.js 核心,跨平台 异步 I/O 框架
Boost.Asio Proactor(模拟) 现代 C++,跨平台 C++ 项目首选
seastar Reactor + share-nothing 极致性能,每核独立运行 ScyllaDB, Redpanda
workflow 任务流 搜狗开源,自带调度器 通用后端服务

7.2 Boost.Asio 示例(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#include <asio.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <memory>

using asio::ip::tcp;

class Session : public std::enable_shared_from_this<Session> {
public:
Session(tcp::socket socket) : socket_(std::move(socket)) {}

void start() { do_read(); }

private:
void do_read() {
auto self = shared_from_this();
socket_.async_read_some(asio::buffer(data_, max_length),
[this, self](std::error_code ec, size_t length) {
if (!ec) do_write(length);
});
}

void do_write(size_t length) {
auto self = shared_from_this();
asio::async_write(socket_, asio::buffer(data_, length),
[this, self](std::error_code ec, size_t) {
if (!ec) do_read();
});
}

tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};

class Server {
public:
Server(asio::io_context& ctx, short port)
: acceptor_(ctx, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}

private:
void do_accept() {
acceptor_.async_accept(
[this](std::error_code ec, tcp::socket socket) {
if (!ec) std::make_shared<Session>(std::move(socket))->start();
do_accept();
});
}

tcp::acceptor acceptor_;
};

int main() {
asio::io_context ctx;
Server server(ctx, 8080);

// 多线程运行 io_context(自动负载均衡)
std::vector<std::thread> threads;
for (int i = 0; i < (int)std::thread::hardware_concurrency(); ++i)
threads.emplace_back([&ctx]() { ctx.run(); });

for (auto& t : threads) t.join();
return 0;
}

7.3 启用 io_uring 后端

1
2
# Boost.Asio 1.26+ 支持 io_uring 后端
g++ -DASIO_HAS_IO_URING -o server server.cpp -luring -lpthread

八、性能对比与选型建议

8.1 系统调用开销对比

操作 select/poll epoll io_uring
添加 1000 个 fd O(n) 拷贝 1 次系统调用 批量提交
等待事件 O(n) 扫描 O(1) 返回就绪 批量获取完成
读 1KB 数据 1 次 read 1 次 read 0 次(完成时已在缓冲区)
高并发场景 不可用 良好 优秀

8.2 吞吐量对比(10K 连接,理论参考值)

模型 吞吐量 CPU 占用 延迟
select/poll ~5K req/s 80%
epoll (LT) ~50K req/s 30%
epoll (ET) ~80K req/s 20%
io_uring (SQPOLL) ~120K req/s 15% 极低

8.3 技术选型决策树

1
2
3
4
5
6
是否需要跨平台(Windows/macOS/Linux)?
├── 是 ──→ Boost.Asio / libuv
└── 否
└── 是否需要极致性能(100K+ 连接)?
├── 是 ──→ io_uring + SQPOLL(内核 ≥ 5.10
└── 否 ──→ epoll (ET) + Reactor

8.4 各场景推荐方案

应用场景 推荐技术栈 理由
通用 Web 服务 libevent / libev 成熟稳定,跨平台
C++ 新项目 Boost.Asio 现代 C++,支持 io_uring
极致性能 io_uring + seastar 共享无锁架构
嵌入式/轻量级 原生 epoll 无依赖,可控性强
代理/网关 Nginx / Envoy 已有成熟方案
数据库/存储 io_uring 异步文件 I/O 优势明显

九、常见问题解答

Q1:边缘触发和水平触发怎么选?

水平触发 (LT):不易出错,适合大多数场景。即使一次没读完,下次 epoll_wait 还会通知。

边缘触发 (ET):性能更高,但必须循环读取直到 EAGAIN,且必须使用非阻塞 I/O。适合对性能有极致要求的场景。

Q2:为什么 Redis 6.0 之前单线程也能支撑 10W QPS?

Redis 使用单线程 Reactor 模式,所有操作都在内存中完成,没有锁竞争。单线程避免了上下文切换,性能反而更高。Redis 6.0 引入多线程只是为了处理网络 I/O(因为网络带宽成为瓶颈),核心命令执行仍是单线程。

Q3:io_uring 能否完全替代 epoll?

目前不能,但正在快速追赶:

  • ✅ io_uring 支持网络 socket(Linux 5.5+)
  • ✅ 性能已超越 epoll(特别是高并发场景)
  • ❌ 内核版本要求较高(5.1+,建议 5.10+)
  • ❌ 生态还在建设中(许多老项目未迁移)

Q4:如何调试 epoll/io_uring 程序?

1
2
3
4
5
6
7
8
9
10
11
12
# 查看进程打开的文件描述符
lsof -p <pid>
ls -la /proc/<pid>/fd/

# 查看 epoll 实例详情
cat /proc/<pid>/fdinfo/<epoll_fd>

# 跟踪 epoll 系统调用
strace -e epoll_wait,read,write -p <pid>

# 跟踪 io_uring 系统调用
strace -e io_uring_setup,io_uring_enter,io_uring_register -p <pid>

Q5:C++ 网络编程最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 1. RAII 管理资源
class FdGuard {
int fd_;
public:
explicit FdGuard(int fd) : fd_(fd) {}
~FdGuard() { if (fd_ >= 0) close(fd_); }
FdGuard(const FdGuard&) = delete;
FdGuard(FdGuard&& o) noexcept : fd_(std::exchange(o.fd_, -1)) {}
};

// 2. sendmsg/recvmsg 减少系统调用(scatter-gather I/O)
struct msghdr msg{};
struct iovec iov[2];
iov[0] = { header, header_len };
iov[1] = { data, data_len };
msg.msg_iov = iov;
msg.msg_iovlen = 2;
sendmsg(fd, &msg, 0); // 一次系统调用发送两个缓冲区

// 3. splice/sendfile 零拷贝
int pipefd[2];
pipe(pipefd);
splice(fd_in, nullptr, pipefd[1], nullptr, size, SPLICE_F_MOVE);
splice(pipefd[0], nullptr, fd_out, nullptr, size, SPLICE_F_MOVE);

// 4. SO_REUSEPORT 多进程负载均衡
int opt = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));

附录

A. 核心 API 速查表

API 作用 关键函数
select I/O 多路复用 select(), FD_SET(), FD_ISSET()
poll I/O 多路复用 poll()
epoll Linux 多路复用 epoll_create(), epoll_ctl(), epoll_wait()
io_uring Linux 异步 I/O io_uring_queue_init(), io_uring_submit(), io_uring_wait_cqe()

B. 推荐学习资源

书籍

  • 《Unix 网络编程》卷 1(Stevens)
  • 《Linux 高性能服务器编程》(游双)
  • 《深入理解 Linux 网络》

内核文档

  • Documentation/filesystems/io_uring.txt
  • man epoll, man io_uring

开源项目

C. 系统级调优参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 增加文件描述符限制
ulimit -n 1000000
echo "fs.file-max = 1000000" >> /etc/sysctl.conf

# 优化 TCP 参数
echo "net.core.somaxconn = 32768" >> /etc/sysctl.conf
echo "net.ipv4.tcp_max_syn_backlog = 65536" >> /etc/sysctl.conf
echo "net.ipv4.tcp_tw_reuse = 1" >> /etc/sysctl.conf

# epoll 相关
echo "fs.epoll.max_user_watches = 1000000" >> /etc/sysctl.conf

# 应用更改
sysctl -p

结语:I/O 多路复用技术经历了从 select → epoll → io_uring 的演进。epoll 成熟稳定,io_uring 代表未来。作为 C++ 开发者,掌握 Boost.Asio 是最稳妥的选择——它能根据平台自动选择最优后端,让你的代码同时兼顾可移植性与高性能。


深入浅出 I/O 多路复用与 Proactor/Reactor:从 C10K 到 io_uring 的演进之路
https://www.psnow.sbs/2026/04/17/深入浅出-I-O-多路复用与-Proactor-Reactor:从-C10K-到-io-uring-的演进之路/
作者
Psnow
发布于
2026年4月17日
许可协议