5 - 互斥Mutex

对于简单数据(整数、布尔值等标量)的线程安全,使用原子操作就够了。但当数据更复杂且必须在多个线程中使用这些数据时,就得使用显式的同步机制。

标准库支持互斥的形式包括 互斥体类(mutex)锁类,接下来看看这些类。

互斥体类

互斥体(mutual exclusion, mutex)的基本使用机制如下:

  • 希望与其他线程共享内存读写的一个线程试图锁定互斥体对象。如果另一个线程正在持有这个锁,希望获得访问的线程将被阻塞,直到锁被释放/超时。
  • 一旦线程获得锁,这个线程就可随意使用共享的内存。
  • 线程读写完共享内存后,线程将锁释放,使其他线程有机会获取锁。如果有多个线程正在等待锁,不保证哪个线程将优先获得锁。

C++标准提供了非定时和定时的互斥体类,有递归和非递归两种风格。不过首先来看看自旋锁。

自旋锁

自旋锁是互斥锁的一种,其中线程使用忙碌循环方式来尝试获取锁,之后执行工作并释放锁。自旋锁虽然让线程保持活跃,但它们完全可以在自己代码中实现,不需要对操作系统进行任何昂贵的调用,也不会造成线程切换的任何开销。

使用atomic_flag实现的自旋锁如下:

atomic_flag spinlock = ATOMIC_FLAG_INIT;
void doWork()
{
    // 获取自旋锁
	while (spinlock.test_and_set()) {}
    // do something
    // 不用锁记得释放
	spinlock.clear();
}

非定时的互斥体类

标准库有三个非定时的互斥体类:mutexrecursive_mutexshared_mutex。前两个类在<mutex>中定义,最后一个在<shared_mutex>中定义。

这些类都有下列方法:

  • lock():调用线程尝试获取锁,并阻塞直到获取锁。这个方法会 无限期阻塞线程,使用定时互斥体类可以对阻塞限时。
  • try_lock():调用线程尝试获取锁。如果当前锁被其他线程持有,调用会立即返回false;否则返回true
  • unlock():释放由调用线程持有的锁,使另一个线程能获取这个锁。

接下来详细介绍一下这三个非定时互斥体类:

  • mutex:标准的具有独占所有权语义的互斥体类,同时只能有一个线程拥有互斥体。已经拥有该互斥体的线程不能再调用lock()try_lock(),否则可能会导致死锁

  • recursive_mutex:和mutex差不多,区别在于允许已经拥有该互斥体所有权的线程在该互斥体上再次调用lock()try_lock()。并且调用unlock()的次数应该等于前面上锁的次数。

  • shared_mutex:支持“共享锁拥有权”的概念,这也称为readerswriters锁,线程可以获得锁的 独占所有权共享所有权

    • 独占所有权:也称为“写锁”,仅当没有其他线程拥有独占或共享所有权时才能获得。和它相关的方法是lock()try_lock()unlock()
    • 共享所有权:也称为“读锁”,如果其他线程都没有独占所有权即可获得,且允许其他线程获取共享所有权。和它相关的方法是lock_shared()try_lock_shared()unlock_shared()

    不允许在已获取shared_mutex的线程上获取第二个锁,否则会产生死锁

定时的互斥体类

定时的互斥体类为lock()方法提供了对阻塞限时的功能,防止死锁。标准库提供了3个定时的互斥体类:timed_mutexrecursive_timed_mutexshared_timed_mutex。前两个类在<mutex>中定义,最后一个在<shared_mutex>中定义。除了支持非定时互斥体类的方法外,它们还支持以下方法:

  • try_lock_for(rel_time):调用线程尝试在给定的相对时间内获取锁,返回布尔值。这里超时时间是std::chrono::duration
  • try_lock_until(abs_time):调用线程尝试获取锁,直到系统时间等于或超过给定的绝对时间,返回布尔值。这里绝对时间是std::chrono::time_point

锁类

由于我们常常上了锁忘记解锁,且锁类是资源,适用于RAII原则。于是标准库提供了四种类型的RAII锁类,它们可以通过析构函数去自动释放所关联的互斥体。

lock_guard

<mutex>中定义,有两个构造函数:

  • explicit lock_guard(mutex_type& m);

    接受一个互斥体引用的构造函数。这个构造函数尝试获取互斥体上的锁,并阻塞直到获得锁。

  • lock_guard(mutex_type& m, adopt_lock_t adopt_lock);

    接受一个互斥体引用和一个adopt_lock。这个构造函数假设调用线程已经获得互斥体上的锁,无需阻塞获得锁操作。

unique_lock

<mutex>中定义,是一类更复杂的锁。它允许将获得锁的时间延迟到需要计算的时候,而不是声明变量之后。它的构造函数如下:

  • explicit unique_lock(mutex_type& m);

    接受一个互斥体引用的构造函数。这个构造函数尝试获取互斥体上的锁,并阻塞直到获得锁。

  • unique_lock(mutex_type& m, defer_lock_t defer_lock);

    接受一个互斥体引用和一个defer_lock。这个构造函数存储互斥体的引用,但不立即尝试获得锁,锁可以稍后获得。

  • unique_lock(mutex_type& m, try_to_lock_t try_to_lock);

    接受一个互斥体引用和一个try_to_lock。这个构造函数尝试立即获取互斥体上的锁,获取不到不会阻塞,而是稍后尝试获取锁。

  • unique_lock(mutex_type& m, adopt_lock_t adopt_lock);

    lock_guard描述。

  • unique_lock(mutex_type& m, const chrono::time_point<Clock, Duration>& abs_time);

    接受一个互斥体引用和一个绝对时间。这个构造函数尝试获取一个锁,直到系统时间超出绝对时间。

  • unique_lock(mutex_type& m, const chrono::duration<Rep, Period>& rel_time);

    接受一个互斥体引用和一个相对时间。这个构造函数尝试获取一个锁,直到到达给定的相对超时时间。

shared_lock

<shared_mutex>中定义,构造函数和方法与unique_lock相同。可用作unique_lock的替代品,但获得的是共享锁,而不是独占锁。

scoped_lock

<mutex>中定义,与lock_guard类似,但可以接受数量可变的互斥体。这样就能方便获取多个锁。

一些函数

一次性获得多个锁

可以通过可变参数模板函数lock()来一次性获得多个锁,它的定义如下:

template <class L1, class L2, class... L3> void lock(L1&, L2&, L3&...);

该函数不按指定顺序锁定所有给定的互斥体对象,没有出现死锁的风险。如果其中一个互斥锁调用发生异常,则在所有已经获得的锁上调用unlock()

除此之外还有try_lock(),它的定义和前者类似,按顺序调用每个给定互斥体对象的try_lock()。如果都成功调用,返回-1;否则对所有已经获得的锁调用unlock(),返回失败互斥体的参数位置。

下例演示使用泛型函数lock()

mutex mut1;
mutex mut2;

int main()
{
	// 创建两个锁, 但不获取锁
	unique_lock<mutex> lock1(mut1, defer_lock);
	unique_lock<mutex> lock2(mut2, defer_lock);

	// 获得锁
	lock(lock1, lock2);
	cout << "Do something...\n";
	// 释放锁

	return 0;
}

而使用scope_lock的示例如下,可以发现更简便一些:

mutex mut1;
mutex mut2;

int main()
{
	// 获得锁
	scoped_lock<mutex, mutex> locks (mut1, mut2);
	cout << "Do something...\n";
	// 释放锁

	return 0;
}

std::call_once()

std::call_once()std::once_flag的结合使用可以确保某函数/方法 正好只调用一次,不论有多少 在同一call_once 的 线程试图调用都是如此。

如图,在同一个once_clag中,线程1执行有效的call_once()调用(执行给定函数时没有抛出异常);线程2想执行call_once()却被阻塞,直到线程1的有效调用执行完成;线程3不会被阻塞,而是直接返回结果,因为线程1已经执行完成了。

下例将使用call_once(),让多线程只执行一次init()函数:

once_flag g_onceFlag;

void init()
{
	cout << "Init...\n";
}

int main()
{
	vector<thread> threads(3);
	for (auto& t : threads)
	{
		t = thread([] {
			call_once(g_onceFlag, init);
			cout << "Processing...\n";
		});
		t.join();
	}
	return 0;
}

// 输出
Init...
Processing...
Processing...
Processing...

具体示例

以线程安全方式写入流

2-线程的基本使用 这篇文章中,多线程同时输出会导致输出内容混乱。这里使用两种解决方案:

  1. 使用同步流

    C++20引入了std::basic_osyncstream,并分别为char流和wchar流预定义了类型别名osyncstreamwosyncstream,均在<syncstream>中定义。这些类 保证所有通过它们完成的输出都将在同步流被销毁的那一刻出现在最终输出流中,且不会导致输出交错

    仅需像使用cout那样即可:

    osyncstream(cout) << "xxxx";
  2. 使用锁

    要添加一个静态互斥对象,然后用lock_guard进行管理:

    class Counter
    {
    public:
        Counter(int id, int numIterations)
            : m_id(id), m_numIterations(numIterations) {}
        void operator() () const
        {
            for (int i = 0; i < m_numIterations; ++i)
            {
                lock_guard<mutex> lock(ms_mutex);
                cout << "Counter " << m_id << " has value " << i << endl;
            }
        }
    private:
        int m_id;
        int m_numIterations;
        inline static mutex ms_mutex;
    };

    注意这里在for循环内使用lock_guard,防止阻塞其他线程时间过长,失去这段代码的所有多线程特性。

使用定时锁

使用上例,但是将互斥体替换为定时的互斥体,它将于200ms后尝试获取锁,获取到就输出,否则不输出:

class Counter
{
public:
    Counter(int id, int numIterations)
        : m_id(id), m_numIterations(numIterations) {}
    void operator() () const
    {
        for (int i = 0; i < m_numIterations; ++i)
        {
            unique_lock<timeed_mutex> lock(ms_timedMutex, 200ms);
            if (lock)
            {
                cout << "Counter " << m_id << " has value " << i << endl;
			} 
            else
            {
                // 没有获取到锁, 不做任何操作
            }
        }
    }
private:
    int m_id;
    int m_numIterations;
    inline static timed_mutex ms_timedMutex;
};

参考资料

  • 飘零的落花 - 现代C++详解
  • C++20高级编程