std::jthread 全面解析

std::jthread 全面解析

目录

概述

std::jthread 是 C++20 引入的一个新线程类,定义在 <thread> 头文件中。它是对 std::thread 的增强版本,主要添加了自动加入 (joining)协作式取消 (cooperative cancellation) 的功能。

jthread 中的 “j” 代表 “joining”(加入),强调其自动管理线程生命周期的特性。

jthread 的特点

1. 自动加入

jthread 对象被销毁时,如果线程仍然可加入(joinable),它会自动调用 join()。这消除了资源泄漏的风险。

2. 协作式取消机制

提供了停止令牌 (std::stop_token),允许线程在执行过程中检查取消请求,并进行优雅的资源清理。

3. 异常安全

自动加入机制确保了即使在异常情况下也能正确清理线程资源。

基本用法

创建和启动线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <thread>
#include <chrono>

void worker() {
for (int i = 0; i < 5; ++i) {
std::cout << "Working... " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}

int main() {
std::jthread t(worker); // 线程立即启动

// 主线程继续执行其他任务
std::this_thread::sleep_for(std::chrono::seconds(1));

return 0; // t 自动 join()
}

传递参数

1
2
3
4
5
6
7
8
void worker_with_args(int id, const std::string& name) {
std::cout << "Thread " << id << " - " << name << " started" << std::endl;
}

int main() {
std::jthread t(worker_with_args, 1, "Worker");
return 0;
}

使用 Lambda 表达式

1
2
3
4
5
6
7
int main() {
std::jthread t([](int x) {
std::cout << "Lambda thread with x=" << x << std::endl;
}, 42);

return 0;
}

与 std::thread 的区别

特性 std::thread std::jthread
自动加入 ❌ 必须手动调用 join() 或 detach() ✅ 自动在析构时 join()
协作式取消 ❌ 无内置支持 ✅ 通过 stop_token 提供
资源安全 ❌ 可能忘记 join() 导致资源泄漏 ✅ 自动管理生命周期
C++版本 C++11 C++20

停止令牌 (std::stop_token)

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void worker_with_stop(std::stop_token stoken) {
while (!stoken.stop_requested()) {
std::cout << "Working..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
std::cout << "Thread stopped gracefully" << std::endl;
}

int main() {
std::jthread t(worker_with_stop);

std::this_thread::sleep_for(std::chrono::seconds(1));

// 请求停止
auto stop_source = t.get_stop_source();
stop_source.request_stop();

return 0;
}

方法列表

1
2
3
4
5
// 检查是否已请求停止
bool stop_requested() const noexcept;

// 检查是否可停止
bool stop_possible() const noexcept;

停止源 (std::stop_source)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int main() {
std::jthread t([](std::stop_token st) {
while (!st.stop_requested()) {
// 工作代码
}
});

// 获取停止源并请求停止
auto source = t.get_stop_source();

// 检查是否已请求停止
if (!source.stop_requested()) {
source.request_stop();
}

return 0;
}

取消回调 (std::stop_callback)

1
2
3
4
5
6
7
8
9
10
11
void worker_with_callback(std::stop_token token) {
// 注册停止回调
std::stop_callback callback(token, [] {
std::cout << "Cleanup before stopping..." << std::endl;
});

while (!token.stop_requested()) {
// 工作代码
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

实际应用场景

1. 后台任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class BackgroundProcessor {
private:
std::jthread worker_;

void process_loop(std::stop_token token) {
while (!token.stop_requested()) {
// 处理任务
auto task = get_next_task();
if (task) process_task(*task);

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
cleanup();
}

public:
BackgroundProcessor() : worker_(&BackgroundProcessor::process_loop, this) {}

~BackgroundProcessor() {
// 自动停止并加入
}

void request_stop() {
worker_.request_stop();
}
};

2. 定时器/周期任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PeriodicTask {
std::jthread timer_thread_;

public:
void start() {
timer_thread_ = std::jthread([this](std::stop_token token) {
while (!token.stop_requested()) {
perform_task();

// 等待直到下一次执行或停止请求
for (int i = 0; i < 10 && !token.stop_requested(); ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
});
}

void stop() {
timer_thread_.request_stop();
}
};

3. 并行数据处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void parallel_process(std::vector<int>& data) {
const size_t num_threads = std::thread::hardware_concurrency();
std::vector<std::jthread> threads;

auto worker = [&data](std::stop_token token, size_t start, size_t end) {
for (size_t i = start; i < end && !token.stop_requested(); ++i) {
process_item(data[i]);
}
};

size_t chunk_size = data.size() / num_threads;
for (size_t i = 0; i < num_threads; ++i) {
size_t start = i * chunk_size;
size_t end = (i == num_threads - 1) ? data.size() : start + chunk_size;
threads.emplace_back(worker, start, end);
}

// 所有线程自动加入
}

注意事项

1. 停止是协作式的

1
2
3
4
5
6
7
8
9
10
11
12
13
// 错误示例:线程不会自动停止
void worker(std::stop_token token) {
while (true) { // 没有检查 stop_token
// 工作代码
}
}

// 正确示例:定期检查停止请求
void worker_correct(std::stop_token token) {
while (!token.stop_requested()) {
// 工作代码
}
}

2. 资源清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ResourceHolder {
std::unique_ptr<int> resource;

public:
~ResourceHolder() {
// 确保在停止时清理资源
}
};

void worker_with_resource(std::stop_token token) {
ResourceHolder holder;

while (!token.stop_requested()) {
// 使用资源
}
// holder 自动清理
}

3. 性能考虑

  • 定期检查 stop_token 会带来轻微的性能开销
  • 检查频率应根据具体场景调整

4. 线程安全

  • std::stop_tokenstd::stop_source 是线程安全的
  • 多个线程可以安全地请求停止

示例代码

完整的实际示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <atomic>

class WorkerPool {
private:
std::vector<std::jthread> workers_;
std::atomic<int> processed_count_{0};

void worker_func(int id, std::stop_token token) {
std::cout << "Worker " << id << " started" << std::endl;

// 注册清理回调
std::stop_callback cleanup(token, [id] {
std::cout << "Worker " << id << " cleaning up..." << std::endl;
});

while (!token.stop_requested()) {
// 模拟工作
std::this_thread::sleep_for(std::chrono::milliseconds(100));

// 处理任务
int count = ++processed_count_;
if (count % 10 == 0) {
std::cout << "Worker " << id << " processed " << count << " tasks" << std::endl;
}
}

std::cout << "Worker " << id << " stopped" << std::endl;
}

public:
void start(size_t num_workers) {
for (size_t i = 0; i < num_workers; ++i) {
workers_.emplace_back(&WorkerPool::worker_func, this, i);
}
}

void stop_all() {
std::cout << "Stopping all workers..." << std::endl;
for (auto& worker : workers_) {
worker.request_stop();
}
workers_.clear(); // 等待所有线程完成
}

~WorkerPool() {
if (!workers_.empty()) {
stop_all();
}
}
};

int main() {
std::cout << "=== std::jthread 示例程序 ===" << std::endl;

WorkerPool pool;
pool.start(4); // 启动4个工作线程

// 让线程运行一段时间
std::this_thread::sleep_for(std::chrono::seconds(3));

// 停止所有线程
pool.stop_all();

std::cout << "程序结束" << std::endl;
return 0;
}

编译命令

1
2
3
4
# 需要支持 C++20 的编译器
g++ -std=c++20 -pthread example.cpp -o example
# 或
clang++ -std=c++20 -pthread example.cpp -o example

总结

std::jthread 是 C++20 中一个重要的并发工具,它解决了 std::thread 的一些痛点:

  1. 自动资源管理:无需手动调用 join(),避免资源泄漏
  2. 优雅停止机制:提供协作式取消,便于线程的优雅退出
  3. 更好的异常安全:自动加入确保异常安全
  4. 现代API设计:整合了停止令牌等现代并发概念

对于新的 C++ 项目,推荐使用 std::jthread 替代 std::thread,除非有特定的性能考量或需要与现有代码兼容。


std::jthread 全面解析
https://www.psnow.sbs/2025/12/29/std-jthread-全面解析/
作者
Psnow
发布于
2025年12月29日
许可协议