WaitGroup 原理解析
约 523 字大约 2 分钟
2024-12-08
WaitGroup 是 Go 中常用的并发同步工具,用于等待一组 goroutine 完成。下面我们从源码层面解析它的内部机制。
type WaitGroup struct {
noCopy noCopy
// Bits (high to low):
// bits[0:32] counter
// bits[32] flag: synctest bubble membership
// bits[33:64] wait count
state atomic.Uint64
sema uint32
}- counter 代表目前尚未完成的个数。WaitGroup.Add(n) 将会导致 counter += n, 而 WaitGroup.Done() 将导致 counter--
- wait count代表目前已调用 WaitGroup.Wait() 的 goroutine 的个数。
- 因为需要同时原子更新 counter 和 wait count,用一个 atomic.Uint64 可以保证它们在并发下的一致性
- sema 对应于 golang 中 runtime 内部的信号量的实现。WaitGroup 中会用到 sema 的两个相关函数,runtime_SemacquireWaitGroup 和 runtime_Semrelease。runtime_SemacquireWaitGroup 表示增加一个信号量,并挂起 当前 goroutine。runtime_Semrelease 表示减少一个信号量,并唤醒 sema 上其中一个正在等待的 goroutine。
Add()
func (wg *WaitGroup) Add(delta int) {
//...
state := wg.state.Add(uint64(delta) << 32) // counter+=delta
//...
v := int32(state >> 32)
w := uint32(state & 0x7fffffff)
//...
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return // 被等待协程没做完,或者没人在等待,返回
}
// ...
// Reset waiters count to 0.
wg.state.Store(0)
// ...
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0) // 被等待协程都做完,且有人在等待,唤醒所有sema中的协程
}
}- counter+=delta
- 被等待协程没做完,或者没人在等待,返回
- 被等待协程都做完,且有人在等待,唤醒所有sema中的协程
Done()
func (wg *WaitGroup) Done() {
wg.Add(-1)
}Wait()
func (wg *WaitGroup) Wait() {
// ...
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state & 0x7fffffff)
if v == 0 { // 如果没有等待的协程,直接返回
// ...
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) { // wait加一
// ...
runtime_SemacquireWaitGroup(&wg.sema, synctestDurable) // 挂起当前goroutine
//...
return
}
}
}- 如果没有等待的协程,直接返回
- 否则,wait count加一,挂起当前goroutine
