TaskFlow + WorkStealing

高性能C++ DAG执行框架 之 WorkStealing策略

Posted by Wang Junwei on 2023-08-02
TaskFlow = DAG + WorkStealing

前言

本文首发于个人博客:🔗Wang Junwei’s Blog : TaskFlow + WorkStealingl

最开始使用学习brpc的时候,总听人提起大名鼎鼎的 bthread,也在之前的工作实践中用到过bthread线程池, M:N 的线程库能在多核应用中提供更好的scalability 和 cache locality。在C++微服务中能够支持高并发低延迟的任务并发调度。

线程池只是任务简单的并发,各个task之间互相独立运行。

在系统设计中,很多时候会遇到有依赖的逻辑,这个时候就需要task之间有一定的依赖关系;

我的前同事yc大佬,是非常资深的C程序员,从C98时代走来,编码技术很强。在前司做召回引擎的时候,他自己写了一个C++ DAG框架,koi2,基于bthread,维护了一套Node执行框架,还支持context传递的时候约束成员的可见性。可惜在职期间没有好好地学习一下源代码。

最近周末没事干阅读了一下taskflow的源代码,就浅浅地记录一下吧。

TaskFlow Overview

img

Taskflow 是一个header-only的轻量级图计算框架 (Task Graph Computing System)。

支持如下graph

Static img
Dynamic image-20231202155822524
Composable image-20231202155806919
Conditional image-20231202155730053
Heterogeneous image-20231202155704658

Dive into the src

tf::Node

tf::Node::{Static,Dynamic,Condition…}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
struct Dynamic {
template <typename C>
Dynamic(C&& c) : work{std::forward<C>(c)} {}
std::function<void(Subflow&)> work;
Graph subgraph;
};
struct Dynamic {
template <typename C>
Dynamic(C&&) : work{std::forward<C>(c)} {}
std::function<void(Subflow&)> work;
Graph subgraph;
};
struct Condition {
template <typename C>
Condition(C&&) : work{std::forward<C>(c)} {}
std::variant<
std::function<int()>, std::function<int(Runtime&)>
> work;
};
struct MultiCondition {
template <typename C>
MultiCondition(C&&) : work {std::forward<C>(c)} {}
std::variant<
std::function<SmallVector<int>()>,
std::function<SmallVector<int>(Runtime&)>
> work;
};
struct Module {
template <typename T>
Module(T&) : graph{ obj.graph() } {}
Graph& graph;
};
struct DependentAsync {
template <typename C>
DependentAsync(C&&) : work {std::forward<C>(c)} {}
std::variat<
std::function<void()>, std::function<void(Runtime&)>
> work;
std::atomic<size_t> use_count {1};
std::atomic<AsyncState> state {AsyncState::UNFINISHED};
};


using handle_t = std::variant<
Placeholder, // placeholder
Static, // static tasking
Dynamic, // dynamic tasking
Condition, // conditional tasking
MultiCondition, // multi-conditional tasking
Module, // composable tasking
Async, // async tasking
DependentAsync // dependent async tasking
>;

这里面的task的ctor入参都是Callable的类型,完美转发给work对象初始化。

tf::Node

Node 是taskflow里面最重要的一个数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class Node {
public:
template <typename... Args>
Node(const std::string& name ,
unsigned priority,
Topology* topology,
Node* parent,
size_t join_counter,
Args&&... args) :
_name {name},
_priority {priority},
_topology {topology},
_parent {parent},
_join_counter {join_counter},
_handle {std::forward<Args>(args)...} {
// ctor
}

private:
std::string _name;

unsigned _priority {0};
Topology* _topology {nullptr};
Node* _parent {nullptr};
void* _data {nullptr};

SmallVector<Node*> _successors; // 依赖此Node的Node list
SmallVector<Node*> _dependents; // 此Node依赖的Node list

std::atomic<int> _state {0};
std::atomic<size_t> _join_counter {0};

std::unique_ptr<Semaphores> _semaphores;

handle_t _handle;

void _precede(Node* v) {
_successors.push_back(v);
v->_dependents.push_back(this);
}
void _set_up_join_counter();

bool _is_cancelled() const;
bool _is_conditioner() const;
bool _acquire_all(SmallVector<Node*>&);

};

Node是图里面最小的执行单元,通过_successors_dependents来表达图里面各个节点的连接关系。学过图论的都知道,DAG一般是一个邻接链表形式的有向图。这里通过SmallVector来避免指针跳转导致的cache unfriendly.

tf::Task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Task {
public:
Task(Node* node) : _node {node} {}

template <typename... Ts>
Task& precede(Ts&&... tasks);

template <typename... Ts>
Task& succeed(Ts&&... tasks);

Task& release(Semaphore& semaphore);
Task& acquire(Semaphore& semaphore);

Task& priority(TaskPriority p);
TaskPriority priority() const;

//...
private:
Node* _node{nullptr};
};

tf::Task 对图里的Node* 进一步做了一些封装, 对外提供了

  1. precede / succed
  2. work
  3. composed_of
  4. release / acquire
  5. priority

等方法,可以简单的理解为对tf::Node* 的一个wrapper。tf::Task 非常轻量级,甚至只有一个Node* 指针,可以trivially copy

TrivialClass

“Trivially copyable” 是 C++ 的一个特性,它是一个用于描述类型的属性。一个类型如果是 trivially copyable,意味着它可以通过 std::memcpy 这样的底层内存拷贝操作进行复制,而不需要调用任何特殊的拷贝构造函数或者析构函数。

一个trivial class 必须支持以下条件:

  1. 拷贝构造函数和拷贝赋值运算符是 trivial。
  2. 类型没有虚函数。
  3. 类型没有虚基类。
  4. 类型的基类也是 trivially copyable。
  5. 所有非静态成员都是 trivially copyable。
  6. 类型的析构函数是 trivial 或者是 deleted。

tf:TaskFlow

tf::Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Graph {
public:
Graph(Graph&& other) : _nodes {std::move(other._nodes)} {}
Graph& operator = (Graph&& other) {
_clear();
_nodes = std::move(other._nodes);
return *this;
}


private:
void _clear_detached() {
auto mid = std::partition(_nodes.begin(), _nodes.end(), [] (Node* node) {
return !(node->_state.load(std::memory_order_relaxed) & Node::DETACHED);
});
for(auto itr = mid; itr != _nodes.end(); ++itr) {
node_pool.recycle(*itr);
}
_nodes.resize(std::distance(_nodes.begin(), mid));
}

template <typename ...ArgsT>
Node* _emplace_back(ArgsT&&... args) {
_nodes.push_back(node_pool.animate(std::forward<ArgsT>(args)...));
return _nodes.back();
}

private:
std::vector<Node*> _nodes;
};

graph是计算图中Node存储的实体,具体存储在 _nodes的一维数组里面,Node与Node的之间的因果/依赖关系体现在Node*的_successors_dependents 成员中。

tf::FlowBuilder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class FlowBuilder {
friend class Executor;
public:

// emplace static task.
template <typename C, std::enable_if_t<is_static_task_v<C>, void>*>
Task emplace(C&& c) {
return Task(_graph._emplace_back("", 0, nullptr, nullptr, 0,
std::in_place_type_t<Node::Static>{}, std::forward<C>(c)
));
}
// main entrance.
template <typename... C, std::enable_if_t<(sizeof...(C)>1), void>*>
auto FlowBuilder::emplace(C&&... cs) {
return std::make_tuple(emplace(std::forward<C>(cs))...);
}

void erase(Task task);

template <typename T>
Task composed_of(T& object);
private:
Graph& _graph; //必须在构建时绑定。
};

FlowBuilder可以理解为Builder Pattern(建造者模式),不直接new Node()操作,而是通过FlowBuilder::emplace() 结构来构建每一个节点, 而节点与节点之间的依赖关系,由tasks之间单独定义依赖与被依赖关系;

除此之外,提供了计算图编辑的功能,通过earse() 和 composed_of() 等方法调整图.

tf::TaskFlow

1
2
3
4
5
6
7
8
9
10
class Taskflow : public FlowBuilder {
public:
Taskflow(const std::string& name) : FlowBuilder {_graph}, _name {name} {}
private:
mutable std::mutex _mutex;
std::string _name;
Graph _graph;
std::queue<std::shared_ptr<Topology>> _topologies;
std::optional<std::list<Taskflow>::iterator> _satellite;
};

Taskflow 是对图及图的执行控制的最后一个描述,加入了一些meta属性;也是调度的参数,携带了一个局部mutex🔒来多线程同步。

其中_topologies_satellite 等是executor调度执行时的参与者,记录了一些运行时的上下文。

tf::Executor

executor 是最核心的调度引擎,根据tf::Workflow的图结构,结合多核CPU架构,用线程池来调度整张图。

tf::Executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class Executor {
public:
// ctor
explicit Executor(size_t N = std::thread::hardware_concurrency()) :
_MAX_STEALS {((N+1) << 1)},
_threads {N},
_workers {N},
_notifier {N} {
if(N == 0) {
TF_THROW("no cpu workers to execute taskflows");
}
_spawn(N);
// instantite the default observer if requested
if(has_env(TF_ENABLE_PROFILER)) {
TFProfManager::get()._manage(make_observer<TFProfObserver>());
}
}

// entry
tf::Future<void> run(Taskflow& taskflow) {
return run_n(f, 1, [](){});
}

template <typename C>
run_n(Taskflow& f, size_t repeat, C&& c) {
return run_until(
f, [repeat]() mutable { return repeat-- == 0; }, std::forward<C>(c)
);
}

template<typename P, typename C>
tf::Future<void> run_until(Taskflow& taskflow, P&& pred, C&& callable) {
// 1.增加一个执行逻辑(atmoically)
_increment_topology();
// 2. 判断图是否为空(需要原子操作,因为可能会有动态任务改图)
bool empty;
{
std::lock_guard<std::mutex> lock(f._mutex);
std::cout << "taskflow.size = " << f.num_tasks() << std::endl;
empty = f.empty();
}
// 3. 判空返回 or 执行次数够了就 return dummy
if(empty || p()) {
c();
std::promise<void> promise;
promise.set_value();
_decrement_topology();
return tf::Future<void>(promise.get_future());
}

// 3. create a topology for this run
auto t = std::make_shared<Topology>(f, std::forward<P>(p), std::forward<C>(c));

// need to create future before the topology got torn down quickly
tf::Future<void> future(t->_promise.get_future(), t);

// modifying topology needs to be protected under the lock
{
std::lock_guard<std::mutex> lock(f._mutex);
f._topologies.push(t);
std::cout << "f._topologies = " << f._topologies.size() << std::endl;
if(f._topologies.size() == 1) {
_set_up_topology(_this_worker(), t.get())
}
}
}

// 入口
void _set_up_topology(Worker* worker, Topology* tpg) {
// ---- under taskflow lock ----
tpg->_sources.clear();
tpg->_taskflow._graph._clear_detached();

_set_up_graph(tpg->_taskflow._graph, nullptr, tpg, 0, tpg->_sources);
tpg->_join_counter.store(tpg->_sources.size(), std::memory_order_relaxed);

if(worker) {
_schedule(*worker, tpg->_sources);
}
else {
_schedule(tpg->_sources);
}
}

// set up graph
void _set_up_graph(
Graph& g, Node* parent, Topology* tpg, int state, SmallVector<Node*>& src
) {
for(auto node : g._nodes) {
node->_topology = tpg;
node->_parent = parent;
node->_state.store(state, std::memory_order_relaxed);
if(node->num_dependents() == 0) {
src.push_back(node); // 将graph里面所有入度为0度的节点push到src(topogoly->source)里面
}
node->_set_up_join_counter();
}
}
private:

const size_t _MAX_STEALS;
std::mutex _wsq_mutex;
std::mutex _taskflows_mutex;

std::condition_variable _topology_cv;
std::mutex _topology_mutex;
size_t _num_topologies {0};

std::unordered_map<std::thread::id, size_t> _wids; // 线程id及对应的workerid
std::vector<std::thread> _threads; // 线程池 (一般等于CPU核数)
std::vector<Worker> _workers; // workers(一般等于CPU核数)
std::list<Taskflow> _taskflows; // workers ()
Notifier _notifier;
TaskQueue<Node*> _wsq;
std::atomic<bool> _done {0};
std::unordered_set<std::shared_ptr<ObserverInterface>> _observers;
};
  1. 线程生成循环:
    • 使用 for 循环迭代生成 N 个线程,其中 id 表示线程的唯一标识。
    • 在每次循环中,为每个线程分配一个唯一的 _id_vtm,指向当前执行器的指针 _executor,以及一个指向当前线程等待器的指针 _waiter
    • 在循环中,通过 std::thread 创建一个新的线程,并传递一个 Lambda 表达式作为线程的执行体。
  2. Lambda 表达式(线程函数):
    • 在 Lambda 表达式中,首先将当前线程的 _thread 设置为指向 _threads 数组中相应线程的指针。
    • 使用 std::scoped_lock 锁定互斥锁 mutex,然后将当前线程的 ID 映射到 _wids 容器中。
    • 输出当前线程的 ID 和已生成的线程数量。
    • 如果已生成的线程数量达到了总线程数量 N,通过 cond.notify_one() 唤醒等待的线程。
  3. 任务执行循环:
    • 在进入无限循环后,首先通过 _exploit_task 函数处理任务(这部分的实现在代码中没有展示)。
    • 然后通过 _wait_for_task 函数等待新的任务。如果 _wait_for_task 返回 false,表示线程需要退出,就会跳出循环。
  4. 等待所有线程生成:
    • 在生成完所有线程后,通过 std::unique_lock 锁定 mutex,然后使用 cond.wait 等待条件满足。
    • 条件是 lambda 表达式 [&](){ return n==N; },即等待直到已生成的线程数量达到总线程数量 N

通过互斥锁和条件变量实现线程同步,确保所有线程都已经生成后再继续执行后续逻辑。

tf::Executor::_invoke()

_invoke() 是Node/Task的callee,下面看一下源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
void Executor::_invoke(Worker& worker, Node* node) {

// synchronize all outstanding memory operations caused by reordering
while(!(node->_state.load(std::memory_order_acquire) & Node::READY));

begin_invoke:

SmallVector<int> conds;

// no need to do other things if the topology is cancelled
if(node->_is_cancelled()) {
if(node = _tear_down_invoke(worker, node); node) {
goto invoke_successors;
}
return;
}

// if acquiring semaphore(s) exists, acquire them first
if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
SmallVector<Node*> nodes;
if(!node->_acquire_all(nodes)) {
_schedule(worker, nodes);
return;
}
node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
}

// condition task
//int cond = -1;

// switch is faster than nested if-else due to jump table
switch(node->_handle.index()) {
// static task
case Node::STATIC:{
_invoke_static_task(worker, node);
}
break;
// dynamic task
// condition task
// multi-condition task
// module task
// async task
// dependent async task
// monostate (placeholder)
default:
break;
}

invoke_successors:

// if releasing semaphores exist, release them
if(node->_semaphores && !node->_semaphores->to_release.empty()) {
_schedule(worker, node->_release_all());
}

if((node->_state.load(std::memory_order_relaxed) & Node::CONDITIONED)) {
node->_join_counter.fetch_add(node->num_strong_dependents(), std::memory_order_relaxed);
}
else {
node->_join_counter.fetch_add(node->num_dependents(), std::memory_order_relaxed);
}

// acquire the parent flow counter
auto& j = (node->_parent) ? node->_parent->_join_counter :
node->_topology->_join_counter;

// Here, we want to cache the latest successor with the highest priority
worker._cache = nullptr;
auto max_p = static_cast<unsigned>(TaskPriority::MAX);

// Invoke the task based on the corresponding type
switch(node->_handle.index()) {

// condition and multi-condition tasks
case Node::CONDITION:
case Node::MULTI_CONDITION: {
for(auto cond : conds) {
if(cond >= 0 && static_cast<size_t>(cond) < node->_successors.size()) {
auto s = node->_successors[cond];
// zeroing the join counter for invariant
s->_join_counter.store(0, std::memory_order_relaxed);
j.fetch_add(1, std::memory_order_relaxed);
if(s->_priority <= max_p) {
if(worker._cache) {
_schedule(worker, worker._cache);
}
worker._cache = s;
max_p = s->_priority;
}
else {
_schedule(worker, s);
}
}
}
}
break;

// non-condition task
default: {
for(size_t i=0; i<node->_successors.size(); ++i) {
//if(auto s = node->_successors[i]; --(s->_join_counter) == 0) {
if(auto s = node->_successors[i];
s->_join_counter.fetch_sub(1, std::memory_order_acq_rel) == 1) {
j.fetch_add(1, std::memory_order_relaxed);
if(s->_priority <= max_p) {
if(worker._cache) {
_schedule(worker, worker._cache);
}
worker._cache = s;
max_p = s->_priority;
}
else {
_schedule(worker, s);
}
}
}
}
break;
}

// tear_down the invoke
if(node = _tear_down_invoke(worker, node); node) {
goto invoke_successors;
}

// perform tail recursion elimination for the right-most child to reduce
// the number of expensive pop/push operations through the task queue
if(worker._cache) {
node = worker._cache;
//node->_state.fetch_or(Node::READY, std::memory_order_release);
goto begin_invoke;
}
}
  1. Memory Synchronization:

    1
    while(!(node->_state.load(std::memory_order_acquire) & Node::READY));

    这一行使用循环等待的方式,确保所有与任务节点状态相关的内存操作都已完成。Node::READY 表示任务节点已准备就绪。

  2. Cancellation Check:

    1
    2
    3
    4
    5
    6
    if(node->_is_cancelled()) {
    if(node = _tear_down_invoke(worker, node); node) {
    goto invoke_successors;
    }
    return;
    }

    检查任务节点是否被取消,如果是则执行相应的取消操作。如果任务节点已取消,则调用 _tear_down_invoke 函数,该函数执行取消的相关清理工作,然后跳转到 invoke_successors 标签。

  3. Semaphore Acquisition:

    1
    2
    3
    4
    5
    6
    7
    8
    if(node->_semaphores && !node->_semaphores->to_acquire.empty()) {
    SmallVector<Node*> nodes;
    if(!node->_acquire_all(nodes)) {
    _schedule(worker, nodes);
    return;
    }
    node->_state.fetch_or(Node::ACQUIRED, std::memory_order_release);
    }

    如果任务节点具有相关的信号量(semaphores),并且需要获取信号量,那么尝试获取这些信号量。如果无法获取,将相关的节点放入调度队列,并返回。

  4. Task Invocation Based on Type:

    1
    2
    3
    switch(node->_handle.index()) {
    // ... cases for different types of tasks ...
    }

    使用 switch 语句根据任务类型执行相应的任务调度操作。在代码中展示了针对不同任务类型的 case,但这些情况的具体实现在注释中没有展示出来。

  5. Release Semaphores:

    1
    2
    3
    if(node->_semaphores && !node->_semaphores->to_release.empty()) {
    _schedule(worker, node->_release_all());
    }

    释放任务节点相关的信号量。如果有需要释放的信号量,将相应的节点放入调度队列。

  6. Join Counter Adjustment:

    1
    2
    3
    4
    5
    6
    if((node->_state.load(std::memory_order_relaxed) & Node::CONDITIONED)) {
    node->_join_counter.fetch_add(node->num_strong_dependents(), std::memory_order_relaxed);
    }
    else {
    node->_join_counter.fetch_add(node->num_dependents(), std::memory_order_relaxed);
    }

    调整任务节点的加入(join)计数器。这是与条件任务相关的操作。

  7. Invoke Successors Based on Priority:

    1
    2
    3
    auto& j = (node->_parent) ? node->_parent->_join_counter :
    node->_topology->_join_counter;
    // ... code to invoke successors based on priority ...

    根据后继节点的优先级调度它们的执行。

  8. Teardown and Tail Recursion Elimination:

    1
    2
    3
    4
    5
    6
    7
    if(node = _tear_down_invoke(worker, node); node) {
    goto invoke_successors;
    }
    if(worker._cache) {
    node = worker._cache;
    goto begin_invoke;
    }

    执行与调用任务节点相关的清理工作,并尝试进行尾递归消除(tail recursion elimination)。如果有缓存的任务节点,将其重新设置为当前节点,以减少任务队列的推送和弹出操作

Work Stealing

image-20231203223733323
Work stealing(工作窃取)是一种用于并行计算的调度策略,通常用于任务并行模型中。该模型基于线程池,其中每个线程都有一个本地任务队列,而当一个线程的任务队列为空时,它可以从其他线程的队列中"窃取"任务。

下面是 Work Stealing 的一般思路:

  1. 线程池: 创建一个包含多个线程的线程池。每个线程都有自己的本地任务队列,用于存储待执行的任务。
  2. 任务分割: 将大的任务划分为小的子任务,并将这些子任务分配给线程池中的不同线程。每个线程将任务放入自己的本地队列中。
  3. 任务执行: 每个线程从自己的本地队列中取出任务执行。当一个线程的本地队列为空时,它可以从其他线程的队列中"窃取"(work stealing)任务。
  4. 工作窃取: 当一个线程窃取任务时,它会从被窃取线程的队列末尾取出一个任务。这样做的原因是因为任务很可能是按照顺序生成的,所以从末尾窃取可以保持一定的局部性。
  5. 减小竞争: 使用本地队列减小线程之间的竞争。每个线程只需在自己的本地队列上进行操作,减少了锁的争用。
  6. 递归执行: 线程在执行任务时,如果任务可以被分割,可以递归地将子任务放入本地队列。

Work Stealing 的好处在于有效地利用了多核处理器,避免了线程之间的频繁同步和锁竞争。这种策略在任务数量不均匀分布的情况下表现良好,因为空闲线程可以窃取其他线程的任务,从而更好地实现负载均衡。

_spawn

我们深入源代码看一下,taskflow实现了一个多线程执行器,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
inline void Executor::_spawn(size_t N) {
std::mutex mutex;
std::condition_variable cond;
size_t n=0;

for(size_t id=0; id<N; ++id) {

_workers[id]._id = id;
_workers[id]._vtm = id;
_workers[id]._executor = this;
_workers[id]._waiter = &_notifier._waiters[id];

std::cout << "id " << id << std::endl;
_threads[id] = std::thread([&, &w=_workers[id]] () {
w._thread = &_threads[w._id];
{
std::scoped_lock lock(mutex);
_wids[std::this_thread::get_id()] = w._id;
std::cout << "num_workers " << num_workers() << std::endl;
if(n++; n == num_workers()) {
cond.notify_one();
}
}
Node* t = nullptr;
while(1) {
// execute the tasks.
std::cout << std::this_thread::get_id() << " " << std::boolalpha << (t == nullptr) << std::endl;
_exploit_task(w, t);
// wait for tasks
if(_wait_for_task(w, t) == false) {
break;
}
}
});
}
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [&](){ return n==N; });
}

// Procedure: _exploit_task
inline void Executor::_exploit_task(Worker& w, Node*& t) {
while(t) {
std::cout << "t->name = " << t->name() << std::endl;
_invoke(w, t);
t = w._wsq.pop();
}
}

inline bool Executor::_wait_for_task(Worker& worker, Node*& t) {

explore_task:

_explore_task(worker, t);

// The last thief who successfully stole a task will wake up
// another thief worker to avoid starvation.
if(t) {
_notifier.notify(false);
return true;
}
_notifier.prepare_wait(worker._waiter);

if(!_wsq.empty()) {
_notifier.cancel_wait(worker._waiter);
worker._vtm = worker._id;
goto explore_task;
}

if(_done) {
_notifier.cancel_wait(worker._waiter);
_notifier.notify(true);
return false;
}
for(size_t vtm=0; vtm<_workers.size(); vtm++) {
if(!_workers[vtm]._wsq.empty()) {
_notifier.cancel_wait(worker._waiter);
worker._vtm = vtm;
goto explore_task;
}
}
_notifier.commit_wait(worker._waiter);

goto explore_task;
}
  • Step 1 (explore_task): 尝试探索任务,即从任务队列中获取任务。
  • Step 2 (_wsq check): 检查当前线程的本地任务队列 _wsq 是否有任务。如果有,取消等待并继续探索任务。
  • Step 3 (done check): 检查是否已完成任务。如果是,取消等待并通知所有线程(_notifier.notify(true)),表示执行结束。
  • Step 4 (scan other workers’ queues): 遍历其他线程的任务队列,如果发现有任务,取消等待并继续探索任务。
  • Step 5 (wait for notification): 如果以上步骤都未找到任务,等待通知。在等待过程中,线程将释放锁,允许其他线程继续执行。一旦收到通知,继续探索任务。

这个方法采用了一种类似于两阶段提交 (Two-Phase Commit) 的机制,通过 _notifier 对象协调线程之间的等待和通知。

notifier

Notifier 是一个经典的基于条件变量的非阻塞算法。

Dekker's algorithm - Wikipedia

class Notifier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class Notifier {
public:
struct Waiter {
std::atomic<Waiter*> next;
uint64_t epoch;
enum : unsigned {
kNotSignaled = 0,
kWaiting,
kSignaled,
};
std::mutex mu;
std::condition_variable cv;
unsigned state;
};

explicit Notifier(size_t N) : _waiters{N} {
assert(_waiters.size() < (1 << kWaiterBits) - 1);
// Initialize epoch to something close to overflow to test overflow.
_state = kStackMask | (kEpochMask - kEpochInc * _waiters.size() * 2);
}

void prepare_wait(Waiter* w) {
w->epoch = _state.fetch_add(kWaiterInc, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
}


private:
std::atomic<uint64_t> _state;
std::vector<Waiter> _waiters;
};

Notifier 类,实现了一种通知机制,用于在线程之间进行同步。该类主要用于实现等待-通知模式,其中一个或多个线程等待某个条件变为真,而另一个线程在条件变为真时发送通知。

这里有几个重要的概念和功能:

  1. Waiter 结构体: 内嵌在 Notifier 类中,用于表示等待的线程。Waiter 结构体包含了一个 atomic<Waiter*> next,用于构建一个无锁栈,以及一个状态 state,表示等待状态(未通知、等待中、已通知)。
  2. 状态管理: _state 是一个 std::atomic<uint64_t> 类型的变量,用于管理整体状态。具体的状态信息被分割成几个字段,包括一个栈字段 kStackBits、一个等待者计数字段 kWaiterBits、一个修改计数字段 kEpochBits。这些字段通过位运算进行组合和操作。
  3. 等待和通知操作: prepare_wait 函数用于准备等待,commit_wait 函数用于确认等待,cancel_wait 函数用于取消等待。notifynotify_n 函数用于发送通知,可以选择是唤醒一个线程还是唤醒所有等待线程。
  4. 无锁栈: 使用 _state 中的栈字段,通过无锁操作实现了一个等待者栈,这样可以高效地管理等待线程。
  5. 条件变量和原子操作: 根据编译器支持的特性,使用了条件变量和原子操作来实现线程的等待和通知。根据 __cpp_lib_atomic_wait 的定义,选择不同的实现方式。

epoch

epoch 是一个表示状态修改次数的计数器。该计数器用于实现非阻塞算法中的等待机制。以下是对 epoch 的理解:

  1. 基本概念

    • epoch 是一个 64 位的计数器。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      static const uint64_t kStackBits = 16;
      static const uint64_t kStackMask = (1ull << kStackBits) - 1;
      static const uint64_t kWaiterBits = 16;
      static const uint64_t kWaiterShift = 16;
      static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
      << kWaiterShift;
      static const uint64_t kWaiterInc = 1ull << kWaiterBits;
      static const uint64_t kEpochBits = 32;
      static const uint64_t kEpochShift = 32;
      static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
      static const uint64_t kEpochInc = 1ull << kEpochShift;
    • 它用于标识状态的修改次数。

  2. 用途

    • epoch 主要用于非阻塞等待机制,允许线程等待某个条件的改变,而不必使用互斥锁或条件变量。
  3. 计数方式

    • epoch 的计数方式是通过在状态发生改变时递增的。
    • 当状态发生改变时,epoch 会被递增,使得等待线程能够感知到状态的变化。
  4. 等待过程中的使用

    • 当线程调用 prepare_wait 时,会获取当前的 epoch 值,并在 epoch 的基础上进行一些计算,用于等待期间的判断。
    • 当线程调用 commit_waitcancel_wait 时,会对 epoch 进行修改,以通知其他线程或取消等待。
  5. 避免 ABA 问题

    • epoch 的设计还考虑了避免 ABA 问题(一个值被修改两次,但中间的修改值被其他线程修改回来)。
    • 通过使用 kEpochInc 来进行增量计数,避免 ABA 问题。
  6. 线程等待时的使用

    • 当线程等待时,会使用 epoch 来判断状态是否发生了改变,以决定是继续等待还是取消等待。

Ending

越写越发散,好烦,下一个topic好好讲一下内存序吧。

Reference

  1. T.-W. Huang, D.-L. Lin, C.-X. Lin, and Y Lin, “Taskflow: A Lightweight Parallel and Heterogeneous Task Graph Computing System,” IEEE Transactions on Parallel and Distributed Systems (TPDS), vol. 33, no. 6, pp. 1303-1320, June 2022.
  2. Brpc Overview