Abel'Blog

我干了什么?究竟拿了时间换了什么?

0%

C++-11-Thread

概述

C++ 11 标准下对thread库。阅读的书是《深入应用C++11:代码优化与工程级应用》。ISBN-987-7-111-50069-8。第5章。

之前的c++在每个平台下有不同的多线程API接口,如果之前制作多线程时候,需要考虑平台差异性。新标准将其统一了。

  • 线程同步的互斥量
  • 用于线程间通讯的条件变量
  • 线程安全的原子变量、
  • 异步操作的future、promise和task,async等。

线程

线程的创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void f1(int n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread " << n << " executing\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void f2(int& n)
{
for (int i = 0; i < 5; ++i) {
std::cout << "Thread 2 executing\n";
++n;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void test_create_thread()
{
int n = 0;
std::thread t1; // t1 is not a thread
std::thread t2(f1, n + 1); // pass by value
std::thread t3(f2, std::ref(n)); // pass by reference
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
t2.join();
t4.join();
std::cout << "Final value of n is " << n << '\n';
}

基本用法

1
2
3
4
t.get_id(); // 获取当前线程编号
std::this_thread::sleep_for(std::chrono::milliseconds(10)); // sleep函数
std::cout << std::thread::hardware_concurrency() << std::endl; // 获取硬件CPU个数
std::this_thread::get_id(); // 获取当前的线程;

互斥量

1
2
3
4
std::mutext 独占的互斥量,不支持递归
std::timed_mutext 带超时的独互斥量,不能递归使用
std::recursive_mutex 递归互斥量,不能支持超时
std::recursive_timed_mutex 带超时的递归互斥量

可以写一个例子来尝试。

1
2
3
4
5
6
7
8
9
10
11
12
std::mutex mutex;
std::recursive_mutex r_mutex;
std::lock_guard<std::recursive_mutex> lock(mutex);
// 带超时的互斥量
std::timed_mutex t_mutex;
std::chrono::milliseconds timeout(100);
if (t_mutex.try_lock_for(timeout)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
t_mutex.unlock();
} else {
std::cout << "is timeout." << std::endl;
}

使用 RAII 方式来加锁,解锁。

  • std::lock_guard c++ 11
  • std::unique_lock c++ 11
  • std::shared_lock c++ 11
  • std::scoped_lock c++ 17

能通过在构造的时,指定加锁策略:

策略 描述
std::defer_lock 不请求锁
std::try_to_lock 尝试请求锁,但不阻塞线程,锁不可用立即返回
std::adopt_lock 假设当前线程已经获取了互斥对象的所有权,不会加锁
1
2
3
4
5
std::mutex mt;
std::unique_lock<std::mutex> lck(mt, std::defer_lock);
assert(lck.owns_lock() == false);
lck.lock();
assert(lck.owns_lock() == true);

lock_guard使用的范围比较广,效率也比较高。lock_guard里面存储的mutex是_Mutex& _MyMutex,unique_lock里面使用的_Mutex* _Pmtx。unique_lock可以将所有权转移出去。

条件变量

等待同步机制,它能阻塞一个或者多个线程,一直到收到另外以恶线程发出的通知或超时,才会唤醒当前阻塞的线程。

condition_variable 配合 std::unique_lock 进行 wait 操作。
condition_variable_any 和任意带 lock, unlck 语义的 mutex 搭配起来使用,拥有灵活性,但是效率比 condition_variable 差一些。

工作流程:

  1. 创建一个 mutex 对象;
  2. 线程循环中检查这个条件是否有;
  3. 外部线程调用 notify_once、notify_all 唤起一个线程或者唤起全部线程;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;

void worker_thread()
{
// 等待直至 main() 发送数据
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});

// 等待后,我们占有锁。
std::cout << "Worker thread is processing data\n";
data += " after processing";

// 发送数据回 main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";

// 通知前完成手动解锁,以避免等待线程才被唤醒就阻塞(细节见 notify_one )
lk.unlock();
cv.notify_one();
}

int main()
{
std::thread worker(worker_thread);

data = "Example data";
// 发送数据到 worker 线程
{
std::lock_guard<std::mutex> lk(m);
ready = true;
std::cout << "main() signals data ready for processing\n";
}
cv.notify_one();

// 等候 worker
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return processed;});
}
std::cout << "Back in main(), data = " << data << '\n';

worker.join();
}

原子操作

1
std::atomic<T> value;

call_once/once_flag的使用

为了让某个函数只调用一次,确保多线程初始化一次。就是只有once_flag。

异步操作

promise

在外部创建一个std::promise对象传入线程内部,在外部可以使用promise获取一个future探测是否已经返回了数据。

1
2
3
4
5
std::promise<int> pr;
std::thread t([](std::promise<int>& p) {
p.set_value_at_thread_exit(9); }, std::ref(pr));
std::future<int> f = pr.get_future();
auto r = f.get();

future

future存在三个状态:Deferred 未开始、 Ready 已经操作完成、Timeout 异步操作已经超时。

可调用对象的包装类std::packaged_task。可以直接使用这个类加上lambda表达式,将函数包装,传递给std::thread做一个异步操作。从task包装类中也能获取future对象。

packaged_task

1
2
3
4
std::packaged_task<int(int)> tsk(func);
std::future<int> fut = tsk.get_future();
std::thread(std::move(tsk), 2).detach();
int value = fut.get();

promise和packaged_task有些像,都是为了打通thread和外部的纽带。

std::future和std::shared_future差别是,前者只能std::move,而另外的可以被拷贝。

async

std::async比起std::promise、std::packaged_task加上std::thread更高一层。

函数原型async(std::launch::async|std::launch::deferred,f,args...)

async是马上启动这个异步函数,deferred是挂起,等着外部调用了get、wait之后才会启动函数允许。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

void test_async()
{
std::future<int> f1 = std::async(std::launch::async, []() {
return 8;
});
std::cout << f1.get() << std::endl;
std::future<int> f2 = std::async(std::launch::async, []() {
std::cout << 8 << endl;
});
f2.wait();
std::future<int> f3 = std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});

std::cout << "waiting ... \n";
std::future_status status;
do {
status = f3.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
}
else if (status == std::future_status::timeout) {
std::cout << "timeout!\n";
}
else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);

std::cout << "result is " << f3.get() << std::endl;
}

总结

  1. 线程的创造;
  2. 互斥量;
  3. 条件变量,调度多线程;
  4. 原子变量;
  5. call_once 多线程下面只允许一次运行;
  6. future 与 promise 或者 package_task 加上 thread 构建异步线程调用,获取返回值的;
  7. async 函数是更高层的封装,能直接将 promise / package_task + thread 概括掉

参考