本文为此书中文译本前四章的读书笔记。
https://github.com/xiaoweiChen/CPP-Concurrency-In-Action-2ed-2019
<pthread.h>
<pthread.h>是贴近底层的 POSIX 线程接口:
1 2 3 4 5 6 7 8
| pthread_create():创建一个新的线程。 pthread_join():等待指定的线程退出,并回收其资源。 pthread_detach():将指定的线程设置为分离状态,使其资源在退出时自动回收。 pthread_exit():使当前线程退出,并返回指定的状态值。 pthread_mutex_init():初始化互斥锁。 pthread_mutex_destroy():销毁互斥锁。 pthread_mutex_lock():尝试获取互斥锁,如果已经被其他线程占用,则阻塞等待。 pthread_mutex_unlock():释放互斥锁。
|
<thread>
C++11 提供#include <thread>
头文件支持多线程,对C语言的<pthread.h>
(POSIX线程接口)做了进一步的抽象和封装。新标准 std::thread
提供了更高级的线程实现,使用面向对象、RAII 技术,在对象销毁时自动调用
join()
和 detach()
方法,避免手动管理线程资源和状态,且支持更好的可移植性:
1 2 3 4 5 6 7 8 9 10 11 12
| explicit thread( Callable&& f, Args&&... args ); void join(); void detach(); unsigned int hardware_concurrency() noexcept; bool joinable() const noexcept; thread::id get_id() noexcept; void yield() noexcept;
template< class Rep, class Period > void sleep_for( const std::chrono::duration<Rep, Period>& sleep_duration ); template< class Clock, class Duration > void sleep_until( const std::chrono::time_point<Clock, Duration>& sleep_time );
|
- 向函数传递指针或引用参数时应注意,线程的运行长度可能会超过参数的生命周期;
- 若
std::thread
对象销毁之前仍未决定 join()
或 detach()
,程序会执行 std::terminate()
终止;
- 线程不可加入 / 分离的情况(均为
joinable()
):已经加入/分离、未绑定函数启动;
- 有可能主线程已经结束,新线程构造函数还没执行完,因此可以用
std::ref()
表示左值引用参数,使用 std::move()
移动右值参数;
- 通过
std::move()
转移线程所有权;
- 执行的任务不能有返回值,需要返回值应使用
future
;
- 给一个已绑定线程的变量赋值新线程会造成
std::terminate()
;
- C++17 标准建议但未纳入标准的 RAII 的线程类型封装(C++20 中的
std::jthread
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| class joining_thread { std::thread t; public: joining_thread() noexcept = default;
template<typename Callable,typename ... Args> explicit joining_thread(Callable&& func, Args&& ... args): t(std::forward<Callable>(func), std::forward<Args>(args)...) {}
explicit joining_thread(std::thread t_) noexcept: t(std::move(t_)){}
joining_thread(joining_thread&& other) noexcept: t(std::move(other.t)){}
joining_thread& operator=(joining_thread&& other) noexcept { if(joinable())join(); t = std::move(other.t); return *this; }
joining_thread& operator=(std::thread other) noexcept { if (joinable()) join(); t = std::move(other); return *this; }
~joining_thread() noexcept { if (joinable()) join(); }
void swap(joining_thread& other) noexcept { t.swap(other.t); } std::thread::id get_id() const noexcept{ return t.get_id(); } bool joinable() const noexcept { return t.joinable(); } void join() { t.join(); } void detach() { t.detach(); } std::thread& as_thread() noexcept { return t; } const std::thread& as_thread() const noexcept { return t; } };
|
<chrono>
<chrono>
提供了一系列时间相关类型和函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| typedef std::chrono::time_point<std::chrono::system_clock> system_time_point; typedef std::chrono::time_point<std::chrono::high_resolution_clock> high_res_time_point;
typedef std::chrono::duration<int, std::ratio<1, 1000>> milliseconds; typedef std::chrono::duration<double> seconds;
typedef std::chrono::system_clock system_clock; typedef std::chrono::steady_clock steady_clock; typedef std::chrono::high_resolution_clock high_res_clock;
auto start = std::chrono::system_clock::now(); std::chrono::milliseconds duration = std::chrono::milliseconds(1000); std::this_thread::sleep_for(duration); auto end = std::chrono::system_clock::now(); std::chrono::duration<double> elapsed_seconds = end - start; std::cout << elapsed_seconds.count() << "s\n";
std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds> (ms);
std::chrono::hours std::chrono::minutes std::chrono::seconds std::chrono::milliseconds std::chrono::microseconds std::chrono::nanoseconds
|
<mutex>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| std::mutex::lock() std::mutex::try_lock() std::mutex::unlock()
std::lock(lhs.m, rhs.m); std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
std::scoped_lock lock(mutex1, mutex2, ...);
std::unique_lock<std::mutex> lock(std::mutex)
template <class Callable, class... Args> void call_once(std::once_flag& flag, Callable&& f, Args&&... args);
|
- 在需要多个互斥量时,按固定顺序上锁,最好使用
lock
同时上锁;
- 避免将被保护量暴露给外界传入的可调用对象,避免在持有锁时调用外部代码;
- C++11 已保障
static
变量的线程安全;
- 嵌套锁
recursive_mutex
支持多次上锁,同等次数解锁后方可使用;
- C++17 引入的“读者-写者”锁
shared_mutex
,支持
lock()
和 try_lock()
,使用
shared_lock
获取多个共享锁(读锁),使用
unique_lock
获取一个独占锁(写锁);
- C++14 引入的 shared_timed_lock 在 shared_mutex 基础上还支持了
try_lock_for()
和 try_lock_until()
;
std::time_mutex
和
std::recursive_timed_mutex
支持 try_lock_for()
和 try_lock_until()
;
<condition_variable>
1 2 3 4 5 6
|
std::condition_variable.wait(lock, Callable) std::condition_variable.notify_one() std::condition_variable.notify_all()
|
- 一般持有一个
unique_lock
,先
unlock()
,然后 wait()
阻塞,在被其它线程唤醒后判断 Callable
是否为真。若为真,wait()
返回,重新 lock()
并继续执行;否则 wait()
继续阻塞或等待;
<future>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| std::future<int> answer = std::async(func); cout << answer.get();
std::package_task<int(int, int)> task(func); std::future<int> result = task.get_future(); int sum = result.get();
std::promise<int> p; std::future<int> f = p.get_future(); std::thread t(func, std::ref(p)); int result = f.get(); t.join();
try{ p.set_value(func()); } catch (...) { p.set_exception(std::copy_exception(std::logic_error("func"))); }
std::shared_future<int> sf(std::move(f));
std::experimental::latch done(thread_count); std::vector<std::future<void>> threads(thread_count); threads[i] = std::async(std::launch::async, []{ done.count_down(); }); done.wait();
std::experimental::barrier sync(num_threads); std::vector<thread> threads(num_threads); threads[i] = thread([]{ sync.arrive_and_wait(); do_something(); sync.arrive_and_wait(); }
|
- 可以在
async
的 第一个参数中选择
std::launch::async
使其在独立线程运行或
std::launch::deferred
使其延迟至 wait()
或
get()
时再运行,默认为二者的或;
flex_barrier
有一个额外的构造函数,传入一个函数和线程数量。当所有线程都到达栅栏处,这个函数就由其中一个线程运行。
<atomic>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| std::atomic<int> cnt(0); load(), store() exchange()
bool compare_exchange_weak(T& expected, T desired, std::memory_order success = std::memory_order_seq_cst, std::memory_order failure = std::memory_order_seq_cst);
bool compare_exchange_strong() operator T()
std::atomic_flag flag = ATOMIC_FLAG_INIT; while (flag.test_and_set(std::memory_order_acquire)); do_something(); clear()
|