Cpp-并发编程

Cpp-并发编程

相关代码和测试:olleh-dlrow/ConcurrencyLearning: source and test for concurrency learning. (github.com)

并发概念

在单个系统里同时执行多个独立的任务,而非顺序执行一些活动。这里需要强调的是任务的独立性,即任务之间两两不存在前后依赖关系,对于单线程程序而言,并发实际是轮流调度任务的过程,对于多线程程序而言,并发能够做到真正的并行

image-20221130104736057

选择并发的理由

分离关注点

将相关的代码与无关的代码分离,可以使得程序更容易理解和调试

例如,播放器的播放模块和用户操作模块之间可以分离,后者可以使用一个单独的线程来监听用户的输入,进而控制播放模块的播放状态

提升性能

  • 任务并行:将单个任务分成几个部分,且各自并行运行,从而降低总运行时间,这要求任务之间没有依赖
  • 数据并行:每个线程在不同的数据部分执行相同的操作

并不是具有这样的场景就要使用并发,如果多线程创建、切换、调度、终止的代价大于单线程的执行代价,则不适宜强行使用并发

线程管理

std::thread - cppreference.com

共享数据

条件竞争

不变量(invariants)可以理解为数据结构中的一些关键变量,这些变量在操作前后是稳定不变的;比如,“变量包含列表中的项数”。不变量通常会在一次更新中被破坏,特别是比较复杂的数据结构,或者一次更新就要改动很大的数据结构

以双链表为例,双链表中每个节点都有一个指针指向列表中下一个节点,还有一个指针指向前一个节点。其中不变量就是节点A中指向“下一个”节点B的指针,还有前向指针。为了从列表中删除一个节点,其两边节点的指针都需要更新。当其中一边更新完成时,不变量就被破坏了,直到另一边也完成更新;在两边都完成更新后,不变量就又稳定了。

并发中竞争条件的形成,取决于一个以上线程的相对执行顺序,每个线程都抢着完成自己的任务。恶性条件竞争通常发生于完成对多于一个的数据块的修改时。例如对两个连接指针的修改,因为操作要访问两个独立的数据块,独立的指令将会对数据块将进行修改,并且其中一个线程可能正在进行时,另一个线程就对数据块进行了访问。

条件竞争往往很难复现,因为他通常是时间敏感的。程序以调试模式运行时,它们常会完全消失,因为调试模式会影响程序的执行时间

避免方法:

  • 对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态,如使用互斥锁
  • 对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程
  • 使用事务的方式去处理数据结构的更新

使用互斥量保护数据

std::mutex - cppreference.com

std::lock_guard - cppreference.com

std::unique_lock - cppreference.com

注意:切勿将受保护数据的指针或引用传递到互斥锁作用域之外,无论是函数返回值,还是存储在外部可见内存,亦或是以参数的形式传递到用户提供的函数中去

定位接口之间的条件竞争

如果某些接口之间存在条件竞争,则它们可以使用同一个互斥量来进行管理,例如stack的接口:

1
2
3
4
5
push();
pop();
empty();
top();
size();

死锁

对于多线程程序而言,线程之间需要对锁产生竞争:一对线程需要对他们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个解锁。这样没有线程能工作,因为他们都在等待对方释放互斥量

避免死锁:

  • 让两个互斥量总是以相同的顺序上锁
  • 使用锁的层次结构,锁的层次的意义在于提供对运行时约定是否被坚持的检查,当代码试图对一个互斥量上锁,在该层锁已被低层持有时,上锁是不允许的
  • 使用递归锁
  • 避免嵌套锁
  • 避免在持有锁时调用用户提供的代码。该代码中可能也进行了上锁的操作

锁的粒度

锁的粒度是一个摆手术语(hand-waving term),用来描述通过一个锁保护着的数据量大小。一个细粒度锁(a fine-grained lock)能够保护较小的数据量,一个粗粒度锁(a coarse grained lock)能够保护较多的数据量。

保护共享数据的初始化过程

双重检查锁出现的问题:

假设有一段代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void undefined_behaviour_with_double_checked_locking() {
if(!resource_ptr) { // 1
std::lock_guard<std::mutex> lk(resource_mutex); // 2
if(!resource_ptr) { // 3
resource_ptr.reset( // 4: initialize ptr, but not finish new resource
new some_resource // 5
); // 6
}
}
resource_ptr->do_something(); // 7
}

这里存在潜在的条件竞争,假设有两个线程t1, t2,他们的执行顺序如下:

1
t1(1) -> t1(2) -> t2(1) -> t2(2) -> t2(3) -> t2(4)此时resource_ptr已经被赋值 -> t1(3) false -> t1(7) ERROR!!!

可以看到,这种执行顺序下的代码会出问题,解决办法是,使用std::call_once或者使用静态局部变量

同步并发操作

std::condition_variable - cppreference.com

std::future - cppreference.com

std::promise - cppreference.com

std::async - cppreference.com

std::packaged_task - cppreference.com

待补充

C++内存模型和原子操作

std::atomic - cppreference.com

内存模型参考:

C++11中的内存模型上篇 - 内存模型基础 - codedump的网络日志

C++11中的内存模型下篇 - C++11支持的几种内存模型 - codedump的网络日志

待补充

基于锁的并发数据结构设计

设计指导与建议:

  • 确保无线程能够看到修改数据结构的“不变量”时的状态
  • 小心会引起条件竞争的接口,提供完整操作的函数,而非操作步骤
  • 注意数据结构的行为是否会产生异常,从而确保“不变量”的状态
  • 将死锁的概率降到最低。使用数据结构时,需要限制锁的范围,避免嵌套锁的存在

最终目的:如何让序列化访问最小化,让真实并发最大化?

线程安全栈

栈的操作之间几乎都会涉及到数据竞争或条件竞争,因此可以使用一个互斥量来管理临界区的使用,即栈顶元素,栈的大小

线程安全队列

可以分为粗粒度锁和细粒度锁

如果使用粗粒度锁,则入队和出队都会锁住队列本身,在出队时,有两种设计的策略:

  • 判断队列是否为空,然后决定是否弹出,并返回状态
  • 如果队列为空,使用条件变量等待,而在入队时再唤醒一个等待的线程

如果使用细粒度锁,则可以为入队和出队分别设置互斥量,只有在判断队首和队尾的位置时需要短时间的互斥,其它情况下,入队和出队操作能够独立进行

线程安全查询表

使用桶作为数组,桶内使用链表存储键值对,每个桶带有一个共享锁,控制链表的互斥写访问

对于查询操作,使用shared_lock进行多线程并行查询

对于增删操作,使用unique_lock保证访问的互斥性

线程安全链表

对于每个结点,都持有一个互斥锁,在操作时,对持有的节点群进行上锁,并且在移动到下一个节点的时,对当前节点进行释放,类似于接力棒行为

具体的实现逻辑见项目源代码

无锁并发数据结构设计

待补充

并发代码设计

线程间划分工作

线程处理前对数据进行划分

根据每个线程处理的最小数目、最大线程数目来决定每个线程处理多少数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int min_per_thread=25;	// 每个线程处理的最小数目
int max_threads=
(length+min_per_thread-1)/min_per_thread; // 最多需要多少线程

int hardware_threads=
std::thread::hardware_concurrency(); // 当前硬件线程数

int num_threads=
std::min(hardware_threads, max_threads); // 实际需要的线程数

int block_size=length/num_threads; // 前num_threads-1个线程处理的数据量

Iterator block_start=first;
for(int i=0;i<(num_threads-1);++i) // 计算数据范围,分配任务给每个线程
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(
run_task,
block_start,block_end, ...);
block_start=block_end;
}
run_task(block_start, last, ...); // 处理剩下的数据

实际测试下来,该方法具有最好的性能表现,大概是因为合理的划分减少了许多线程调度的执行时间,并且每个线程需要处理的数据量也是比较均匀的

递归划分

将任务划分成2部分,前一部分使用新的线程递归调用,后一部分使用当前线程运算。快速排序就可以使用这个方法

缺点:当前线程依赖新线程的计算结果,因此可能造成死锁

可以使用std::async创建新任务,由系统内部来负责线程之间的调度。但是,经过测试,如果递归太深,那么需要大量的线程调度和让权等待,此时std::async性能较差,也可能出现死锁的情况(这个点不太确定,但确实有些数据在有限时间内是没有跑出来的)

通过任务类型划分工作

将任务划分成操作序列,每个子任务交给专门的线程执行,尽管子任务之间可能依然具有依赖关系,但是在处理多个任务时,可以形成流水线进行工作,避免核心空置

影响并发代码性能的因素

处理器数量

显然,并发工作需要在处理器数量和每个处理器的任务量之间寻找最佳平衡点

数据争用和乒乓缓存

当两个线程并发的在不同处理器上执行,并且对同一数据进行读取,通常不会出现问题;因为数据将会拷贝到每个线程的缓存中,并且让两个处理器同时进行处理。不过,当有线程对数据进行修改的时候,这个修改需要更新到其他核芯的缓存中去,就要耗费一定的时间。根据线程的操作性质,以及使用到的内存序,这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。根据等待时间可以简单地将情况分为高竞争低竞争

如果某个数据在每个缓存中传递若干次,那么这种情况叫做乒乓缓存。当一个处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情

伪共享

同一个缓存行中可以存储多个数据项。因此,即使每个线程都能对数据中的成员进行访问,硬件缓存还是会产生乒乓缓存。缓存行是共享的(即使没有数据存在),因此使用伪共享来称呼这种方式

或者说,单个线程修改的内存单位小于缓存切换的单位(一个缓存行),导致多个线程共享同一块缓存行

超额认购和频繁的任务切换

任务分配不均导致某个线程任务繁重或者大量线程频繁切换会严重影响性能

数据结构中的数据访问模式

  • 尝试调整数据在线程间的分布,就能让同一线程中的数据紧密联系在一起
  • 尝试减少线程上所需的数据量
  • 尝试让不同线程访问不同的存储位置,以避免伪共享

设计并发代码的注意事项

并行算法中的异常安全

在并行算法中很多操作要运行在独立的线程上。这种情况下,异常就不再允许被传播,因为这将会使调用堆栈出现问题。如果一个函数在创建一个新线程后带着异常退出,那么这个应用将会终止

解决办法:

  • 使用std::future存储新线程的异常,然后在主线程中处理
  • 当生成第一个新线程和当所有线程都汇入主线程时抛出异常;这会让线程产生泄露。最简单的方法就是捕获所有抛出的线程,汇入的线程依旧是joinable()的,并且会再次抛出异常

使用并发提高响应能力

假设有如下事件驱动型代码框架:

1
2
3
4
5
6
7
while(true) {
event_data event = get_event();
if(event.type == QUIT) {
break;
}
process(event);
}

为了确保用户输入被及时的处理,无论应时在做些什么,get_event()和process()必须以合理的频率调用。这就意味着任务要被周期性的悬挂,并且返回到事件循环中,get_event()/process()必须在一个合适地方进行调用。每个选项的复杂程度取决于任务的实现方式。

通过使用并发分离关注,可以将一个很长的任务交给一个全新的线程,并且留下一个专用的GUI线程来处理这些事件。线程可以通过简单的机制进行通讯,而不是将事件处理代码和任务代码混在一起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
std::thread task_thread;
std::atomic<bool> task_cancelled(false);

void gui_thread()
{
while(true)
{
event_data event=get_event();
if(event.type==quit)
break;
process(event);
}
}

void task()
{
while(!task_complete() && !task_cancelled)
{
do_next_operation();
}
if(task_cancelled)
{
perform_cleanup();
}
else
{
post_gui_event(task_complete);
}
}

void process(event_data const& event)
{
switch(event.type)
{
case start_task:
task_cancelled=false;
task_thread=std::thread(task);
break;
case stop_task:
task_cancelled=true;
task_thread.join();
break;
case task_complete:
task_thread.join();
display_results();
break;
default:
//...
}
}

通过这种方式对关注进行分离,用户线程将总能及时的对事件进行响应,及时完成任务需要花费很长时间。使用应用的时候,响应事件通常也是影响用户体验的重要一点,无论是特定操作被不恰当的执行(无论是什么操作),应用都会被完全锁住。通过使用专门的事件处理线程,GUI就能处理GUI指定的信息了(比如对于调整窗口的大小或颜色),而不需要中断处理器,进行耗时的处理;同时,还能向长期任务传递相关的信息

在实践中设计并发代码

并行实现std::for_each

使用数据划分或者递归划分即可

并行实现std::find

划分方式同上,在某个线程找到结果后,需要通知其它线程不再查找,因此可以设置一个原子变量std::atomic<bool> done来进行同步。除此之外,还需要返回找到的迭代器,此时可以通过std::promiseset_value进行设置,需要注意的是,可能出现多个线程同时找到该值,然后同时设置的情况,所以需要try catch一下,详见源代码ParallelAlgorithm.h

并行实现std::partial_sum

待补充

线程池

控制线程使用数量在合理范围内,在任务量不均匀时也能够合理调度,但是会增加调度时间

主要有以下几个注意点:

基本功能

主线程在初始化时创建足够的线程放到线程池中,使用时将任务提交到线程安全队列中,对于每个线程,其在循环中检查队列是否有任务,如果有则出队执行,否则让出下一个时间片。在主线程退出时,线程池需要通知所有线程退出循环,并将这些线程汇入主线程中

等待 提交到线程池中的任务

上述线程池没有考虑不同参数的任务以及任务的返回时机和返回值,即不知道任务什么时候完成,使用如下策略处理这两个问题:

对于不同参数的任务,可以对任务进行一层封装,实现FunctionWrapper<FunctionType>,然后它们继承于同一个基类,这样就可以统一存储到任务队列中

对于返回值,可以使用packaged_task + future的方式获取返回值

等待依赖任务

上述线程池的实现无法处理大量任务依赖,当线程池中的所有线程都在依赖其它任务完成时,这些剩下的任务无法再从线程池中获得线程,这种情况下就会出现死锁,例如大量数据的快速排序就可能出现这种情况

为了解决这个问题,可以让当前线程定期检查依赖线程是否完成任务,如果没有,则当前线程先到队列中取出任务执行,直到依赖线程完成任务,即不要让线程永久依赖等待

避免队列中的任务竞争和窃取任务

所有线程都从一个队列中取任务,如果想要减轻该队列的压力,可以让每个线程都持有自己的队列,其它线程也可以窃取对方的任务,因此,每个线程都需要制定一个提取任务的规则,更多细节和实现见book

并行算法

std::execution::seq, std::execution::par, std::execution::par_unseq, std::execution::unseq - cppreference.com

测试和调试多线程

测试用例的设计:

  • 编写所有单线程测试用例,排除单线程下的各种错误
  • 对各种因素进行排列组合后测试,有点类似于自变量求笛卡尔积,例如线程数(0,少于任务,多于任务,满载),任务数,容器的容量大小等

需要考虑的因素:

  • 系统中是否有足够的处理器,能让每个线程运行在属于自己的处理器上
  • “多线程”是有多少个线程(3个,4个,还是1024个?)
  • 测试需要运行在哪种处理器架构上
  • 测试中如何对“同时”进行合理的安排

构建多线程测试代码:

有些执行顺序的结果是我们比较关心的,并且在多线程环境下,我们能够推测出哪些结果是可以出现的,哪些不能

例如,对一个空队列同时push和pop,则可能出现2种情况:1. 队列不空,pop返回空 2. 队列空,pop返回不空,如果出现其它情况,则一定出现了条件竞争

如何保证push和pop同时执行?可以使用std::promise + std::shared_future来进行同步,在push和pop之前同时wait,然后在主线程中使用set_value同时唤醒两个线程,简单表示为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
push_done=std::async(std::launch::async,
[&q,ready,&push_ready]()
{
push_ready.set_value();
ready.wait();
q.push(42);
}
);
pop_done=std::async(std::launch::async,
[&q,ready,&pop_ready]()
{
pop_ready.set_value();
ready.wait();
return q.pop();
}
);
push_ready.get_future().wait();
pop_ready.get_future().wait();
go.set_value();

push_done.get();
assert(pop_done.get()==42);
assert(q.empty());

附加:输入系统的探讨

对于Windows,其 实现是基于消息循环机制,类似于生产者消费者

Windows.pdf (51testing.com)

生产者:操作系统,其捕获各个输入消息放到应用程序的队列中
消费者:应用程序,在每一次循环中,它将从队列中取出一个消息,然后将其分发给各个窗口,每个窗口再调用各自的回调函数来对消息进行处理
这就存在一个问题,即每一次循环只能够处理一个消息,如果要设计成1帧能够接受多个输入,则需要有2个循环,外循环是帧循环,内循环是消息循环,但是,如果消息源源不断地进入,比如按住某个键,则会发生阻塞,如果限制循环次数,该次数有难以决定其值

对于这个问题,GLFW的解决办法是,记录每个按键的状态,比如DOWN和UP,每一帧依然只接收一个消息,如果该按键一直是DOWN,则记录其状态为按下,这样,即使一帧存在多个不同的输入,也只需要保证在之后的几帧中能够完成其状态的设置,每次查询的时候也只需要参考按键的状态即可
通过这种方法,程序能够在一帧内捕获多个按键的输入,Unity也具有同样的效果


Cpp-并发编程
http://example.com/2023/01/10/Cpp-并发编程/
作者
Chen Shuwen
发布于
2023年1月10日
许可协议