Pine 源码阅读笔记

Pine 是一个使用 C++17 编写的高并发框架,支持 Linux / MacOS 跨平台;它也是 30 天自制 C++ 服务器(目前更至第十六天)的配套代码。

本文为 Github 源码 阅读笔记。

前置技能:

  • Modern C++ (包括并行库);
  • 网络编程基础
    • 《Linux 高性能服务器编程》—— 游双:服务器编程必读;
    • 《Linux 多线程服务端编程:使用 muduo C++ 网络库》—— 陈硕:经典网络库 muduo 作者,本框架采用类 muduo 架构;
    • 《C++ 服务器开发精髓》—— 张远龙:类似以上两本书的组合,编写年份较新,对部分技术做了相应更新;
    • 《UNIX网络编程卷1:套接字联网API》、《UNIX网络编程卷2:进程间通信》:可以作为字典书;

框架介绍

  • 程序采用类 muduo 结构,one loop per thread,多 Reactor 多线程模式,支持同步 / 异步 Socket I/O(仅代表 Socket 无操作时是否立即返回,本质均为同步 IO,均在应用进程中占用 CPU 时间);常使用 ET + 非阻塞 Socket IO 模式;
  • Server 是核心类,持有 main-Reactor、sub-Reactors、Acceptor、Connection、线程池;
  • Socket 为 socket 底层操作的封装,Poller 为对一系列 fd 及其 epoll 相关操作的封装(对每个管理的 fd 持有其对应的 Channel),Channel 为对每个文件描述符的具体操作的封装;Buffer 为 I/O 缓冲区的封装;
  • 每个事件循环 EventLoop 与一个 Poller 对应,支持随时增删需要管理的 fd;
  • main-Reactor 对应 Acceptor,在主线程中执行;
  • 每个 Connection 维护一个连接,随机将其分配给一个 sub-Reactor 持有的事件循环;
  • 每个 Reactor 即一个事件循环;线程池中每个线程对应一个 sub-Reactor,从有锁任务队列中取出一个 task;每个 task 与一个事件循环对应;
  • 可能存在某时刻一个线程恰好完成了它的事件循环维护的所有连接,也没有再给它分配新的连接;此时它可以再从有锁任务队列中取一个 task;但由于目前设计为一个线程对应一个 task,因此实际上它会立即返回;
  • 任务、事件循环、Reactor、线程、Poller 在此框架中一一对应;

各模块分析

Socket

  • 直接持有一个文件描述符 fd;
  • 支持设置 fd,获取 addr,设置 Socket I/O 阻塞模式,获取缓冲区中可读取字节数,socket()、bind()、listen()、accept()、connect() 的函数封装;

Channel

  • 直接持有 fd,监听事件功能二进制状态,准备事件功能二进制状态,是否存在,由 Acceptor 与 Connection 指定的读回调、写回调;以裸指针形式指向其所在事件循环;
  • 支持对直接持有的成员进行设置;根据准备事件二进制状态执行对应回调;

Poller

  • 直接持有一个 fd 和一个 epoll_event 指针(Linux)或一个 kevent 指针(MacOS);
  • 支持创建 epoll,拉取 epoll 并相应设置 Channel 二进制状态(返回有更新的 Channel 序列),根据 Channel 注册、删除事件(同时删除 Channel);

Buffer

  • 直接持有一个 std::string 缓冲区,封装了对它的赋值、取长度、清空操作;

EventLoop

  • unique_ptr 形式持有一个 Poller;
  • 支持无限循环地执行 epoll 中有更新的各个 Channel 的回调,以及更新与删除某个 Channel;

ThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// ThreadPool.h
class ThreadPool {
public:
DISALLOW_COPY_AND_MOVE(ThreadPool);
explicit ThreadPool(unsigned int size = std::thread::hardware_concurrency());
~ThreadPool();

template <class F, class... Args>
auto Add(F &&f, Args &&...args) -> std::future<typename std::invoke_result<F, Args...>::type>;

private:
std::vector<std::thread> workers_; // 每个 worker 持有一个 lambda 函数,循环处理每个 task,某一时刻发现队列空则返回
std::queue<std::function<void()>> tasks_; // 每个 task 持有一个 package_task,在 worker 中的循环中被调用
std::mutex queue_mtx_; // 对 task 队列的互斥锁
std::condition_variable queue_cv_; // 对 task 队列的条件变量
std::atomic<bool> stop_{false}; // 是否停止该线程池运行,析构时直接设为 true 并逐个 worker 调用 join()
};

// 不能放在cpp文件,C++编译器不支持模版的分离编译
template <class F, class... Args>
auto ThreadPool::Add(F &&f, Args &&...args) -> std::future<typename std::invoke_result<F, Args...>::type> {
using return_type = typename std::invoke_result<F, Args...>::type;

auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mtx_);

// don't allow enqueueing after stopping the pool
if (stop_) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}

tasks_.emplace([task](){ (*task)(); });
}
queue_cv_.notify_one();
return res;
}

// ThreadPool.cpp
ThreadPool::ThreadPool(unsigned int size) {
for (unsigned int i = 0; i < size; ++i) {
workers_.emplace_back([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mtx_);
queue_cv_.wait(lock, [this]() { return stop_ || !tasks_.empty(); });
if (stop_ && tasks_.empty()) {
return;
}
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}

ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mtx_);
stop_ = true;
}
queue_cv_.notify_all();
for (std::thread &th : workers_) {
if (th.joinable()) {
th.join();
}
}
}

Acceptor

  • 每个 Acceptor 以 unique_ptr 形式持有一个 Socket 和一个 Channel 用于管理监听端口行为,直接持有一个由 Server 指定的连接回调 new_connection_callback();
  • 构造时,构造一个新的 Socket 绑定到固定端口 127.0.0.1:1234 并开启监听,并为其构造一个对应 Channel,将自己的连接函数 AcceptConnection()(包含接受客户端连接操作和 new_connection_callback())设为 Channel 的读回调;

Connection

  • 每个 Connection 以 unique_ptr 形式持有一个 Socket 和一个 Channel,以 unique_ptr 形式持有读缓冲和写缓冲;直接持有由 Server 指定的一个 delete_connection() 回调和一个 on_recv() 回调;持有一个状态:非法、连接中、已连接、已关闭四个之一;
  • 构造时新建一个 Channel 并将 Server 指定的 fd 与 EventLoop 指定给新 Channel,开启 ET 读;
  • 支持阻塞 / 非阻塞连接读 / 写;

TcpServer

  • unique_ptr 形式持有一个 main-Reactor(即 EventLoop)、多个 sub-Reactors、一个 Acceptor、一个 ThreadPool;
  • 直接持有一个 map<int, unique_ptr>,以及由程序员指定的 on_connect() 与 on_recv() 回调;