map(Swiss Tables)
约 3938 字大约 13 分钟
2024-05-19
map 的底层结构
Go 1.24 起,map 的底层实现正式从旧版的 hmap(固定 8-slot 桶 + 溢出链接桶)切换为 Swiss map(基于 SwissTable 算法),采用可扩展哈希(Extendible Hashing)+ 分组探测(组内 SIMD 批量比较 + 组间三角数探测),相比旧版有更好的缓存局部性和查找性能。
Map internal/runtime/maps/map.go
// SwissMapGroupSlotsBits = 3
// SwissMapGroupSlots = 1 << SwissMapGroupSlotsBits // 8
type Map struct {
// 已填充 slot 的数量(即所有 table 中元素的总数),不包括已删除的 slot。
// 必须排在第一位(编译器需要知道此字段,用于内置函数 len())。
used uint64
// seed 是哈希种子,每个 map 都会计算一个唯一的随机数作为种子。
seed uintptr
// table 的目录。
//
// 通常情况下,dirPtr 指向一个 table 指针数组:
//
// dirPtr *[dirLen]*table
//
// 该数组的长度(dirLen)为 `1 << globalDepth`,多个条目可能指向同一张 table。
// 详情请参阅顶层注释。
//
// 小 map 优化:如果 map 中的元素数量始终不超过 abi.SwissMapGroupSlots,
// 则整个 map 可以放入单个 group 中。此时 dirPtr 直接指向该 group:
//
// dirPtr *group
//
// 在这种情况下,dirLen 为 0,used 记录该 group 中已使用 slot 的数量。
// 注意,小 map 不会有已删除的 slot(因为无需维护探测序列)。
dirPtr unsafe.Pointer
dirLen int
// 在 table 目录查找中使用的位数。
globalDepth uint8
// 目录查找时需要从哈希值中移出的位数。
// 在 64 位系统上,该值为 64 - globalDepth。
globalShift uint8
// writing 是一个标志位,在 map 写入期间会通过异或 1 进行切换。
// 通常写入时该值为 1,但若存在多个并发写入者,
// 通过切换可以增大双方检测到竞争条件的概率。
writing uint8
// tombstonePossible 为 false 时,表示已知该 map 中没有任何 table 包含墓碑标记(tombstone)。
tombstonePossible bool
// clearSeq 是调用 Clear 的序列计数器,用于在迭代过程中检测 map 是否被清空。
clearSeq uint64
}table internal/runtime/maps/table.go
Map 通过dirPtr管理一或多个 table,每个 table 是一个完整的哈希表分片
type table struct {
// 已填充的 slot 数(不含墓碑)。
used uint16
// 总 slot 数,始终为 2^N。等于 `(groups.lengthMask+1)*abi.SwissMapGroupSlots`。
capacity uint16
// 触发 rehash 前还可插入的数量。
// 当 used + tombstones > loadFactor * capacity 时触发 rehash。
growthLeft uint16
// 该 table 使用哈希高位的位数(用于目录查找)。
// 可能小于 globalDepth(目录扩展后,table 尚未分裂时)。
localDepth uint8
// 该 table 在目录中的起始索引。
// index == -1 表示该 table 已废弃(被分裂替换)。
index int
// slot group 数组。每个 group 包含 8 个 key/elem slot 及其控制字节。
groups groupsReference
}group
group 是 SwissTable 的核心数据单元,每个 group 包含 8 个 slot(key/elem 对)及对应的控制字节。由于 key/elem 类型在编译期才确定,运行时通过两个类型擦除的句柄来管理 group,实际布局由 *abi.SwissMapType 在运行时提供。
groupsReference
table.groups 字段的类型,表示一张 table 内所有 group 组成的数组:
type groupsReference struct {
// data 指向 group 数组的起点。
data unsafe.Pointer // data *[length]typ.Group
// lengthMask 等于 group 数量减一(group 数量始终为 2^N)。
// 用于快速取模:i % length == i & lengthMask。
lengthMask uint64
}通过 group(typ, i) 方法可以取出第 i 个 group,返回一个 groupReference:
func (g *groupsReference) group(typ *abi.SwissMapType, i uint64) groupReference {
offset := uintptr(i) * typ.GroupSize // 每个 group 大小在运行时确定
return groupReference{
data: unsafe.Pointer(uintptr(g.data) + offset),
}
}groupReference
表示单个 group 的操作句柄,本质是一个裸指针,类型信息完全擦除:
type ctrl uint8
type ctrlGroup uint64
// groupReference 是一个包装类型,用于表示存储在 data 处的单个 slot group。
// 一个 group 包含 abi.SwissMapGroupSlots(8) 个 slot 及其控制字(control word)。
type groupReference struct {
// data 指向这个 group,该 group 由 typ.Group 描述,其内存布局如下:
//
// type group struct {
// ctrls ctrlGroup
// slots [abi.SwissMapGroupSlots]slot
// }
//
// type slot struct {
// key typ.Key
// elem typ.Elem
// }
data unsafe.Pointer // data *typ.Group
}两者的层级关系:groupsReference(数组) → groupReference(单个 group) → ctrls / key / elem(slot 内字段)。
控制字节ctrl
每个 slot 对应一个控制字节(ctrl),有三种状态:
| Control Byte | 状态 | 说明 | Probe 行为 |
|---|---|---|---|
0b10000000 | Empty | 空闲 slot(从未使用,或删除后被清除) | Stop |
0b11111110 | Deleted | 已删除的 slot(所在 group 全满,无法清除) | Continue |
0b0??????? | Full | 已占用 slot,低 7 位存储 h2(hash) | Continue |
控制字节的最高位区分"Empty/Deleted"(1)与"Full"(0),使得 SIMD 指令(AMD64 上)或位运算可以一次并行匹配 8 个 slot,大幅减少 key 比较次数。
小 map 优化
当 map 中元素数量始终不超过 abi.SwissMapGroupSlots(8)时,整个 map 可以放入单个 group,此时:
dirPtr直接指向该 group,而非目录数组dirLen为 0- 小 map 不会产生墓碑(无探测序列需要维护)
- 插入第 9 个元素时,触发
growToTable,从 group 迁移到正式的 table 结构
哈希分离与探测序列
h1 / h2
Swiss map 将哈希值拆分为两部分(internal/runtime/maps/map.go):
func h1(h uintptr) uintptr { return h >> 7 } // 高位部分(64 位系统为高 57 位):用于在 table 内定位起始 group
func h2(h uintptr) uintptr { return h & 0x7f } // 低 7 位:写入control byte,用于快速过滤目录索引则使用哈希值的最高 globalDepth 位:dirIndex = hash >> globalShift。
| 哈希位段 | 用途 |
|---|---|
最高 globalDepth 位 | 目录索引,定位 table |
hash >> 7(h1) | table 内定位起始 group |
hash & 0x7f(h2) | control byte,快速过滤 |
注意:h1 =
hash >> 7的结果包含了目录索引所用的高位,但同一张 table 内的所有元素共享相同的高localDepth位(正是这些位决定了它们属于哪张 table),因此 h1 在 table 内的真正区分信息来自中间位段(bit 7 ~ bit (64-localDepth-1)),高位重叠不会造成冲突。
探测序列
探测序列采用三角数列(Triangular Probing):
// probeSeq 维护 probe sequence 的状态,用于遍历 table 中的所有 group。
// 该序列是如下形式的三角级数:
//
// p(i) := (i^2 + i)/2 + hash (mod mask+1)
//
// 序列输出的是 group 的索引。group 机制允许以极少的分支一次检查整个 group。
//
// 当 group 数量为 2^N 时,该 probe sequence 恰好访问每个 group 一次,
// 因为 (i^2+i)/2 是 Z/(2^m) 上的 双射(bijection)。
// 参见 https://en.wikipedia.org/wiki/Quadratic_probing
type probeSeq struct {
mask uint64 // group 数量 - 1,用于快速取模(i & mask == i % N)
offset uint64 // 当前所在 group 的索引
index uint64 // 当前探测步数(0, 1, 2, 3 ...)
}
func makeProbeSeq(hash uintptr, mask uint64) probeSeq {
return probeSeq{mask: mask, offset: uint64(hash) & mask, index: 0}
}
func (s probeSeq) next() probeSeq {
s.index++
s.offset = (s.offset + s.index) & s.mask
return s
}每次调用 next() 时,index 先加 1,再把新的 index 累加到 offset。因此第 i 步的偏移量是 1+2+…+i = i*(i+1)/2,移动距离依次递增(+1、+2、+3…),比简单线性探测(每次 +1)更分散,能缓解"聚集"问题。
双射:当 group 数量为 2^N 时,i*(i+1)/2 对 2^N 取模的结果恰好是 0, 1, …, 2^N-1 的一个全排列——即遍历 2^N 步后每个 group 恰好被访问一次,不重不漏,这就是"双射"(一一对应)的含义。以 4 个 group、起点为 0 为例:
步骤 i | 累计偏移 i*(i+1)/2 | 实际 group(mod 4) |
|---|---|---|
| 0 | 0 | 0 |
| 1 | 1 | 1 |
| 2 | 3 | 3 |
| 3 | 6 | 2 |
查找流程
以 m[key] 为例:
- 计算
hash := typ.Hasher(key, m.seed) - 取
hash >> (m.globalShift & 63)定位目录中的table - 在
table内,以h1(hash)为起点构造探测序列
seq := makeProbeSeq(h1(hash), t.groups.lengthMask)- 取出探测序列当前位置对应的 group
g := t.groups.group(typ, seq.offset)- 对该 group 批量比较
ctrls(8 个控制字节)与h2(hash)(AMD64 使用 SIMD,其他架构使用位运算)
match := g.ctrls().matchH2(h2(hash))- 对每个控制字节匹配的 slot,再做完整的 key 比较
for match != 0 {
i := match.first()
slotKey := g.key(typ, i)
if typ.IndirectKey() {
slotKey = *((*unsafe.Pointer)(slotKey))
}
if typ.Key.Equal(key, slotKey) {
slotElem := g.elem(typ, i)
if typ.IndirectElem() {
slotElem = *((*unsafe.Pointer)(slotElem))
}
return slotElem, true
}
match = match.removeFirst()
}- 若 group 内存在empty slot(
ctrlEmpty),则探测终止,返回零值
match = g.ctrls().matchEmpty()
if match != 0 {
// 找到一个empty slot意味着我们已经到达了探测序列的末尾。
// key 一定不会出现在第一个 empty 之后
return nil, false
}- 若未命中且无空 slot,继续探测下一个 group
h2(hash)过滤使得绝大多数情况下ctrl不匹配就能跳过,极大减少了Equal调用次数
完整源码
// Get performs a lookup of the key that key points to. It returns a pointer to
// the element, or false if the key doesn't exist.
func (m *Map) Get(typ *abi.SwissMapType, key unsafe.Pointer) (unsafe.Pointer, bool) {
return m.getWithoutKey(typ, key)
}
func (m *Map) getWithoutKey(typ *abi.SwissMapType, key unsafe.Pointer) (unsafe.Pointer, bool) {
if m.Used() == 0 {
return nil, false
}
if m.writing != 0 {
fatal("concurrent map read and map write")
}
hash := typ.Hasher(key, m.seed)
if m.dirLen == 0 {
_, elem, ok := m.getWithKeySmall(typ, hash, key)
return elem, ok
}
idx := m.directoryIndex(hash)
return m.directoryAt(idx).getWithoutKey(typ, hash, key)
}
func (t *table) getWithoutKey(typ *abi.SwissMapType, hash uintptr, key unsafe.Pointer) (unsafe.Pointer, bool) {
seq := makeProbeSeq(h1(hash), t.groups.lengthMask)
for ; ; seq = seq.next() {
g := t.groups.group(typ, seq.offset)
match := g.ctrls().matchH2(h2(hash))
for match != 0 {
i := match.first()
slotKey := g.key(typ, i)
if typ.IndirectKey() {
slotKey = *((*unsafe.Pointer)(slotKey))
}
if typ.Key.Equal(key, slotKey) {
slotElem := g.elem(typ, i)
if typ.IndirectElem() {
slotElem = *((*unsafe.Pointer)(slotElem))
}
return slotElem, true
}
match = match.removeFirst()
}
match = g.ctrls().matchEmpty()
if match != 0 {
// Finding an empty slot means we've reached the end of
// the probe sequence.
return nil, false
}
}
}删除与墓碑
删除一个 key 时(internal/runtime/maps/table.go):
// 若所在 group 存在 ctrlEmpty slot → 将此 slot 标记为 ctrlEmpty,t.growthLeft++
// 若所在 group 无 ctrlEmpty slot → 标记为 ctrlDeleted(墓碑),t.growthLeft 不变
//("无 ctrlEmpty"指 group 内 8 个 slot 全为 Full 或 Deleted,并非仅指全为 Full)墓碑的作用是维护探测序列的连续性:探测过程中遇到墓碑不会终止,而遇到空 slot 才终止。若将已删除 slot 直接标为空,则可能导致在该 slot 之后插入的元素无法被找到。
当墓碑数量超过 capacity 的 10% 时,pruneTombstones 会尝试批量清理可复用的墓碑 slot(无需移动元素),若清理量不足则触发 rehash。
扩容机制
Swiss map 采用可扩展哈希(Extendible Hashing),扩容时只操作单个 table,而非整体 rehash,从而实现增量式扩容。
负载因子
每个 table 的负载因子上限为 7/8(maxAvgGroupLoad = 7),即每 8 个 slot 最多填满 7 个。growthLeft 追踪当前 table 在触发 rehash 前还能插入的数量。
grow(原地扩容)
当 table.capacity * 2 <= maxTableCapacity(= 1024)时,直接将容量翻倍并重建 groups 数组,将所有元素重新插入新数组(同时消除所有墓碑):
func (t *table) rehash(typ *abi.SwissMapType, m *Map) {
newCapacity := 2 * t.capacity
if newCapacity <= maxTableCapacity {
t.grow(typ, m, newCapacity) // 容量翻倍,原地扩容
return
}
t.split(typ, m) // 超过上限,分裂为两张 table
}split(table 分裂)
当 table 达到最大容量(1024 个 slot)时触发分裂,将一张 table 拆成 left / right 两张新 table。
分裂步骤:
① 扩展目录(仅当 localDepth == globalDepth 时)
每张 table 用 localDepth 位哈希来标识自己归属的目录区间,目录大小为 1 << globalDepth。当两者相等时,目录里该 table 只占一个条目,没有多余空间放两张新 table,必须先将目录扩容一倍:
- 原来第
i个条目变成2i和2i+1两个条目,都指向同一张旧 table(指针共享,不复制 table) globalDepth++,globalShift--
② 分裂
localDepth++,以新 localDepth 对应的那一位哈希值为依据:该位为 0 流入 left,为 1 流入 right。所有元素重新插入两张新 table,旧 table 标记为废弃(index = -1)。
③ 安装新 table
left 占据旧 table 在目录中的起始位置,right 紧随其后。由于目录中可能有多个条目指向同一张 table(localDepth < globalDepth 的情况),left / right 各自可能占多个条目,由 replaceTable 批量写入。
示例: 初始 2 张 table,table B 触发分裂:
每次分裂只 rehash 被分裂的那张 table,其余 table(如 table A)完全不受影响。
预分配
已知元素数量时,建议通过 make 预分配容量,避免多次扩容:
// 预分配可容纳 1000 个元素的 map,减少 rehash 次数
m := make(map[string]int, 1000)并发安全
Go 的 map 不是并发安全的。Swiss map 通过 writing 标志位辅助检测并发写入:
writing初始值为 0;写操作开始前 XOR 1(变为1),结束后再 XOR 1(恢复为 0);其他 goroutine 在写操作开始时检测writing != 0即可发现竞争- 多个并发写入者会使
writing反复切换,提高双方检测到竞争条件的概率(但不保证必然检测到) - 检测到并发写入时触发
fatal(非 panic,无法 recover)
// 错误:多 goroutine 并发读写会触发 fatal
go func() { m["a"] = 1 }()
go func() { _ = m["a"] }()
// 正确:使用 sync.RWMutex 保护
var mu sync.RWMutex
var m = make(map[string]int)
// 写
mu.Lock()
m["key"] = 1
mu.Unlock()
// 读
mu.RLock()
v := m["key"]
mu.RUnlock()也可以使用标准库的
sync.Map,它针对读多写少或各 goroutine 操作不同 key 的场景做了专门优化。
使用注意事项
零值不可写
map 的零值为 nil,对 nil map 写入会 panic,但读取安全(返回零值):
var m map[string]int
_ = m["key"] // OK,返回 0
m["key"] = 1 // panic: assignment to entry in nil mapkey 必须可比较
map 的 key 必须是支持 == 的可比较类型,slice、map、func 不能作为 key:
m := map[[]int]int{} // 编译错误:invalid map key type []int迭代顺序随机
map 的迭代顺序是故意随机化的(每次迭代使用不同起始位置),不应依赖迭代顺序:
m := map[int]int{1: 1, 2: 2, 3: 3}
for k := range m {
fmt.Println(k) // 每次运行顺序可能不同
}需要有序遍历时,应先将 key 收集到切片并排序:
keys := make([]int, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Ints(keys)
for _, k := range keys {
fmt.Println(k, m[k])
}迭代期间增删元素
Go 规范允许在迭代 map 时增删元素,但行为不确定:
- 迭代期间删除尚未遍历的 key → 该 key 不会出现在后续迭代中
- 迭代期间新增的 key → 可能出现,也可能不出现在本次迭代中
若在迭代期间调用 clear(m) 清空整个 map,迭代器会通过 Map.clearSeq 字段检测到并提前终止:迭代器启动时记录 clearSeq 的当前值,每次推进前对比该值是否变化,若发生变化则立即中止迭代。
判断 key 是否存在
直接读取不存在的 key 会返回零值,需用二值赋值区分"不存在"与"值为零":
v, ok := m["key"]
if ok {
fmt.Println("found:", v)
} else {
fmt.Println("not found")
}