C++ Concurrency in Action - SECOND EDITION

本文为此书中文译本前四章的读书笔记。

https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019

<pthread.h>

<pthread.h>是贴近底层的 POSIX 线程接口:

1
2
3
4
5
6
7
8
pthread_create():创建一个新的线程。
pthread_join():等待指定的线程退出,并回收其资源。
pthread_detach():将指定的线程设置为分离状态,使其资源在退出时自动回收。
pthread_exit():使当前线程退出,并返回指定的状态值。
pthread_mutex_init():初始化互斥锁。
pthread_mutex_destroy():销毁互斥锁。
pthread_mutex_lock():尝试获取互斥锁,如果已经被其他线程占用,则阻塞等待。
pthread_mutex_unlock():释放互斥锁。

<thread>

C++11 提供#include <thread> 头文件支持多线程,对C语言的<pthread.h> (POSIX线程接口)做了进一步的抽象和封装。新标准 std::thread 提供了更高级的线程实现,使用面向对象、RAII 技术,在对象销毁时自动调用 join()detach() 方法,避免手动管理线程资源和状态,且支持更好的可移植性:

1
2
3
4
5
6
7
8
9
10
11
12
explicit thread( Callable&& f, Args&&... args ); // 构造函数,参数可以是lambda函数
void join(); // 阻塞等待线程退出并回收资源
void detach(); // 将线程对象与线程执行状态分离,使其独立执行
unsigned int hardware_concurrency() noexcept; // 返回当前系统支持的线程并发数
bool joinable() const noexcept; // 判断线程是否可被加入
thread::id get_id() noexcept; // 获取当前线程 id,等同于在线程内调用 std::this_thread::get_id(),无线程时返回默认值 std::thread::type
void yield() noexcept; // 让出当前线程时间片
// 休眠指定时间或到指定时间点
template< class Rep, class Period >
void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration );
template< class Clock, class Duration >
void sleep_until( const std::chrono::time_point<Clock, Duration>& sleep_time );
  • 向函数传递指针或引用参数时应注意,线程的运行长度可能会超过参数的生命周期;
  • std::thread 对象销毁之前仍未决定 join()detach(),程序会执行 std::terminate() 终止;
  • 线程不可加入 / 分离的情况(均为 joinable() ):已经加入/分离、未绑定函数启动;
  • 有可能主线程已经结束,新线程构造函数还没执行完,因此可以用 std::ref() 表示左值引用参数,使用 std::move() 移动右值参数;
  • 通过 std::move() 转移线程所有权;
  • 执行的任务不能有返回值,需要返回值应使用 future
  • 给一个已绑定线程的变量赋值新线程会造成 std::terminate()
  • C++17 标准建议但未纳入标准的 RAII 的线程类型封装(C++20 中的 std::jthread ):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class joining_thread {
std::thread t;
public:
joining_thread() noexcept = default;

template<typename Callable,typename ... Args>
explicit joining_thread(Callable&& func, Args&& ... args):
t(std::forward<Callable>(func), std::forward<Args>(args)...) {}

explicit joining_thread(std::thread t_) noexcept:
t(std::move(t_)){}

joining_thread(joining_thread&& other) noexcept:
t(std::move(other.t)){}

joining_thread& operator=(joining_thread&& other) noexcept {
ifjoinable())join();
t = std::move(other.t);
return *this;
}

joining_thread& operator=(std::thread other) noexcept {
if (joinable()) join();
t = std::move(other);
return *this;
}

~joining_thread() noexcept {
if (joinable()) join();
}

void swap(joining_thread& other) noexcept { t.swap(other.t); }
std::thread::id get_id() const noexcept{ return t.get_id(); }
bool joinable() const noexcept { return t.joinable(); }
void join() { t.join(); }
void detach() { t.detach(); }
std::thread& as_thread() noexcept { return t; }
const std::thread& as_thread() const noexcept { return t; }
};

<chrono>

<chrono>提供了一系列时间相关类型和函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
typedef std::chrono::time_point<std::chrono::system_clock> system_time_point;
typedef std::chrono::time_point<std::chrono::high_resolution_clock> high_res_time_point;

typedef std::chrono::duration<int, std::ratio<1, 1000>> milliseconds;
typedef std::chrono::duration<double> seconds;

typedef std::chrono::system_clock system_clock;
typedef std::chrono::steady_clock steady_clock;
typedef std::chrono::high_resolution_clock high_res_clock;

auto start = std::chrono::system_clock::now(); // 获取当前时间点
std::chrono::milliseconds duration = std::chrono::milliseconds(1000); // 定义一个时间段
std::this_thread::sleep_for(duration); // 线程休眠指定的时间段
auto end = std::chrono::system_clock::now(); // 获取当前时间点
std::chrono::duration<double> elapsed_seconds = end - start; // 计算时间点之间的差值
std::cout << elapsed_seconds.count() << "s\n"; // 输出时间间隔

std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds> (ms); // 有截断的显式类型转换

std::chrono::hours // 小时
std::chrono::minutes // 分钟
std::chrono::seconds // 秒
std::chrono::milliseconds // 毫秒
std::chrono::microseconds // 微秒
std::chrono::nanoseconds // 纳秒

<mutex>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
std::mutex::lock() // 尝试获得锁,若被占用则阻塞
std::mutex::try_lock() // 尝试获得锁,若被占用则立即返回 false
std::mutex::unlock() // 释放锁
// 不建议使用以上成员函数,因为需要手动保证任何情况下结束都unlock(),以下是标准库提供的 RAII 的模板类

// 使用 lock_guard 对象进行对参数对象的加锁解锁,不需要再手动调用对象的lock()和unlock()
// lock_guard 析构时会自动调用对应对象的解锁,常与 mutex 一同私有定义在加锁类中
// adopt_lock 表示此锁已经被外部线程或外部代码锁定,以便接管它并在该线程退出作用域时自动释放锁
// defer_lock 表示不立即锁定
std::lock(lhs.m, rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
// C++17 支持自动模板类参数推导,以及可以同时为多个互斥量上锁的 scoped_lock
std::scoped_lock lock(mutex1, mutex2, ...);

// unique_lock 可以手动 lock() 和 unlock(),也可以使用 std::lock(ul1, ul2)
std::unique_lock<std::mutex> lock(std::mutex)
// 占用空间稍多,稍慢,但还提供:try_lock(),
// release() 释放所有权但不解锁互斥量, reset() 释放所有权且自动解锁互斥量
// try_lock_for(), try_lock_until() 尝试获得锁,等待一定时间或等到特定时间
// owns_lock()返回是否持有互斥量所有权

template <class Callable, class... Args>
void call_once(std::once_flag& flag, Callable&& f, Args&&... args);

//
// C++17 引入的“读者-写者”锁 shared_mutex,支持 lock() 和 try_lock()
// 使用 shared_lock 获取多个共享锁(读锁),使用 unique_lock 获取一个独占锁(写锁)
// C++14 引入的 shared_timed_lock
  • 在需要多个互斥量时,按固定顺序上锁,最好使用 lock 同时上锁;
  • 避免将被保护量暴露给外界传入的可调用对象,避免在持有锁时调用外部代码;
  • C++11 已保障 static 变量的线程安全;
  • 嵌套锁 recursive_mutex 支持多次上锁,同等次数解锁后方可使用;
  • C++17 引入的“读者-写者”锁 shared_mutex,支持 lock()try_lock(),使用 shared_lock 获取多个共享锁(读锁),使用 unique_lock 获取一个独占锁(写锁);
  • C++14 引入的 shared_timed_lock 在 shared_mutex 基础上还支持了 try_lock_for()try_lock_until()
  • std::time_mutexstd::recursive_timed_mutex 支持 try_lock_for()try_lock_until()

<condition_variable>

1
2
3
4
5
6
// 一般持有一个 unique_lock,先 unlock(),然后 wait() 阻塞,在被其它线程唤醒后判断 Callable 是否为真
// 若为真,wait() 返回,重新 lock() 并继续执行;否则 wait() 继续阻塞或等待
// 伪唤醒次数不可知,故 Callable 不应有副作用
std::condition_variable.wait(lock, Callable)
std::condition_variable.notify_one() // 随机唤醒一个等待的线程
std::condition_variable.notify_all() // 唤醒所有等待线程
  • 一般持有一个 unique_lock,先 unlock(),然后 wait() 阻塞,在被其它线程唤醒后判断 Callable 是否为真。若为真,wait() 返回,重新 lock() 并继续执行;否则 wait() 继续阻塞或等待;

<future>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
std::future<int> answer = std::async(func);
cout << answer.get(); // 阻塞直到得到结果

std::package_task<int(int, int)> task(func);
std::future<int> result = task.get_future();
int sum = result.get();

std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t(func, std::ref(p));
int result = f.get(); // 等待到 func() 函数执行 p.set_value(int) 或 p.set_exception(std::current_exception())
t.join();
// 对应的函数写法:
try{ p.set_value(func()); } catch (...) { p.set_exception(std::copy_exception(std::logic_error("func"))); }

// 可拷贝 future
std::shared_future<int> sf(std::move(f)); // 或 p.get_future() 或 f.share()

// 锁存器,位于 <experimental/latch>,当 count_down() 到 0 之后就绪
std::experimental::latch done(thread_count);
std::vector<std::future<void>> threads(thread_count);
threads[i] = std::async(std::launch::async, []{ done.count_down(); });
done.wait();

// 栅栏,位于 <experimental/barrier>,当所有线程到达后会就绪
// 锁存器不可复用,栅栏可复用,故可用于循环
std::experimental::barrier sync(num_threads);
std::vector<thread> threads(num_threads);
threads[i] = thread([]{ sync.arrive_and_wait(); do_something(); sync.arrive_and_wait(); }
  • 可以在 async 的 第一个参数中选择 std::launch::async 使其在独立线程运行或 std::launch::deferred 使其延迟至 wait()get() 时再运行,默认为二者的或;
  • flex_barrier 有一个额外的构造函数,传入一个函数和线程数量。当所有线程都到达栅栏处,这个函数就由其中一个线程运行。

<atomic>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
std::atomic<int> cnt(0);
load(), store() // 以原子方式读写
exchange() // 交换原子该值与给定的值,返回之前的值
// 以原子方式比较对象值和 expected 值,相等则设置为 desired 并返回 true,否则将 expected 设为当前对象的值并返回 false
bool compare_exchange_weak(T& expected, T desired,
std::memory_order success = std::memory_order_seq_cst,
std::memory_order failure = std::memory_order_seq_cst);
// 参数与 weak 相同,若比较和交换操作失败,则直接将 expected 设为当前对象值以保证强一致性
bool compare_exchange_strong()
operator T() // 类型转换函数

// atomic_flag 只有 true 和 false 两种状态,用于实现自旋锁
std::atomic_flag flag = ATOMIC_FLAG_INIT; // 特定宏定义方式初始化
while (flag.test_and_set(std::memory_order_acquire)); // 若对象被设置,则返回 true,否则设置之并返回 false
do_something();
clear() // 将值设置为 false
  • 本章底层高级操作较多,暂略