RWMutex

RWMutex

RWMutex为借助Mutex实现的不可重入读写锁,在读多写少的情况下有着比互斥锁Mutex更好的效率,下面让我们从源码实现上看看RWMutex的高效性。

数据结构

基本结构如下:

type RWMutex struct {
    w           Mutex  // held if there are pending writers
    writerSem   uint32 // semaphore for writers to wait for completing readers
    readerSem   uint32 // semaphore for readers to wait for completing writers
    readerCount int32  // number of pending readers
    readerWait  int32  // number of departing readers
}

其中readerSemwriterSem为信号量,对应当前被阻塞的读者队列与写者队列。readerCountreaderWait会用于动态的表示活跃的读者数量,这两个变量会同时在读写锁中涉及到,因此在研究这两个变量时,需要同时结合读写锁实现来研究。

读锁

读锁的实现相对简单,首先看看如何获取读锁。

RLock

func (rw *RWMutex) RLock() {
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
}

读锁的获取需要查看当前是否有写者处于等待之中,判断的依据是atomic.AddInt32(&rw.readerCount, 1)原子加后readerCount是否小于0:

  • 如果小于0则请求readcerSem信号量,阻塞直到写者完成操作,由写者唤醒阻塞在信号量中的读者。
  • 如果大于等于0则说明当前没有写者,原子加记录读者数量后即可立即返回

至于为什么从readerCount来判断是否有写者?可以带着疑问来看下一节中写锁实现。

RUnlock

func (rw *RWMutex) RUnlock() {
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    // A writer is pending.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

读锁释放需要判断r := atomic.AddInt32(&rw.readerCount, -1)原子减后readerWait是否小于0:

  • 如果小于0说明当前有写者处于等待队列中,由最后的读者来唤醒等待中的写者。
  • 如果大于等于0则说明无写者等待,因此原子减读者数量后即可立即返回。

readerWait变量的使用同样需要结合写锁的实现来仔细学习。

小结

在使用读锁时,会根据readerCount的值来判断当前是否有写者处于等待状态中,如果没有写者,那么直接通过quick path的原子操作修改读者数量即可返回。而如果发现有写者等待的话,则得通过slow path唤醒写者(RUnlock)或读者进入等待状态(RLock)

写锁

写者的实现则相对复杂,与读锁类似,同样分为slow path和quick path。

Lock

func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
}

对于写锁来说,必须得跟所有其他锁互斥,因此第一步便是取得互斥锁,将其他写者阻塞。

随后则是一条十分重要的语句

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

这条语句需要拆分成两部分看,前一部分为原子减,减去了常量rwmutexMaxReaders=1<<30,减去这一步的目的是使得readerCount一定远远小于0,以此来表示写者正在获取锁的状态。这也是为什么在RLock()中根据readerCount+1后是否小于0可以判断当前是否已存在执行中的写者,以便将锁交给写者。

后半部分则是将常量加回表示当前活跃读者数量的快照,由于可能会有读者释放读锁,因此变量r作为当前的快照是可能比实际数量大的。因此如果快照r比实际读者数量大,则说明当前有读者正在释放读锁。

随后则通过快照r的值来判断当前是否存在已获取读锁的活跃读者,并通过readerWait来判断实际活跃的读者。此时,readerWait处于[-r, r]的区间之中,具体的数值取决于读锁的释放时机:

  • 假如readerWait == -r,这说明在写者获取锁时的活跃读者已将全部读锁释放,因此readerWait在原子加后必定为0,写者可以直接获取写锁而无需进入slow path。
  • 如果readerWait处于(-r, 0),说明已有部分活跃读者已将读锁释放,但由于仍存在活跃读者,原子加后readerWait处于(0, r)中,写者需要通过写者信号量进入slow path等待。
  • 如果readerWait == 0,说明没有活跃读者释放读锁,原子加后readerWait即等于活跃读者数。

Unlock

func (rw *RWMutex) Unlock() {
    // Announce to readers there is no active writer.
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
}

写锁的释放相对简单,因为写锁一旦获得说明当前只有自身一个写者。首先通过原子加 readerCount + rwmutexMaxReaders,来使得新进入的读者可以直接获取读锁。随后则是将阻塞在读者信号量中的读者逐一唤醒,最后释放互斥锁来允许新的写者。

总结

RWMutex 通过细粒度的状态控制将读写锁实现为quick path和slow path,并且通过原子操作来使得读不加锁,以此最大化读写锁的效率。