C++11锁

0x00 前言

下面是我从C++11之多线程(二、互斥对象和锁)上找的一段代码

std::set<int> int_set;
auto f = [&int_set]() {
try {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 1000);
for(std::size_t i = 0; i != 100000; ++i) {
int_set.insert(dis(gen));
}
} catch(...) {}
};
std::thread td1(f), td2(f);
td1.join();
td2.join();

由于std::set::insert不是多线程安全的,多个线程同时对同一个对象调用insert其行为是未定义的(通常导致的结果是程序崩溃)。因此需要一种机制在此处对多个线程进行同步,保证任一时刻至多有一个线程在调用insert函数。

0x01 锁的使用

下面的这段代码,是我在上一篇博客中所写的多线程中进行异常处理的一种方式,就是把所有的异常全部放在一个vector里面,我们需要确保在同一时刻只有一个线程对vector进行插入操作,所以我们必须为其加上一个锁,锁这个东西,依据我个人的理解,是一种互斥关系,有一个线程创建了这个互斥关系,那么当第二个线程再去创建同样的互斥关系的时候就会受到阻塞,就需要等待当前持有锁的线程来解锁,然后继续访问临界资源。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
std::vector<std::exception> exptr;
std::mutex mut;
void func()
{
// 加锁
std::lock_guard<std::mutex> lock(mut);
exptr.push_back(std::exception{ "ERROR!" });
}
int main()
{
std::thread thd1(func);
thd1.detach();
std::thread thd2(func);
thd2.detach();
for(auto & e : exptr)
{
std::cout << e.what() << std::endl;
}
system("pause");
return 0;
}

通过上面的代码来简单介绍一下锁的使用,在C++11中的mutex头文件中定义了四种锁:

  • mutex:提供了核心的 lock()unlock() 方法,用来加解锁,以及当 mutex 不可用时就会返回的非阻塞方法 try_lock()

  • recursive_mutex:依据名字可以看出这是递归锁,就是允许同一个线程对锁进行多重持有,多用于线程函数需要进行递归操作的情况

  • timed_mutex:时间锁,可以使用函数try_lock_for()try_lock_until()来在特定的时长内持有mutex或持有锁到某个特定的时间点

  • recursive_timed_mutexrecursive_mutextimed_mutex 的结合

0x02 std::mutex的使用

通过两个程序来演示一下用锁和不用锁的区别:

不用锁的情况:

#include <iostream>
#include <thread>
#include <mutex>
void fun()
{
std::cout << "Enter thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 1000));
std::cout << "Exit thread: " << std::this_thread::get_id() << std::endl;
}
int main()
{
srand(time(nullptr));
std::thread t1(fun);
std::thread t2(fun);
std::thread t3(fun);
t1.join();
t2.join();
t3.join();
system("pause");
return 0;
}

程序输出:

Enter thread: 9012
Enter thread: 19044
Enter thread: 20336
Exit thread: 9012
Exit thread: 19044
Exit thread: 20336

使用锁的情况:

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx;
void fun()
{
mtx.lock();
std::cout << "Enter thread: " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(rand() % 1000));
std::cout << "Exit thread: " << std::this_thread::get_id() << std::endl;
mtx.unlock();
}
int main()
{
srand(time(nullptr));
std::thread t1(fun);
std::thread t2(fun);
std::thread t3(fun);
t1.join();
t2.join();
t3.join();
system("pause");
return 0;
}

程序输出:

Enter thread: 18484
Exit thread: 18484
Enter thread: 12256
Exit thread: 12256
Enter thread: 18572
Exit thread: 18572

通过使用锁的示例我们可以看到,使用锁之后其他线程必需在等待锁释放之后才能调用线程函数,否则线程函数就一直处于阻塞状态。当占有锁的线程释放锁的时候,其他线程才有可能进入临界区。

如果将锁的定义放在线程函数fun()里面会怎么样呢?

如果将锁的定义放在线程函数fun()里面会怎么样呢?通过试验我们得知,如果将锁的定义放在线程函数里面的话,程序的输出结果会和不使用锁的情况是一样的。这是为什么呢,因为三个线程创建了三把锁,三把不一样的锁,然后各自加锁各自解锁,互不干涉。反观在全局变量中使用锁的情况,三个线程使用了同一把锁,所以才能正确地锁住线程。

0x03 递归锁recursive_mutex

在我想这个递归锁的使用示例的时候,为了比较递归锁和普通mutex的区别,我设计了一个求斐波那契数列的算法,所编制的代码如下:

#include <iostream>
#include <thread>
#include <mutex>
#include <set>
std::mutex mtx;
std::set<int> g_vecFib;
int Fibonacci(int n)
{
mtx.lock();
int ret{ n <= 1 ? n : Fibonacci(n - 1) + Fibonacci(n - 2) };
g_vecFib.insert(ret);
mtx.unlock();
return ret;
}
int main()
{
srand(time(nullptr));
std::thread thd(Fibonacci, 10);
thd.join();
for (auto i : g_vecFib)
std::cout << i << std::endl;
system("pause");
return 0;
}

这个程序没有使用递归锁,所以在运行的时候会抛出异常,因为递归后的程序无法拿到锁,只需要将锁的定义从std::mutex mtx;改为std::recursive_mutex mtx;后程序即可不抛出异常,但是在写完这个程序之后,引发了我的深思,就是这个程序为什么需要锁,我一直找不到一个合适的理由,可能是仅仅将其作为一个介绍递归锁的一个实例罢了,同时我考虑了一种情况就是分配两个线程出来,第一个线程来求区间[0-10]以内的数列,第二个线程用来求区间[11-20]内的数列,但是这样做是毫无意义的,首先第二个线程必需依赖第一个线程所产生数列的结果,也就是说第二个线程必需等待第一个线程结束后,才能从set集合中取出用于求数列的充分条件,这样的设计显然是毫无意义的,至少我是这么认为的。

看到了网上的一篇有关递归锁介绍的文章,感觉他给出的示例代码的确很不错:

template <typename T>
class container
{
std::mutex _lock;
std::vector<T> _elements;
public:
void add(T element)
{
_lock.lock();
_elements.push_back(element);
_lock.unlock();
}
void addrange(int num, ...)
{
va_list arguments;
va_start(arguments, num);
for (int i = 0; i < num; i++)
{
_lock.lock();
add(va_arg(arguments, T));
_lock.unlock();
}
va_end(arguments);
}
void dump()
{
_lock.lock();
for(auto e : _elements)
std::cout << e << std::endl;
_lock.unlock();
}
};
void func(container<int>& cont)
{
cont.addrange(3, rand(), rand(), rand());
}
int main()
{
srand((unsigned int)time(0));
container<int> cont;
std::thread t1(func, std::ref(cont));
std::thread t2(func, std::ref(cont));
std::thread t3(func, std::ref(cont));
t1.join();
t2.join();
t3.join();
cont.dump();
return 0;
}

当你运行这个程序时,会进入死锁。原因:在 mutex 被释放前,容器尝试多次持有它,这显然不可能。这就是为什么引入 std::recursive_mutex ,它允许一个线程对 mutex 多重持有。允许的最大持有次数并不确定,但当达到上限时,线程锁会抛出 std::system_error错误。因此,要解决上面例子的错误,除了修改 addrange 令其不再调用 lockunlock 之外,可以用 std::recursive_mutex 代替 mutex

另注意:

  • 递归锁效率低于普通锁

  • 需要用到递归锁定的多线程互斥处理往往本身就是可以简化的,允许递归互斥很容易放纵复杂逻辑的产生,从而导致一些多线程同步引起的晦涩问题;

  • 递归锁虽然允许同一线程多次获得同一互斥量,但是可重复获得的最大次数并未具体说明,一旦超过一定次数,再对lock进行调用就会抛出std::system错误

0x04 时间锁std::timed_mutexrecursive_timed_mutex

时间锁是用来指定锁住一定的时间段或直到一个时间点解锁。提供了两个函数try_lock_fortry_lock_until,用来设置时间,下面是一段示例代码,摘抄自深入应用C++11之多线程

std::timed_mutex;
void work()
{
std::chrono::milliseconds timeout(1000);
while(true)
{
if(mutex.try_lock_for(timeout))
{
//do some work
mutex.unlock();
}
}
}

try_lock_for是设置一个超时时间,try_lock_until是设置一个超时的时间点

0x05 RAII风格的加锁(互斥对象管理类模板)

0x00 什么是RAII

RAII(Resource Acquisition Is Initialization),也称直译为“资源获取就是初始化”,是C++语言的一种管理资源、避免泄漏的机制。 C++标准保证任何情况下,已构造的对象最终会销毁,即它的析构函数最终会被调用。RAII 机制就是利用了C++的上述特性,在需要获取使用资源RES的时候,构造一个临时对象(T),在其构造T时获取资源,在T生命期控制对RES的访问使之始终保持有效,最后在T析构的时候释放资源。以达到安全管理资源对象,避免资源泄漏的目的。

0x01 std::lock_guard

显式的加锁和解锁会导致一些问题,比如忘记解锁或者请求加锁的顺序不正确,进而产生死锁。std::lock_guard就是基于RAII原则开发的一套模板,在它的构造函数里面会调用锁的lock函数从而实现加锁,当出了他的定义域之后C++就会自动调用他的析构函数,在它的析构函数中会自动调用unlock函数进行解锁,下面来看一个用上面求斐波那契数列的例子改造过来的使用std::lock_guard的例子

std::recursive_mutex mtx;
std::set<int> g_vecFib;
int Fibonacci(int n)
{
std::lock_guard<std::recursive_mutex> lock(mtx);
int ret{ n <= 1 ? n : Fibonacci(n - 1) + Fibonacci(n - 2) };
g_vecFib.insert(ret);
return ret;
}

0x02 std::unique_lock

std::unique_lock里面实现了try_lock_fortry_lock_until两个函数,用来设置时间锁。

0x03 互斥对象管理类模板的加锁策略

上面提到的std::lock_guardstd::unique_lock对于在构造的过程中是否加锁是可选的设置,C++提供了三种加锁的策略:

策略

描述

默认

请求锁,阻塞当前线程知道成功获得锁

std::defer_lock

不请求锁

std::try_to_lock

尝试请求锁,但不阻塞线程,锁不可用时也会立即返回

std::adopt_lock

假定当前线程已经得到了锁,所以不再请求锁

各类模板的策略支持性

策略

std::lock_guard

std::unique_lock

默认

支持

支持

std::defer_lock

不支持

支持

std::try_to_lock

不支持

支持

std::adopt_lock

支持

支持

可以通过指定构造函数的第二个参数来设置加锁策略,例如:

std::unique_lock<std::mutex> lock(mt, std::defer_lock);

0x06 对所有的互斥量均不能使用const关键字

一个互斥量(不管使用的哪一种实现)必须要获取和释放,这就意味着要调用非const的lock()和unlock()方法。所以从逻辑上来讲,lock_guard的参数不能使const(因为如果该方法为const,互斥量也必需是const)。同样在类里面也不能在const函数中使用lock_guard

0x07 std::lockstd::try_lock

这个函数一般用于对多个互斥对象进行加锁的情况,现在考虑下面一段代码:

#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1;
std::mutex mtx2;
int main()
{
std::thread thd1([]()
{
std::lock_guard<std::mutex> lock1(mtx1);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::lock_guard<std::mutex> lock2(mtx2);
std::cout << "Fun1 ended" << std::endl;
});
std::thread thd2([]()
{
std::lock_guard<std::mutex> lock2(mtx2);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::lock_guard<std::mutex> lock1(mtx1);
std::cout << "Fun2 ended" << std::endl;
});
thd1.detach();
thd2.detach();
system("pause");
return 0;
}

使用Visual Studio 2017来编译执行上述代码,发生以下异常:

依据程序的输出f:\dd\vctools\crt\crtw32\stdcpp\thr\mutex.c(51): mutex destroyed while busy,我们可以得知发生了死锁,为什么会发生这种现象呢?

在两个线程函数中会以相反的顺序去获得mtx1和mtx2两把锁,现在来考虑这种情况,线程1运行第一行代码拿到了锁1,恰好这个时候,线程2也运行到第一行代码拿到了锁2,现在,来看看当前2个线程的状态

  • 线程1持有锁1,等待锁2

  • 线程2持有锁2,等待锁1

发生了死锁,在上面的例子中因为我要明显的看到发生死锁然后抛出异常的例子,所以我在代码里面加上了std::this_thread::sleep_for(std::chrono::milliseconds(500));让线程拿到锁之后休息一下,这样就大大提升了发生死锁的概率,如果去掉这行代码,发生死锁的现象将会是一个带有一定概率的事件,有时候会发生有时候不会发生。

为了避免这种死锁,可以采取以下两种措施

  • 对于任意两把锁,在加锁的时候保持前后顺序的一致(不推荐),如果是这样的话,两个线程函数将被修改为如下形式:

    std::thread thd1([]()
    {
    std::lock_guard<std::mutex> lock1(mtx1);
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::lock_guard<std::mutex> lock2(mtx2);
    std::cout << "Fun1 ended" << std::endl;
    });
    std::thread thd2([]()
    {
    std::lock_guard<std::mutex> lock1(mtx1); // 保持相同的加锁顺序
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::lock_guard<std::mutex> lock2(mtx2);
    std::cout << "Fun2 ended" << std::endl;
    });

    下面我们来看一下程序输出:

    没有异常现象的发生

  • 使用std::lock来进行加锁,std::lock会使用一种避免死锁的算法来对N个需要加锁的对象加锁,std::lock可以接受N个参数

    #include <iostream>
    #include <thread>
    #include <mutex>
    std::mutex mtx1;
    std::mutex mtx2;
    int main()
    {
    std::thread thd1([]()
    {
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::lock(lock1, lock2);
    std::cout << "Fun1 ended" << std::endl;
    });
    std::thread thd2([]()
    {
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::lock(lock2, lock1);
    std::cout << "Fun2 ended" << std::endl;
    });
    thd1.detach();
    thd2.detach();
    system("pause");
    return 0;
    }

0x08 参考文献

C++11之多线程(二、互斥对象和锁)

深入应用C++11之多线程

C++11实现模板化(通用化)RAII机制