TaskFlow = DAG + WorkStealing
前言
本文首发于个人博客:🔗Wang Junwei’s Blog : TaskFlow + WorkStealingl
最开始使用学习brpc的时候,总听人提起大名鼎鼎的 bthread ,也在之前的工作实践中用到过bthread线程池, M:N 的线程库能在多核应用中提供更好的scalability 和 cache locality。在C++微服务中能够支持高并发低延迟的任务并发调度。
线程池只是任务简单的并发,各个task之间互相独立运行。
在系统设计中,很多时候会遇到有依赖的逻辑,这个时候就需要task之间有一定的依赖关系;
我的前同事yc大佬,是非常资深的C程序员,从C 98时代走来,编码技术很强。在前司做召回引擎的时候,他自己写了一个C++ DAG框架,koi2,基于bthread,维护了一套Node执行框架,还支持context传递的时候约束成员的可见性。可惜在职期间没有好好地学习一下源代码。
最近周末没事干阅读了一下taskflow 的源代码,就浅浅地记录一下吧。
TaskFlow Overview
Taskflow 是一个header-only的轻量级图计算框架 (Task Graph Computing System)。
支持如下graph
Static
Dynamic
Composable
Conditional
Heterogeneous
Dive into the src
tf::Node
tf::Node::{Static,Dynamic,Condition…}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 struct Dynamic { template <typename C> Dynamic (C&& c) : work{std::forward<C>(c)} {} std::function<void (Subflow&)> work; Graph subgraph; }; struct Dynamic { template <typename C> Dynamic (C&&) : work{std::forward<C>(c)} {} std::function<void (Subflow&)> work; Graph subgraph; }; struct Condition { template <typename C> Condition (C&&) : work{std::forward<C>(c)} {} std::variant< std::function<int ()>, std::function<int (Runtime&)> > work; }; struct MultiCondition { template <typename C> MultiCondition (C&&) : work {std::forward<C>(c)} {} std::variant< std::function<SmallVector<int >()>, std::function<SmallVector<int >(Runtime&)> > work; }; struct Module { template <typename T> Module (T&) : graph{ obj.graph () } {} Graph& graph; }; struct DependentAsync { template <typename C> DependentAsync (C&&) : work {std::forward<C>(c)} {} std::variat< std::function<void ()>, std::function<void (Runtime&)> > work; std::atomic<size_t > use_count {1 }; std::atomic<AsyncState> state {AsyncState::UNFINISHED}; }; using handle_t = std::variant< Placeholder, Static, Dynamic, Condition, MultiCondition, Module, Async, DependentAsync >;
这里面的task的ctor入参都是Callable
的类型,完美转发给work
对象初始化。
tf::Node
Node 是taskflow里面最重要的一个数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 class Node { public : template <typename ... Args> Node (const std::string& name , unsigned priority, Topology* topology, Node* parent, size_t join_counter, Args&&... args) : _name {name}, _priority {priority}, _topology {topology}, _parent {parent}, _join_counter {join_counter}, _handle {std::forward<Args>(args)...} { } private : std::string _name; unsigned _priority {0 }; Topology* _topology {nullptr }; Node* _parent {nullptr }; void * _data {nullptr }; SmallVector<Node*> _successors; SmallVector<Node*> _dependents; std::atomic<int > _state {0 }; std::atomic<size_t > _join_counter {0 }; std::unique_ptr<Semaphores> _semaphores; handle_t _handle; void _precede(Node* v) { _successors.push_back (v); v->_dependents.push_back (this ); } void _set_up_join_counter(); bool _is_cancelled() const ; bool _is_conditioner() const ; bool _acquire_all(SmallVector<Node*>&); };
Node是图里面最小的执行单元,通过_successors
和_dependents
来表达图里面各个节点的连接关系。学过图论的都知道,DAG一般是一个邻接链表形式的有向图。这里通过SmallVector
来避免指针跳转导致的cache unfriendly
.
tf::Task
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Task { public : Task (Node* node) : _node {node} {} template <typename ... Ts> Task& precede (Ts&&... tasks) ; template <typename ... Ts> Task& succeed (Ts&&... tasks) ; Task& release (Semaphore& semaphore) ; Task& acquire (Semaphore& semaphore) ; Task& priority (TaskPriority p) ; TaskPriority priority () const ; private : Node* _node{nullptr }; };
tf::Task 对图里的Node* 进一步做了一些封装, 对外提供了
precede / succed
work
composed_of
release / acquire
priority
等方法,可以简单的理解为对tf::Node* 的一个wrapper。tf::Task 非常轻量级,甚至只有一个Node* 指针,可以trivially copy
TrivialClass
“Trivially copyable” 是 C++ 的一个特性,它是一个用于描述类型的属性。一个类型如果是 trivially copyable,意味着它可以通过 std::memcpy
这样的底层内存拷贝操作进行复制,而不需要调用任何特殊的拷贝构造函数或者析构函数。
一个trivial class 必须支持以下条件:
拷贝构造函数和拷贝赋值运算符是 trivial。
类型没有虚函数。
类型没有虚基类。
类型的基类也是 trivially copyable。
所有非静态成员都是 trivially copyable。
类型的析构函数是 trivial 或者是 deleted。
tf:TaskFlow
tf::Graph
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class Graph { public : Graph (Graph&& other) : _nodes {std::move (other._nodes)} {} Graph& operator = (Graph&& other) { _clear(); _nodes = std::move (other._nodes); return *this ; } private : void _clear_detached() { auto mid = std::partition (_nodes.begin (), _nodes.end (), [] (Node* node) { return !(node->_state.load (std::memory_order_relaxed) & Node::DETACHED); }); for (auto itr = mid; itr != _nodes.end (); ++itr) { node_pool.recycle (*itr); } _nodes.resize (std::distance (_nodes.begin (), mid)); } template <typename ...ArgsT> Node* _emplace_back(ArgsT&&... args) { _nodes.push_back (node_pool.animate (std::forward<ArgsT>(args)...)); return _nodes.back (); } private : std::vector<Node*> _nodes; };
graph是计算图中Node存储的实体,具体存储在 _nodes
的一维数组里面,Node与Node的之间的因果/依赖关系体现在Node*的_successors
和 _dependents
成员中。
tf::FlowBuilder
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class FlowBuilder { friend class Executor ; public : template <typename C, std::enable_if_t <is_static_task_v<C>, void >*> Task emplace (C&& c) { return Task (_graph._emplace_back("" , 0 , nullptr , nullptr , 0 , std::in_place_type_t <Node::Static>{}, std::forward<C>(c) )); } template <typename ... C, std::enable_if_t <(sizeof ...(C)>1 ), void >*> auto FlowBuilder::emplace (C&&... cs) { return std::make_tuple (emplace (std::forward<C>(cs))...); } void erase (Task task) ; template <typename T> Task composed_of (T& object) ; private : Graph& _graph; };
FlowBuilder可以理解为Builder Pattern(建造者模式),不直接new Node()
操作,而是通过FlowBuilder::emplace()
结构来构建每一个节点, 而节点与节点之间的依赖关系,由tasks之间单独定义依赖与被依赖关系;
除此之外,提供了计算图编辑的功能,通过earse() 和 composed_of() 等方法调整图.
tf::TaskFlow
1 2 3 4 5 6 7 8 9 10 class Taskflow : public FlowBuilder { public : Taskflow (const std::string& name) : FlowBuilder {_graph}, _name {name} {} private : mutable std::mutex _mutex; std::string _name; Graph _graph; std::queue<std::shared_ptr<Topology>> _topologies; std::optional<std::list<Taskflow>::iterator> _satellite; };
Taskflow 是对图及图的执行控制的最后一个描述,加入了一些meta属性;也是调度的参数,携带了一个局部mutex🔒来多线程同步。
其中_topologies
和 _satellite
等是executor调度执行时的参与者,记录了一些运行时的上下文。
tf::Executor
executor 是最核心的调度引擎,根据tf::Workflow的图结构,结合多核CPU架构,用线程池来调度整张图。
tf::Executor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 class Executor { public : explicit Executor (size_t N = std::thread::hardware_concurrency()) : _MAX_STEALS { ((N+1 ) << 1 )}, _threads {N}, _workers {N}, _notifier {N} { if (N == 0 ) { TF_THROW ("no cpu workers to execute taskflows" ); } _spawn(N); if (has_env (TF_ENABLE_PROFILER)) { TFProfManager::get ()._manage(make_observer <TFProfObserver>()); } } tf::Future<void > run (Taskflow& taskflow) { return run_n (f, 1 , [](){}); } template <typename C> run_n (Taskflow& f, size_t repeat, C&& c) { return run_until ( f, [repeat]() mutable { return repeat-- == 0 ; }, std::forward<C>(c) ); } template <typename P, typename C> tf::Future<void > run_until (Taskflow& taskflow, P&& pred, C&& callable) { _increment_topology(); bool empty; { std::lock_guard<std::mutex> lock (f._mutex) ; std::cout << "taskflow.size = " << f.num_tasks () << std::endl; empty = f.empty (); } if (empty || p ()) { c (); std::promise<void > promise; promise.set_value (); _decrement_topology(); return tf::Future <void >(promise.get_future ()); } auto t = std::make_shared <Topology>(f, std::forward<P>(p), std::forward<C>(c)); tf::Future<void > future (t->_promise.get_future(), t) ; { std::lock_guard<std::mutex> lock (f._mutex) ; f._topologies.push (t); std::cout << "f._topologies = " << f._topologies.size () << std::endl; if (f._topologies.size () == 1 ) { _set_up_topology(_this_worker(), t.get ()) } } } void _set_up_topology(Worker* worker, Topology* tpg) { tpg->_sources.clear (); tpg->_taskflow._graph._clear_detached(); _set_up_graph(tpg->_taskflow._graph, nullptr , tpg, 0 , tpg->_sources); tpg->_join_counter.store (tpg->_sources.size (), std::memory_order_relaxed); if (worker) { _schedule(*worker, tpg->_sources); } else { _schedule(tpg->_sources); } } void _set_up_graph( Graph& g, Node* parent, Topology* tpg, int state, SmallVector<Node*>& src ) { for (auto node : g._nodes) { node->_topology = tpg; node->_parent = parent; node->_state.store (state, std::memory_order_relaxed); if (node->num_dependents () == 0 ) { src.push_back (node); } node->_set_up_join_counter(); } } private : const size_t _MAX_STEALS; std::mutex _wsq_mutex; std::mutex _taskflows_mutex; std::condition_variable _topology_cv; std::mutex _topology_mutex; size_t _num_topologies {0 }; std::unordered_map<std::thread::id, size_t > _wids; std::vector<std::thread> _threads; std::vector<Worker> _workers; std::list<Taskflow> _taskflows; Notifier _notifier; TaskQueue<Node*> _wsq; std::atomic<bool > _done {0 }; std::unordered_set<std::shared_ptr<ObserverInterface>> _observers; };
线程生成循环:
使用 for
循环迭代生成 N
个线程,其中 id
表示线程的唯一标识。
在每次循环中,为每个线程分配一个唯一的 _id
,_vtm
,指向当前执行器的指针 _executor
,以及一个指向当前线程等待器的指针 _waiter
。
在循环中,通过 std::thread
创建一个新的线程,并传递一个 Lambda 表达式作为线程的执行体。
Lambda 表达式(线程函数):
在 Lambda 表达式中,首先将当前线程的 _thread
设置为指向 _threads
数组中相应线程的指针。
使用 std::scoped_lock
锁定互斥锁 mutex
,然后将当前线程的 ID 映射到 _wids
容器中。
输出当前线程的 ID 和已生成的线程数量。
如果已生成的线程数量达到了总线程数量 N
,通过 cond.notify_one()
唤醒等待的线程。
任务执行循环:
在进入无限循环后,首先通过 _exploit_task
函数处理任务(这部分的实现在代码中没有展示)。
然后通过 _wait_for_task
函数等待新的任务。如果 _wait_for_task
返回 false
,表示线程需要退出,就会跳出循环。
等待所有线程生成:
在生成完所有线程后,通过 std::unique_lock
锁定 mutex
,然后使用 cond.wait
等待条件满足。
条件是 lambda 表达式 [&](){ return n==N; }
,即等待直到已生成的线程数量达到总线程数量 N
。
通过互斥锁和条件变量实现线程同步,确保所有线程都已经生成后再继续执行后续逻辑。
tf::Executor::_invoke()
_invoke() 是Node/Task的callee,下面看一下源代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 void Executor::_invoke(Worker& worker, Node* node) { while (!(node->_state.load (std::memory_order_acquire) & Node::READY)); begin_invoke: SmallVector<int > conds; if (node->_is_cancelled()) { if (node = _tear_down_invoke(worker, node); node) { goto invoke_successors; } return ; } if (node->_semaphores && !node->_semaphores->to_acquire.empty ()) { SmallVector<Node*> nodes; if (!node->_acquire_all(nodes)) { _schedule(worker, nodes); return ; } node->_state.fetch_or (Node::ACQUIRED, std::memory_order_release); } switch (node->_handle.index ()) { case Node::STATIC:{ _invoke_static_task(worker, node); } break ; default : break ; } invoke_successors: if (node->_semaphores && !node->_semaphores->to_release.empty ()) { _schedule(worker, node->_release_all()); } if ((node->_state.load (std::memory_order_relaxed) & Node::CONDITIONED)) { node->_join_counter.fetch_add (node->num_strong_dependents (), std::memory_order_relaxed); } else { node->_join_counter.fetch_add (node->num_dependents (), std::memory_order_relaxed); } auto & j = (node->_parent) ? node->_parent->_join_counter : node->_topology->_join_counter; worker._cache = nullptr ; auto max_p = static_cast <unsigned >(TaskPriority::MAX); switch (node->_handle.index ()) { case Node::CONDITION: case Node::MULTI_CONDITION: { for (auto cond : conds) { if (cond >= 0 && static_cast <size_t >(cond) < node->_successors.size ()) { auto s = node->_successors[cond]; s->_join_counter.store (0 , std::memory_order_relaxed); j.fetch_add (1 , std::memory_order_relaxed); if (s->_priority <= max_p) { if (worker._cache) { _schedule(worker, worker._cache); } worker._cache = s; max_p = s->_priority; } else { _schedule(worker, s); } } } } break ; default : { for (size_t i=0 ; i<node->_successors.size (); ++i) { if (auto s = node->_successors[i]; s->_join_counter.fetch_sub (1 , std::memory_order_acq_rel) == 1 ) { j.fetch_add (1 , std::memory_order_relaxed); if (s->_priority <= max_p) { if (worker._cache) { _schedule(worker, worker._cache); } worker._cache = s; max_p = s->_priority; } else { _schedule(worker, s); } } } } break ; } if (node = _tear_down_invoke(worker, node); node) { goto invoke_successors; } if (worker._cache) { node = worker._cache; goto begin_invoke; } }
Memory Synchronization:
1 while(!(node ->_state .load(std::memory_order_acquire) & Node ::READY ));
这一行使用循环等待的方式,确保所有与任务节点状态相关的内存操作都已完成。Node::READY
表示任务节点已准备就绪。
Cancellation Check:
1 2 3 4 5 6 if(node ->_is_cancelled ()) { if(node = _tear_down_invoke (worker, node ); node ) { goto invoke_successors; } return; }
检查任务节点是否被取消,如果是则执行相应的取消操作。如果任务节点已取消,则调用 _tear_down_invoke
函数,该函数执行取消的相关清理工作,然后跳转到 invoke_successors
标签。
Semaphore Acquisition:
1 2 3 4 5 6 7 8 if(node ->_semaphores && !node ->_semaphores- >to_acquire.empty()) { SmallVector<Node*> nodes; if(!node ->_acquire_all (nodes)) { _schedule(worker, nodes); return; } node ->_state .fetch_or(Node ::ACQUIRED , std::memory_order_release); }
如果任务节点具有相关的信号量(semaphores),并且需要获取信号量,那么尝试获取这些信号量。如果无法获取,将相关的节点放入调度队列,并返回。
Task Invocation Based on Type:
1 2 3 switch (node->_handle.index() ) { }
使用 switch
语句根据任务类型执行相应的任务调度操作。在代码中展示了针对不同任务类型的 case
,但这些情况的具体实现在注释中没有展示出来。
Release Semaphores:
1 2 3 if(node ->_semaphores && !node ->_semaphores- >to_release.empty()) { _schedule(worker, node ->_release_all ()); }
释放任务节点相关的信号量。如果有需要释放的信号量,将相应的节点放入调度队列。
Join Counter Adjustment:
1 2 3 4 5 6 if((node ->_state .load(std::memory_order_relaxed) & Node ::CONDITIONED )) { node ->_join_counter .fetch_add(node ->num_strong_dependents (), std::memory_order_relaxed); } else { node ->_join_counter .fetch_add(node ->num_dependents (), std::memory_order_relaxed); }
调整任务节点的加入(join)计数器。这是与条件任务相关的操作。
Invoke Successors Based on Priority:
1 2 3 auto & j = (node-> _parent ) ? node-> _parent -> _join_counter : node -> _topology -> _join_counter;
根据后继节点的优先级调度它们的执行。
Teardown and Tail Recursion Elimination:
1 2 3 4 5 6 7 if(node = _tear_down_invoke (worker, node ); node ) { goto invoke_successors; } if(worker._cache) { node = worker ._cache; goto begin_invoke; }
执行与调用任务节点相关的清理工作,并尝试进行尾递归消除(tail recursion elimination)。如果有缓存的任务节点,将其重新设置为当前节点,以减少任务队列的推送和弹出操作
Work Stealing
Work stealing(工作窃取)是一种用于并行计算的调度策略,通常用于任务并行模型中。该模型基于线程池,其中每个线程都有一个本地任务队列,而当一个线程的任务队列为空时,它可以从其他线程的队列中"窃取"任务。
下面是 Work Stealing 的一般思路:
线程池: 创建一个包含多个线程的线程池。每个线程都有自己的本地任务队列,用于存储待执行的任务。
任务分割: 将大的任务划分为小的子任务,并将这些子任务分配给线程池中的不同线程。每个线程将任务放入自己的本地队列中。
任务执行: 每个线程从自己的本地队列中取出任务执行。当一个线程的本地队列为空时,它可以从其他线程的队列中"窃取"(work stealing)任务。
工作窃取: 当一个线程窃取任务时,它会从被窃取线程的队列末尾取出一个任务。这样做的原因是因为任务很可能是按照顺序生成的,所以从末尾窃取可以保持一定的局部性。
减小竞争: 使用本地队列减小线程之间的竞争。每个线程只需在自己的本地队列上进行操作,减少了锁的争用。
递归执行: 线程在执行任务时,如果任务可以被分割,可以递归地将子任务放入本地队列。
Work Stealing 的好处在于有效地利用了多核处理器,避免了线程之间的频繁同步和锁竞争。这种策略在任务数量不均匀分布的情况下表现良好,因为空闲线程可以窃取其他线程的任务,从而更好地实现负载均衡。
_spawn
我们深入源代码看一下,taskflow实现了一个多线程执行器,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 inline void Executor::_spawn(size_t N) { std::mutex mutex; std::condition_variable cond; size_t n=0 ; for (size_t id=0 ; id<N; ++id) { _workers[id]._id = id; _workers[id]._vtm = id; _workers[id]._executor = this ; _workers[id]._waiter = &_notifier._waiters[id]; std::cout << "id " << id << std::endl; _threads[id] = std::thread ([&, &w=_workers[id]] () { w._thread = &_threads[w._id]; { std::scoped_lock lock (mutex); _wids[std::this_thread::get_id ()] = w._id; std::cout << "num_workers " << num_workers () << std::endl; if (n++; n == num_workers ()) { cond.notify_one (); } } Node* t = nullptr ; while (1 ) { std::cout << std::this_thread::get_id () << " " << std::boolalpha << (t == nullptr ) << std::endl; _exploit_task(w, t); if (_wait_for_task(w, t) == false ) { break ; } } }); } std::unique_lock<std::mutex> lock (mutex) ; cond.wait (lock, [&](){ return n==N; }); } inline void Executor::_exploit_task(Worker& w, Node*& t) { while (t) { std::cout << "t->name = " << t->name () << std::endl; _invoke(w, t); t = w._wsq.pop (); } } inline bool Executor::_wait_for_task(Worker& worker, Node*& t) { explore_task: _explore_task(worker, t); if (t) { _notifier.notify (false ); return true ; } _notifier.prepare_wait (worker._waiter); if (!_wsq.empty ()) { _notifier.cancel_wait (worker._waiter); worker._vtm = worker._id; goto explore_task; } if (_done) { _notifier.cancel_wait (worker._waiter); _notifier.notify (true ); return false ; } for (size_t vtm=0 ; vtm<_workers.size (); vtm++) { if (!_workers[vtm]._wsq.empty ()) { _notifier.cancel_wait (worker._waiter); worker._vtm = vtm; goto explore_task; } } _notifier.commit_wait (worker._waiter); goto explore_task; }
Step 1 (explore_task): 尝试探索任务,即从任务队列中获取任务。
Step 2 (_wsq check): 检查当前线程的本地任务队列 _wsq
是否有任务。如果有,取消等待并继续探索任务。
Step 3 (done check): 检查是否已完成任务。如果是,取消等待并通知所有线程(_notifier.notify(true)
),表示执行结束。
Step 4 (scan other workers’ queues): 遍历其他线程的任务队列,如果发现有任务,取消等待并继续探索任务。
Step 5 (wait for notification): 如果以上步骤都未找到任务,等待通知。在等待过程中,线程将释放锁,允许其他线程继续执行。一旦收到通知,继续探索任务。
这个方法采用了一种类似于两阶段提交 (Two-Phase Commit) 的机制,通过 _notifier
对象协调线程之间的等待和通知。
notifier
Notifier 是一个经典的基于条件变量的非阻塞算法。
class Notifier
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class Notifier { public : struct Waiter { std::atomic<Waiter*> next; uint64_t epoch; enum : unsigned { kNotSignaled = 0 , kWaiting, kSignaled, }; std::mutex mu; std::condition_variable cv; unsigned state; }; explicit Notifier (size_t N) : _waiters{ N} { assert (_waiters.size () < (1 << kWaiterBits) - 1 ); _state = kStackMask | (kEpochMask - kEpochInc * _waiters.size () * 2 ); } void prepare_wait (Waiter* w) { w->epoch = _state.fetch_add (kWaiterInc, std::memory_order_relaxed); std::atomic_thread_fence (std::memory_order_seq_cst); } private : std::atomic<uint64_t > _state; std::vector<Waiter> _waiters; };
Notifier
类,实现了一种通知机制,用于在线程之间进行同步。该类主要用于实现等待-通知模式,其中一个或多个线程等待某个条件变为真,而另一个线程在条件变为真时发送通知。
这里有几个重要的概念和功能:
Waiter 结构体: 内嵌在 Notifier
类中,用于表示等待的线程。Waiter
结构体包含了一个 atomic<Waiter*> next
,用于构建一个无锁栈,以及一个状态 state
,表示等待状态(未通知、等待中、已通知)。
状态管理: _state
是一个 std::atomic<uint64_t>
类型的变量,用于管理整体状态。具体的状态信息被分割成几个字段,包括一个栈字段 kStackBits
、一个等待者计数字段 kWaiterBits
、一个修改计数字段 kEpochBits
。这些字段通过位运算进行组合和操作。
等待和通知操作: prepare_wait
函数用于准备等待,commit_wait
函数用于确认等待,cancel_wait
函数用于取消等待。notify
和 notify_n
函数用于发送通知,可以选择是唤醒一个线程还是唤醒所有等待线程。
无锁栈: 使用 _state
中的栈字段,通过无锁操作实现了一个等待者栈,这样可以高效地管理等待线程。
条件变量和原子操作: 根据编译器支持的特性,使用了条件变量和原子操作来实现线程的等待和通知。根据 __cpp_lib_atomic_wait
的定义,选择不同的实现方式。
epoch
epoch
是一个表示状态修改次数的计数器。该计数器用于实现非阻塞算法中的等待机制。以下是对 epoch
的理解:
基本概念 :
epoch
是一个 64 位的计数器。
1 2 3 4 5 6 7 8 9 10 11 static const uint64_t kStackBits = 16 ;static const uint64_t kStackMask = (1ull << kStackBits) - 1 ;static const uint64_t kWaiterBits = 16 ;static const uint64_t kWaiterShift = 16 ;static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1 ) << kWaiterShift; static const uint64_t kWaiterInc = 1ull << kWaiterBits;static const uint64_t kEpochBits = 32 ;static const uint64_t kEpochShift = 32 ;static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1 ) << kEpochShift;static const uint64_t kEpochInc = 1ull << kEpochShift;
它用于标识状态的修改次数。
用途 :
epoch
主要用于非阻塞等待机制,允许线程等待某个条件的改变,而不必使用互斥锁或条件变量。
计数方式 :
epoch
的计数方式是通过在状态发生改变时递增的。
当状态发生改变时,epoch
会被递增,使得等待线程能够感知到状态的变化。
等待过程中的使用 :
当线程调用 prepare_wait
时,会获取当前的 epoch
值,并在 epoch
的基础上进行一些计算,用于等待期间的判断。
当线程调用 commit_wait
或 cancel_wait
时,会对 epoch
进行修改,以通知其他线程或取消等待。
避免 ABA 问题 :
epoch
的设计还考虑了避免 ABA 问题(一个值被修改两次,但中间的修改值被其他线程修改回来)。
通过使用 kEpochInc
来进行增量计数,避免 ABA 问题。
线程等待时的使用 :
当线程等待时,会使用 epoch
来判断状态是否发生了改变,以决定是继续等待还是取消等待。
Ending
越写越发散,好烦,下一个topic好好讲一下内存序吧。
Reference
T.-W. Huang, D.-L. Lin, C.-X. Lin, and Y Lin, “Taskflow: A Lightweight Parallel and Heterogeneous Task Graph Computing System ,” IEEE Transactions on Parallel and Distributed Systems (TPDS) , vol. 33, no. 6, pp. 1303-1320, June 2022.
Brpc Overview