搜索

使用 C++ 11 编写 Linux 多线程程序

2025-1-8 13:41| 发布者: admin| 查看: 126| 评论: 0



对于线程间的事件通知,C++11 提供了条件变量类 condition_variable,可视为 pthread_cond_t 的封装,使用条件变量可以让一个线程等待其它线程的通知 (wait,wait_for,wait_until),也可以给其它线程发送通知 (notify_one,notify_all),条件变量必须和锁配合使用,在等待时因为有解锁和重新加锁,所以,在等待时必须使用可以手工解锁和加锁的锁,比如 unique_lock,而不能使用 lock_guard,示例如下:

清单 15.例子 thread_cond_var.cc
#include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;
mutex m;
condition_variable cv;
void threadCondVar(void){
# define THREAD_COUNT 10
 thread** t = new thread*[THREAD_COUNT];
 int i;
 for(i = 0; i < THREAD_COUNT; i++){
 t[i] = new thread( [](int index){
 unique_lock<mutex> lck(m);
 cv.wait_for(lck, chrono::hours(1000));
 cout << index << endl;
 }, i );
 this_thread::sleep_for( chrono::milliseconds(50));
 }
 for(i = 0; i < THREAD_COUNT; i++){
 lock_guard<mutex> _(m);
 cv.notify_one();
 }
 for(i = 0; i < THREAD_COUNT; i++){
 t[i]->join();
 delete t[i];
 }
 delete t;
}

从上例的运行结果也可以看到,条件变量是不保证次序的,即首先调用 wait 的不一定首先被唤醒。

几个高级概念

C++11 提供了若干多线程编程的高级概念:promise/future, packaged_task, async,来简化多线程编程,尤其是线程之间的数据交互比较简单的情况下,让我们可以将注意力更多地放在业务处理上。

promise/future 可以用来在线程之间进行简单的数据交互,而不需要考虑锁的问题,线程 A 将数据保存在一个 promise 变量中,另外一个线程 B 可以通过这个 promise 变量的 get_future() 获取其值,当线程 A 尚未在 promise 变量中赋值时,线程 B 也可以等待这个 promise 变量的赋值:

清单 16.例子 thread_promise_future.cc
promise<string> val;
static void
threadPromiseFuture(){
 thread ta([](){
 future<string> fu = val.get_future();
 cout << "waiting promise->future" << endl;
 cout << fu.get() << endl;
 });
 thread tb([](){
 this_thread::sleep_for( chrono::milliseconds(100) );
 val.set_value("promise is set");
 });
 ta.join();
 tb.join();
}

一个 future 变量只能调用一次 get(),如果需要多次调用 get(),可以使用 shared_future,通过 promise/future 还可以在线程之间传递异常。

如果将一个 callable 对象和一个 promise 组合,那就是 packaged_task,它可以进一步简化操作:

清单 17.例子 thread_packaged_task.cc
static mutex g_mutex;
static void
threadPackagedTask(){
 auto run = [=](int index){ 
 {
 lock_guard<mutex> _(g_mutex);
 cout << "tasklet " << index << endl;
 }
 this_thread::sleep_for( chrono::seconds(10) );
 return index * 1000;
 };
 packaged_task<int(int)> pt1(run);
 packaged_task<int(int)> pt2(run);
 thread t1([&](){pt1(2);} );
 thread t2([&](){pt2(3);} );
 int f1 = pt1.get_future().get();
 int f2 = pt2.get_future().get();
 cout << "task result=" << f1 << endl;
 cout << "task result=" << f2 << endl;
 t1.join();
 t2.join();
}

我们还可以试图将一个 packaged_task 和一个线程组合,那就是 async() 函数。使用 async() 函数启动执行代码,返回一个 future 对象来保存代码返回值,不需要我们显式地创建和销毁线程等,而是由 C++11 库的实现决定何时创建和销毁线程,以及创建几个线程等,示例如下:

清单 18.例子 thread_async.cc
static long
do_sum(vector<long> *arr, size_t start, size_t count){
 static mutex _m;
 long sum = 0;
 for(size_t i = 0; i < count; i++){
 sum += (*arr)[start + i];
 }
 {
 lock_guard<mutex> _(_m);
 cout << "thread " << this_thread::get_id() 
 << ", count=" << count
 << ", sum=" << sum << endl;
 }
 return sum;
}
static void
threadAsync(){
# define COUNT 1000000
 vector<long> data(COUNT);
 for(size_t i = 0; i < COUNT; i++){
 data[i] = random() & 0xff;
 }
 //
 vector< future<long> > result;
 size_t ptc = thread::hardware_concurrency() * 2;
 for(size_t batch = 0; batch < ptc; batch++){
 size_t batch_each = COUNT / ptc;
 if (batch == ptc - 1){
 batch_each = COUNT - (COUNT / ptc * batch);
 }
 result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
 }
 long total = 0;
 for(size_t batch = 0; batch < ptc; batch++){
 total += result[batch].get();
 }
 cout << "total=" << total << endl;
}

如果是在多核或者多 CPU 的环境上面运行上述例子,仔细观察输出结果,可能会发现有些线程 ID 是重复的,这说明重复使用了线程,也就是说,通过使用 async() 还可达到一些线程池的功能。

几个需要注意的地方

thread 同时也是棉线、毛线、丝线等意思,我想大家都能体会面对一团乱麻不知从何处查找头绪的感受,不要忘了,线程不是静态的,它是不断变化的,请想像一下面对一团会动态变化的乱麻的情景。所以,使用多线程技术的首要准则是我们自己要十分清楚我们的线程在哪里?线头(线程入口和出口)在哪里?先安排好线程的运行,注意不同线程的交叉点(访问或者修改同一个资源,包括内存、I/O 设备等),尽量减少线程的交叉点,要知道几条线堆在一起最怕的是互相打结。

当我们的确需要不同线程访问一个共同的资源时,一般都需要进行加锁保护,否则很可能会出现数据不一致的情况,从而出现各种时现时不现的莫名其妙的问题,加锁保护时有几个问题需要特别注意:一是一个线程内连续多次调用非递归锁 (non-recursive lock) 的加锁动作,这很可能会导致异常;二是加锁的粒度;三是出现死锁 (deadlock),多个线程互相等待对方释放锁导致这些线程全部处于罢工状态。

第一个问题只要根据场景调用合适的锁即可,当我们可能会在某个线程内重复调用某个锁的加锁动作时,我们应该使用递归锁 (recursive lock),在 C++11 中,可以根据需要来使用 recursive_mutex,或者 recursive_timed_mutex。

第二个问题,即锁的粒度,原则上应该是粒度越小越好,那意味着阻塞的时间越少,效率更高,比如一个数据库,给一个数据行 (data row) 加锁当然比给一个表 (table) 加锁要高效,但是同时复杂度也会越大,越容易出错,比如死锁等。

对于第三个问题我们需要先看下出现死锁的条件:

资源互斥,某个资源在某一时刻只能被一个线程持有 (hold); 吃着碗里的还看着锅里的,持有一个以上的互斥资源的线程在等待被其它进程持有的互斥资源; 不可抢占,只有在某互斥资源的持有线程释放了该资源之后,其它线程才能去持有该资源; 环形等待,有两个或者两个以上的线程各自持有某些互斥资源,并且各自在等待其它线程所持有的互斥资源。

我们只要不让上述四个条件中的任意一个不成立即可。在设计的时候,非常有必要先分析一下会否出现满足四个条件的情况,特别是检查有无试图去同时保持两个或者两个以上的锁,当我们发现试图去同时保持两个或者两个以上的锁的时候,就需要特别警惕了。下面我们来看一个简化了的死锁的例子:

清单 19.例子 thread_deadlock.cc
static mutex g_mutex1, g_mutex2;
static void
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 g_mutex1.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 // do other thing.
 g_mutex2.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

在这个例子中,g_mutex1 和 g_mutex2 都是互斥的资源,任意时刻都只有一个线程可以持有(加锁成功),而且只有持有线程调用 unlock 释放锁资源的时候其它线程才能去持有,满足条件 1 和 3,线程 ta 持有了 g_mutex1 之后,在释放 g_mutex1 之前试图去持有 g_mutex2,而线程 tb 持有了 g_mutex2 之后,在释放 g_mutex2 之前试图去持有 g_mutex1,满足条件 2 和 4,这种情况之下,当线程 ta 试图去持有 g_mutex2 的时候,如果 tb 正持有 g_mutex2 而试图去持有 g_mutex1 时就发生了死锁。在有些环境下,可能要多次运行这个例子才出现死锁,实际工作中这种偶现特性让查找问题变难。要破除这个死锁,我们只要按如下代码所示破除条件 3 和 4 即可:

清单 20.例子 thread_break_deadlock.cc
static mutex g_mutex1, g_mutex2;
static voi
inc1(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 g_mutex2.lock();
 // do something.
 g_mutex2.unlock();
 }
}
static void
inc2(int *p ){
 for(int i = 0; i < COUNT; i++){
 g_mutex2.lock();
 // do other thing.
 g_mutex2.unlock();
 g_mutex1.lock();
 (*p)++;
 g_mutex1.unlock();
 }
}
void threadMutex(void){
 int a = 0;
 thread ta( inc1, &a);
 thread tb( inc2, &a);
 ta.join();
 tb.join();
 cout << "a=" << a << endl;
}

在一些复杂的并行编程场景,如何避免死锁是一个很重要的话题,在实践中,当我们看到有两个锁嵌套加锁的时候就要特别提高警惕,它极有可能满足了条件 2 或者 4。

结束语

上述例子在 CentOS 6.5,g++ 4.8.1/g++4.9 以及 clang 3.5 下面编译通过,在编译的时候,请注意下述几点:

设置 -std=c++11; 链接的时候设置 -pthread; 使用 g++编译链接时设置 -Wl,–no-as-needed 传给链接器,有些版本的 g++需要这个设置; 设置宏定义 -D_REENTRANT,有些库函数是依赖于这个宏定义来确定是否使用多线程版本的。

具体可以参考本文所附的代码中的 Makefile 文件。

在用 gdb 调试多线程程序的时候,可以输入命令 info threads 查看当前的线程列表,通过命令 thread n 切换到第 n 个线程的上下文,这里的 n 是 info threads 命令输出的线程索引数字,例如,如果要切换到第 2 个线程的上下文,则输入命令 thread 2。

聪明地使用多线程,拥抱多线程吧。

12

鲜花

握手

雷人

路过

鸡蛋
返回顶部