channel
约 4023 字大约 13 分钟
2025-12-28
概述
Channel 是 Go 语言实现 CSP(Communicating Sequential Processes)并发模型的核心原语。它提供了 goroutine 之间类型安全的通信与同步机制,使得"通过通信来共享内存"成为可能。
Channel 分为两类:
- 无缓冲 channel(
make(chan T)):发送和接收必须同时就绪,具有同步语义 - 有缓冲 channel(
make(chan T, n)):内部维护一个大小为 n 的环形缓冲区,缓冲区满时发送阻塞,空时接收阻塞
数据结构 runtime/chan.go
hchan
hchan 是 channel 的运行时表示,所有 channel 操作最终都作用于这个结构体:
type hchan struct {
qcount uint // 缓冲区中的元素数量
dataqsiz uint // 环形缓冲区的容量(即 make 时传入的 size)
buf unsafe.Pointer // 指向环形缓冲区的指针
elemsize uint16 // 单个元素的大小
closed uint32 // 关闭标志,0 为未关闭,1 为已关闭
timer *timer // timer channel 专用(time.After 等)
elemtype *_type // 元素的类型信息,用于数据拷贝和 GC
sendx uint // 环形缓冲区的发送索引(下一个写入位置)
recvx uint // 环形缓冲区的接收索引(下一个读取位置)
recvq waitq // 等待接收的 goroutine 队列
sendq waitq // 等待发送的 goroutine 队列
lock mutex // 保护 hchan 所有字段的互斥锁
}不变量:
- 对于有缓冲 channel:
sendq和recvq中至少有一个为空;qcount > 0意味着recvq为空(有数据可读就不需要等待);qcount < dataqsiz意味着sendq为空(有空间可写就不需要等待) - 对于无缓冲 channel:正常收发时
sendq和recvq也至少有一个为空,但在 select 场景下,同一个 goroutine 的 select 可能同时包含对同一 channel 的发送和接收。
waitq
waitq 是阻塞在 channel 上的 goroutine 等待队列,基于 sudog 实现的双向链表:
type waitq struct {
first *sudog
last *sudog
}环形缓冲区
有缓冲 channel 的 buf 字段指向一个固定大小的环形数组,通过 sendx、recvx 两个索引实现循环读写。
创建 channel - makechan runtime/chan.go
make(chan T, size) 编译后调用 runtime.makechan:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// 元素大小必须小于 64KB
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 检查内存溢出
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 无缓冲 channel 或元素大小为 0
// 只分配 hchan 本身
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case !elem.Pointers():
// 元素不含指针:hchan 和 buf 一次分配(连续内存)
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素含指针:分开分配(buf 需要被 GC 扫描)
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}三种内存分配策略的关键区别在于 GC 扫描:当元素不含指针时,整块内存不需要被 GC 扫描,可以合并分配减少一次 mallocgc 调用;当元素含指针时,buf 必须独立分配并传入 elem 类型信息,让 GC 知道需要扫描其中的指针字段。
发送 - chansend runtime/chan.go
c <- v 编译后调用 runtime.chansend1,最终进入 chansend。发送操作按优先级依次尝试三条路径:
快速路径(无锁)
if !block && c.closed == 0 && full(c) {
return false
}非阻塞模式下(select + default),先不加锁检查 channel 是否已满。这是一个竞态安全的优化,即使读到的值可能过时,由于 close 操作不可能让 channel 从"不可发送"变为"可发送",因此返回 false 在逻辑上永远是安全的。
full 函数通过单字读取判断:
func full(c *hchan) bool {
if c.dataqsiz == 0 {
return c.recvq.first == nil // 无缓冲:看是否有等待的接收者
}
return c.qcount == c.dataqsiz // 有缓冲:看缓冲区是否满
}路径一:直接发送给等待的接收者
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// 找到等待的接收者,直接发送
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}如果 recvq 中有等待的接收者,取出队首的 sudog,绕过缓冲区直接将数据拷贝到接收者的栈上:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep) // 直接跨栈拷贝
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1) // 唤醒接收者
}路径二:写入缓冲区
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep) // 拷贝到缓冲区槽位
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}缓冲区有空间时,将数据拷贝到 sendx 指向的位置,推进索引,增加计数。
路径三:阻塞等待
前两条路径都不满足时(无等待的接收者、缓冲区已满或无缓冲),如果是非阻塞模式(select + default),直接返回失败,走 default 分支:
if !block {
unlock(&c.lock)
return false
}否则当前 goroutine 需要挂起:
gp := getg()
mysg := acquireSudog()
mysg.elem = ep // 指向待发送数据
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg) // 入队到发送等待队列
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, ...)
KeepAlive(ep) // 确保数据在接收者拷贝完之前不被 GC 回收
// 被唤醒后
closed := !mysg.success // success=false 表示因 close 被唤醒
mysg.c = nil
releaseSudog(mysg)
if closed {
panic(plainError("send on closed channel"))
}
return true关键细节:
KeepAlive(ep)防止发送者的栈变量在接收者拷贝之前被 GC 回收- 唤醒后通过
mysg.success判断是正常完成还是 channel 被关闭,被 close 唤醒的发送者会 panic
接收 - chanrecv runtime/chan.go
v := <-c 编译为 chanrecv1,v, ok := <-c 编译为 chanrecv2,最终都调用 chanrecv。接收操作与发送对称,同样有三条路径:
快速路径(无锁)
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return false, false // 开放且为空 → 无法接收
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep) // 零值填充
}
return true, false // 已关闭且为空 → 返回零值
}
}这里需要两次检查 empty:第一次 empty 和 closed 之间可能有发送操作完成,因此关闭后必须再检查一次缓冲区是否仍为空。
func empty(c *hchan) bool {
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}empty 函数与发送端的 full 在逻辑上对称,但实现上有一个关键区别:empty 使用了 atomic 读取,而 full 没有。原因在于接收快速路径需要 empty(c) 和 atomic.Load(&c.closed) 两步配合,必须保证观测顺序的一致性;而发送快速路径仅依赖 full(c) 一个条件(closed==0 只是过滤已关闭的情况,不影响"返回 false"的正确性),因此 full 不需要 atomic。
路径一:直接从等待的发送者接收
lock(&c.lock)
if c.closed != 0 && c.qcount == 0 {
if ep != nil {
typedmemclr(c.elemtype, ep) // 零值填充
}
return true, false // 已关闭且为空
}
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}recv 函数根据 channel 类型有不同行为:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
// 无缓冲:直接从发送者栈拷贝到接收者
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
// 有缓冲且满:
// 1. 从缓冲区头部取出数据给接收者
// 2. 将发送者的数据写入缓冲区尾部(头尾实际是同一个槽位,因为缓冲区满)
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // buf → 接收者
}
typedmemmove(c.elemtype, qp, sg.elem) // 发送者 → buf
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // 缓冲区满时 sendx == recvx
}
sg.elem = nil
gp := sg.g
unlockf()
sg.success = true
goready(gp, skip+1) // 唤醒发送者
}对于有缓冲 channel 的满缓冲区场景,这个操作实际上是一次旋转:接收者拿走队头,发送者填入队尾(因为满时 sendx == recvx,头尾是同一个槽位),然后推进索引。
路径二:从缓冲区读取
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp) // 清零槽位,帮助 GC
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}注意 typedmemclr:读取后必须清零该槽位,否则其中的指针会阻止 GC 回收被引用的对象。
路径三:阻塞等待
前两条路径都不满足时(无等待的发送者、缓冲区为空),如果是非阻塞模式(select + default),直接返回失败:
if !block {
unlock(&c.lock)
return false, false
}否则当前 goroutine 需要挂起:
gp := getg()
mysg := acquireSudog()
mysg.elem = ep // 指向接收数据的目标地址
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.recvq.enqueue(mysg) // 入队到接收等待队列
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, ...)
// 被唤醒后
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success // success=false 表示因 close 被唤醒
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success与发送的阻塞路径对称,但唤醒后的行为不同:发送者被 close 唤醒会 panic,而接收者被 close 唤醒不会 panic,只是返回 (true, false),即零值和 ok=false。
关闭 - closechan runtime/chan.go
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// 释放所有等待的接收者:零值填充,标记 success=false
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
sg.success = false
glist.push(sg.g)
}
// 释放所有等待的发送者:标记 success=false(唤醒后会 panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
sg.success = false
glist.push(sg.g)
}
unlock(&c.lock)
// 释放锁后再唤醒所有 goroutine,避免死锁
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}关闭操作的关键行为:
- 将所有等待的接收者唤醒,它们会收到零值和
received=false - 将所有等待的发送者唤醒,它们检查
success=false后会 panic - 先释放锁再调用
goready,因为goready会将 goroutine 放入运行队列并可能触发调度,持有 channel 锁时做这件事可能违反 runtime 的锁顺序约束,导致死锁
select 与 channel runtime/select.go
编译器对 select 语句按 case 数量做了分级优化:
- 单 case 无 default:直接编译为
chansend1/chanrecv1,等同于普通的ch <- v或<-ch,阻塞语义 - 单 case + default:编译为
selectnbsend/selectnbrecv,即以block=false调用chansend/chanrecv,非阻塞语义 - 多 case(± default):编译为
selectgo,走完整的三轮扫描算法
单 case + default
编译器直接将其转换为非阻塞的 channel 操作:
// select { case c <- v: ... default: ... }
// 编译为:
if selectnbsend(c, v) { ... } else { ... }
// selectnbsend 本质上就是非阻塞发送
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, sys.GetCallerPC())
}多 case 的 selectgo
多个 case 时调用 runtime.selectgo,采用三阶段流程:
第一轮:随机轮询(不阻塞)
按随机化的 pollorder 遍历所有 case,检查是否有立即可执行的操作:
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends {
// 接收 case
sg = c.sendq.dequeue()
if sg != nil {
goto recv // 有等待的发送者 → 直接接收
}
if c.qcount > 0 {
goto bufrecv // 缓冲区有数据 → 从缓冲区读
}
if c.closed != 0 {
goto rclose // 已关闭 → 返回零值
}
} else {
// 发送 case
if c.closed != 0 {
goto sclose // 已关闭 → panic
}
sg = c.recvq.dequeue()
if sg != nil {
goto send // 有等待的接收者 → 直接发送
}
if c.qcount < c.dataqsiz {
goto bufsend // 缓冲区有空间 → 写入缓冲区
}
}
}
if !block {
// 存在 default → 所有 case 都不就绪,走 default
selunlock(scases, lockorder)
casi = -1
goto retc
}如果找到可执行的 case,通过 goto 跳转到对应的处理逻辑并返回。如果没有且存在 default(!block),解锁后直接走 default。
随机化的 pollorder 保证了多个 case 同时就绪时的公平性,避免饥饿。
第二轮:全部入队并挂起
没有 case 就绪时,按 lockorder(channel 地址排序,防止死锁)为每个 case 创建一个 sudog 并入队到对应 channel 的 sendq 或 recvq:
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.c = c
// 按 lockorder 构建等待链表
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}
gp.param = nil
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReason, traceBlockSelect, 1)第三轮:唤醒后清理
被某个 channel 操作唤醒后,重新加锁所有 channel,从所有其他 channel 的等待队列中移除自己的 sudog:
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param) // 唤醒者设置的,标识哪个 case 赢了
gp.param = nil
casi = -1
// 先保存等待链表头,再清理所有 sudog 的字段
sglist = gp.waiting
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
// 遍历所有 case,找到胜出者,移除其余的
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// 这个 sudog 就是唤醒我们的 → 记录胜出 case
casi = int(casei)
cas = k
caseSuccess = sglist.success
} else {
// 非胜出者 → 从对应 channel 的队列中移除
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}select 的竞争处理
多个 case 可能同时就绪,dequeue 中通过 selectDone 的 CAS 操作保证只有一个 case 能赢得竞争:
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil {
return nil
}
y := sgp.next
if y == nil {
q.first = nil
q.last = nil
} else {
y.prev = nil
q.first = y
sgp.next = nil
}
// select 场景:可能有多个 channel 同时尝试唤醒同一个 goroutine,
// 通过 CAS 竞争 selectDone 标志,只有第一个成功的 case 能赢
if sgp.isSelect {
if !sgp.g.selectDone.CompareAndSwap(0, 1) {
continue // 另一个 case 已经赢了,跳过这个 sudog
}
}
return sgp
}
}锁排序
select 涉及多个 channel 的加锁,为防止死锁,按 channel 的内存地址排序后统一加锁(sellock),释放时按逆序(selunlock)。
channel 操作总结
| 操作 | nil channel | 已关闭 channel | 正常 channel |
|---|---|---|---|
ch <- v | 永久阻塞 | panic | 阻塞或成功发送 |
<-ch | 永久阻塞 | 读取缓冲区剩余数据(ok=true),耗尽后返回零值(ok=false) | 阻塞或成功接收 |
close | panic | panic | 成功关闭 |
len | 0 | 缓冲区剩余元素数 | 缓冲区当前元素数 |
cap | 0 | 缓冲区容量 | 缓冲区容量 |
使用注意事项
由发送方负责关闭
关闭 channel 的职责应由发送方承担,接收方不应关闭 channel。因为发送方关闭后自己不会再发送,不会触发 panic;而接收方无法知道发送方是否还会继续发送,贸然关闭可能导致发送方 panic:
// ✅ 发送方关闭
func producer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}
// ❌ 接收方关闭 - 发送方可能 panic
func consumer(ch chan int) {
val := <-ch
close(ch) // 如果发送方继续发送,会 panic
}多个发送方的场景下,不要由任何一个发送方关闭。可以用 sync.WaitGroup 等待所有发送方完成后,由协调者关闭:
func main() {
ch := make(chan int)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
ch <- id
}(i)
}
go func() {
wg.Wait()
close(ch) // 所有发送方完成后,由协调者关闭
}()
for val := range ch {
fmt.Println(val)
}
}goroutine 泄漏
如果一个 goroutine 阻塞在 channel 操作上,而 channel 的另一端永远不会有对应的操作,这个 goroutine 将永远不会被回收,即使 channel 本身已经没有外部引用。这是因为阻塞的 goroutine 仍然是存活的,而存活的 goroutine 是 GC root,不会被垃圾回收;goroutine 通过 sudog 持有 channel 的引用,因此 channel 也无法被回收:
// ❌ goroutine 泄漏:没有接收者,goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
ch <- 1 // 永远阻塞,goroutine 泄漏
}()
// ch 离开作用域,但 goroutine 仍在 sendq 中等待
}常见的泄漏场景:
- 无缓冲 channel 只有发送没有接收(或反之)
context取消后没有退出阻塞的 channel 操作select中遗漏了退出分支
// ✅ 通过 context 控制退出,避免泄漏
func worker(ctx context.Context, ch <-chan int) {
for {
select {
case val := <-ch:
process(val)
case <-ctx.Done():
return
}
}
}避免对 len 做并发判断
len(ch) 返回的是调用瞬间缓冲区的元素数,但在并发场景下这个值随时可能变化,基于它做判断是不可靠的:
// ❌ 检查和发送之间可能被其他 goroutine 抢先
if len(ch) < cap(ch) {
ch <- v // 仍可能阻塞
}
// ✅ 用 select + default 实现非阻塞发送
select {
case ch <- v:
default:
// 缓冲区满,走备选逻辑
}