lock
约 3158 字大约 11 分钟
2025-12-14
概述
sync 包提供了 Go 最基础的两类互斥原语:
sync.Mutex:互斥锁,同一时刻只允许一个 goroutine 持有sync.RWMutex:读写锁,允许任意数量的读者或单个写者
两者的零值均为未加锁状态,首次使用后不可拷贝(由 noCopy 标记在 go vet 中捕获)。
sync.Mutex
Go 1.24 起,sync.Mutex 的实现迁移到 internal/sync,对外只是薄薄的转发:
type Mutex struct {
_ noCopy
mu isync.Mutex
}
func (m *Mutex) Lock() { m.mu.Lock() }
func (m *Mutex) TryLock() bool { return m.mu.TryLock() }
func (m *Mutex) Unlock() { m.mu.Unlock() }真正的实现在 internal/sync/mutex.go。
数据结构 internal/sync/mutex.go
type Mutex struct {
state int32 // 锁状态(位域)
sema uint32 // 信号量:阻塞/唤醒等待者
}state 是一个 32 位的位域,含义如下:
31 ... 3 | 2 | 1 | 0
+---------------------+---------+-------+--------+
| waiter count | starv | woken | locked |
+---------------------+---------+-------+--------+| bit | 常量 | 含义 |
|---|---|---|
| 0 | mutexLocked | 锁是否被持有 |
| 1 | mutexWoken | 已有 goroutine 被唤醒,Unlock 不必再唤醒其他等待者 |
| 2 | mutexStarving | 当前处于饥饿模式 |
| 3 ~ 31 | waiter count | 等待者数量 |
sema 是一个操作系统级信号量,用于让等待 goroutine 进入休眠与被唤醒(由 runtime 的 runtime_SemacquireMutex / runtime_Semrelease 管理)。
两种运行模式
sync.Mutex 的核心特点是正常模式与饥饿模式的自适应切换:
| 模式 | 行为 | 性能 | 公平性 |
|---|---|---|---|
| 正常模式 | 被唤醒的等待者与新到达的 goroutine 竞争锁;若竞争失败,等待者被放回队首 | 高 | 低 |
| 饥饿模式 | Unlock 时直接将锁移交给队首等待者;新到达的 goroutine 不尝试获取,也不自旋,直接入队尾 | 低 | 高 |
模式切换规则:
- 正常模式下,若某个等待者等待超过 1ms(
starvationThresholdNs = 1e6),在它重新竞争时将 mutex 切换为饥饿模式 - 饥饿模式下,等待者被移交锁后若发现 ① 自己是队列中最后一个等待者,或 ② 本次等待时间少于 1ms,则切换回正常模式
正常模式性能更高(新来的 goroutine 本身就在 CPU 上运行,无需唤醒开销),但可能导致尾延迟爆炸;饥饿模式以牺牲吞吐换取公平。Go 的设计是默认走正常模式,仅在出现尾延迟风险时临时切到饥饿模式。
Lock 流程
func (m *Mutex) Lock() {
// Fast path:未加锁时直接 CAS 抢锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
m.lockSlow()
}快速路径:state == 0 时一次 CAS 即可拿锁,这是绝大多数无竞争场景的路径,可被内联。
慢路径 lockSlow 的核心逻辑:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota // = 3,waiter 计数从第 3 位起
starvationThresholdNs = 1e6 // 1ms
)
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// ① 正常模式 + 锁已被持有 + 可自旋 → 自旋等待
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 尝试设置 mutexWoken,通知 Unlock 不必再唤醒其他等待者
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
// ② 构造新 state
new := old
if old&mutexStarving == 0 {
new |= mutexLocked // 正常模式才尝试抢锁
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift // 需要排队 → waiter++
}
if starving && old&mutexLocked != 0 {
new |= mutexStarving // 本 goroutine 触发饥饿模式
}
if awoke {
new &^= mutexWoken
}
// ③ CAS 更新 state
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 抢锁成功
}
// ④ 进入等待:若此前已等过,排到队首;否则排队尾
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 2)
// ⑤ 被唤醒
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
// 饥饿模式:锁已被直接移交给本 goroutine
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving // 退出饥饿模式
}
atomic.AddInt32(&m.state, delta)
break
}
// 正常模式:被唤醒后回到循环重新竞争
awoke = true
iter = 0
} else {
old = m.state
}
}
}关键点:
- 自旋:仅在正常模式且
runtime_canSpin允许时自旋(多核、非饥饿、持锁时间短)。自旋避免了 goroutine 上下文切换的开销 mutexWoken标记:自旋中的 goroutine 设置此位,Unlock看到后就不再唤醒信号量上的其他等待者,避免多余的唤醒开销- 队首排队(LIFO):被唤醒后若再次竞争失败,优先排回队首,缩短新一轮等待时间
- 1ms 阈值:等待超过 1ms 即置
starving = true,下次 CAS 时将 mutex 切入饥饿模式
Unlock 流程
func (m *Mutex) Unlock() {
// Fast path:清除 locked 位
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex") // 未加锁就 Unlock
}
if new&mutexStarving == 0 {
// ===== 正常模式 =====
old := new
for {
// old>>mutexWaiterShift == 0 → 无需唤醒
// - mutexLocked 被设置:说明在你 unlock 之后、执行到这里之前,已经有别的 goroutine 通过 CAS 抢到锁了。锁已经有主人了,不需要你再唤醒别人
// - mutexWoken 被设置:说明已经有一个 goroutine 正在自旋等待(它在 lockSlow 的自旋阶段设置了这个标志)。它醒着呢,会自己去抢锁,不需要再额外唤醒队列里的人。
// - mutexStarving 被设置:说明在你 unlock 到现在这段时间里,锁已经被切换成饥饿模式了。饥饿模式下所有权是直接移交的,走的是另一条路径(函数最后的 else 分支)。当前 goroutine 在 unlock 时没观察到饥饿模式,说明它不属于这个移交链,不应该插手,所以退出。
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 等待者数量减 1,设置 mutexWoken 标志防止其他 goroutine 重复唤醒
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// handoff=false:唤醒一个等待者,让它参与竞争(不保证一定拿到锁)
runtime_Semrelease(&m.sema, false, 2)
return
}
old = m.state
}
} else {
// ===== 饥饿模式 =====
// 什么都不改,只 Semrelease(handoff=true) 把锁"交棒"给队首等待者,
// 并让出时间片使其立即运行。state 的修正由被唤醒者自己完成。
runtime_Semrelease(&m.sema, true, 2)
}
}- 正常模式:最多唤醒一个等待者(通过
mutexWoken避免多余唤醒),不直接移交锁,被唤醒者需与新来者竞争 - 饥饿模式:
Semrelease(..., handoff=true)直接移交所有权并让出时间片,保证队首等待者立刻运行
sync.RWMutex
数据结构 sync/rwmutex.go
type RWMutex struct {
w Mutex // 写者之间的互斥(排队)
writerSem uint32 // 写者等待读者退出的信号量
readerSem uint32 // 读者等待写者退出的信号量
readerCount atomic.Int32 // 当前持有/请求读锁的读者数(可为负)
readerWait atomic.Int32 // 写者阻塞期间还未退出的读者数
}
const rwmutexMaxReaders = 1 << 30字段说明:
w:一个sync.Mutex,用于多个写者之间排队(同时只有一个写者进入"申请写锁"流程)readerCount:当前读者数量,写者到达时会被 减去rwmutexMaxReaders,从而变成负数,用于通知新读者"有写者在等"readerWait:写者进入等待后,记录还需等多少个已进入的读者释放writerSem/readerSem:两个信号量,分别用于写者等待读者退出、读者等待写者退出
rwmutexMaxReaders = 1 << 30 作为"哨兵偏移":int32 能容纳最多 1 << 30 个读者而不溢出符号位,符号位则被复用为" 写者已宣告写意图"的状态标志。
readerCount 的编码
readerCount 是 RWMutex 最精妙的设计。它用一个 int32 同时表达了"读者数量"与"是否有写者在等":
readerCount 值 | 含义 |
|---|---|
== 0 | 无读者,无写者 |
> 0 | 持有或等待读锁的读者数 |
< 0 | 有写者已宣告写意图,实际读者数 = readerCount + rwmutexMaxReaders |
新读者到达时只需一条原子加法即可得知该快速通过还是应阻塞,这是 RWMutex 读路径能保持几乎无锁的关键。
RLock / RUnlock
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
// 有写者在等 → 本读者休眠等待
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
// 有写者在等:readerWait 归 0 时,唤醒写者
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}读锁的快速路径只有一条原子加法:
RLock:readerCount++。若结果>= 0(无写者)立即返回;若< 0(有写者在等)在readerSem上休眠RUnlock:readerCount--。若结果>= 0立即返回;若< 0进入慢路径,递减readerWait,若归 0 则唤醒写者
Lock / Unlock
func (rw *RWMutex) Lock() {
// ① 与其他写者排队
rw.w.Lock()
// ② 宣告写意图:readerCount -= rwmutexMaxReaders(变负)
// r = 宣告前的活跃读者数
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// ③ 若还有活跃读者,等它们全部 RUnlock
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}
func (rw *RWMutex) Unlock() {
// ① 撤销写意图:readerCount += rwmutexMaxReaders(恢复为非负)
// r = 在写者持锁期间堆积的新读者数
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
// ② 逐个唤醒堆积的读者
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// ③ 释放写者互斥,允许下一个写者
rw.w.Unlock()
}Lock 三个阶段
- 与其他写者串行化:
rw.w.Lock()排队,保证同一时刻只有一个写者进入后续流程 - 阻断新读者:
readerCount -= rwmutexMaxReaders使其瞬间变负,此后新到达的读者全部在readerSem上阻塞 - 等待存量读者退出:仍持有读锁的读者数记为
r并写入readerWait,写者在writerSem上休眠;每个存量读者RUnlock时readerWait--,归零的那个读者负责唤醒写者
Unlock 三个阶段
- 恢复读路径:
readerCount += rwmutexMaxReaders变回非负,读者重新畅通 - 唤醒堆积读者:写者持锁期间累积在
readerSem上的r个读者逐个释放 - 放行下一个写者:
rw.w.Unlock()释放写者互斥,排队中的下一个写者得以进入
写者优先策略
RWMutex 明确采用写者优先策略:写者一旦调用 Lock(即使还未真正拿到锁),新到达的 RLock 就会阻塞,直到该写者及后续排队的写者全部处理完。
这是为了防止写饥饿。读多写少场景下,若允许新读者插队,写者可能永远拿不到锁。代价是:
RLock不可递归使用(持有读锁的 goroutine 再次RLock可能与某个Lock形成死锁)。
不支持锁升级/降级
Go 的 RWMutex 既不支持锁升级(RLock → Lock),也不支持锁降级(Lock → RLock)。必须先完全释放再重新获取。
// 错误:锁升级必然死锁
rw.RLock()
rw.Lock() // 会阻塞等自己 RUnlock,死锁使用注意事项
不可拷贝
所有 sync 包中的锁类型在首次使用后不可拷贝。拷贝会造成两个独立的状态,导致锁失效:
type Counter struct {
mu sync.Mutex
n int
}
func bad(c Counter) { // ❌ 拷贝了 mu
c.mu.Lock()
defer c.mu.Unlock()
c.n++
}
func good(c *Counter) { // ✅ 传指针
c.mu.Lock()
defer c.mu.Unlock()
c.n++
}go vet 会检测此类拷贝并报错(copylocks 分析器)。
不可重入
可重入锁(Reentrant Lock) 是指同一个线程(或 goroutine)可以对同一把锁多次加锁而不会阻塞自身,内部通过记录持有者身份和加锁次数来实现。每次 Lock 计数加一,每次 Unlock 计数减一,计数归零时才真正释放锁。Java 的 ReentrantLock 和 synchronized 就是典型的可重入锁。
Go 的 Mutex 和 RWMutex 都不是可重入锁。Mutex 内部不记录持有者身份,第二次 Lock 无法判断"调用者就是持锁者" ,会像普通竞争者一样阻塞,而此时 Unlock 永远不会被调用,形成自死锁:
func doA(mu *sync.Mutex) {
mu.Lock()
defer mu.Unlock()
doB(mu)
}
func doB(mu *sync.Mutex) {
mu.Lock() // 死锁:当前 goroutine 已持有该锁
defer mu.Unlock()
// ...
}RWMutex 同样如此,且有更隐蔽的变体,持有读锁时再申请写锁:
func readThenWrite(rw *sync.RWMutex) {
rw.RLock()
defer rw.RUnlock()
rw.Lock() // 死锁:写者等待所有读者退出,但当前 goroutine 正是读者之一
defer rw.Unlock()
}常见的规避方式:
- 拆分内部方法:对外的公开方法负责加锁/解锁,内部方法(如
doXxxLocked)假定调用者已持锁,不再加锁 - 缩小临界区:在调用子函数前释放锁,子函数自行按需加锁
必须配对
Lock/Unlock、RLock/RUnlock 必须成对出现。推荐使用 defer 保证释放:
mu.Lock()
defer mu.Unlock()
// 临界区重复 Unlock 一个未加锁的 mutex 会直接 fatal(不可 recover):
fatal error: sync: unlock of unlocked mutex避免在持锁期间做昂贵操作
持锁期间禁止进行:
- 网络 I/O、磁盘 I/O
- 调用可能阻塞的 channel
- 调用用户回调(可能再次获取同一把锁导致死锁)
// ❌ 在锁内发 HTTP 请求
mu.Lock()
resp, _ := http.Get(url)
mu.Unlock()
// ✅ 先拷贝需要的数据,释放锁后再 I/O
mu.Lock()
snapshot := data.Clone()
mu.Unlock()
resp, _ := http.Get(snapshot.URL)