文件注释

1
2
3
4
5
6
7
8
9
10
// Invariants:
// At least one of c.sendq and c.recvq is empty,
// except for the case of an unbuffered channel with a single goroutine
// blocked on it for both sending and receiving using a select statement,
// in which case the length of c.sendq and c.recvq is limited only by the
// size of the select statement.
//
// For buffered channels, also:
// c.qcount > 0 implies that c.recvq is empty.
// c.qcount < c.dataqsiz implies that c.sendq is empty.

翻译一下:
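不变式:
c.sendq和c.recvq至少有一个是空的;唯一的例外是:在无缓冲通道上,同一个goroutine通过select语句同时阻塞在该通道的发送和接收上,此时c.sendq和c.recvq的长度只受select语句大小的限制。
对于有缓冲通道,还有: c.qcount > 0 意味着c.recvq为空; c.qcount < c.dataqsiz 意味着c.sendq为空。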

hchanSize

1
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

这个表达式把unsafe.Sizeof(hchan{})向上取整到maxAlign的整数倍;当hchan的大小本身已经是maxAlign(这里是8)的倍数时,结果就等于结构体hchan的实际大小

func makechan(t *chantype, size int) *hchan

  1. makechan64(t *chantype, size int64) *hchan中,size不能越界

    1
    2
    3
    4
    5
    6
    7
    // makechan64 is the int64-sized variant of makechan. It panics with
    // "makechan: size out of range" when size does not survive a round trip
    // through int, and otherwise forwards to makechan with the converted size.
    func makechan64(t *chantype, size int64) *hchan {
    	n := int(size)
    	if int64(n) != size {
    		panic(plainError("makechan: size out of range"))
    	}
    	return makechan(t, n)
    }
  2. 元素类型大小必须小于64KB(elem.size >= 1<<16 时直接throw)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    // makechan allocates and initializes the hchan for a channel whose element
    // type is t.elem and whose buffer holds size elements.
    // It throws on an oversized element type or bad alignment, and panics with
    // "makechan: size out of range" when size is negative or too large.
    func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    // Defensive re-check: the element size must be below 64 KiB. The size is
    // stored in c.elemsize as a uint16 further down.
    // NOTE(review): the original note relates this limit to typeBitsBulkBarrier
    // as used by sendDirect/recvDirect; that code is not visible here — confirm.
    if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
    }
    // hchanSize must be a multiple of maxAlign and the element alignment must
    // not exceed maxAlign.
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
    }
    // 1. size must not be negative.
    // 2. size must not exceed maxSliceCap(elem.size).
    // 3. the buffer's byte size must not exceed maxAlloc-hchanSize, i.e. it
    //    must leave room for the hchan header.
    if size < 0 || uintptr(size) > maxSliceCap(elem.size) || uintptr(size)*elem.size > maxAlloc-hchanSize {
    panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    // How hchan and buf are allocated depends on whether the elements contain
    // pointers; see the switch below.
    var c *hchan
    switch {
    case size == 0 || elem.size == 0:
    // Queue size or element size is zero, so only the header is allocated.
    // This covers unbuffered channels and channels of zero-size elements.
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    // Race detector uses this location for synchronization.
    c.buf = unsafe.Pointer(c)
    case elem.kind&kindNoPointers != 0:
    // Elements do not contain pointers.
    // Allocate hchan and buf in one call.
    // buf starts hchanSize bytes past the start of that allocation.
    c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
    // Elements contain pointers.
    // hchan and buf are allocated separately; buf is allocated with the
    // element type passed to mallocgc.
    c = new(hchan)
    c.buf = mallocgc(uintptr(size)*elem.size, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)

    if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
    }

    // chantype is the type descriptor passed to makechan for a channel type.
    // makechan reads only elem from it.
    type chantype struct {
    typ _type // presumably the common type header for the channel type itself — TODO confirm
    elem *_type // element type of the channel; makechan uses its size, align and kind
    dir uintptr // presumably the channel direction — TODO confirm
    }

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
// chansend sends the value pointed to by ep on channel c.
// It returns true when the value was handed to a receiver or stored in the
// buffer. With block == false it returns false instead of sleeping when the
// send cannot complete immediately. Sending on a closed channel panics with
// "send on closed channel". callerpc is only used for race-detector reporting.
// NOTE(review): the header above says "if block is not nil"; in this code
// block is a bool and it is block == false that selects the non-sleeping path.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
// Blocking send on a nil channel: park the current goroutine with no
// unlock function and no lock. The throw below marks a return from
// gopark as unreachable.
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}
// Race detector instrumentation, active only when raceenabled.
if raceenabled {
racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.

// A non-blocking send fails fast when the channel is not closed and either
// it is unbuffered with no waiting receiver, or it is buffered and full.
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

// t0 stays 0 unless block profiling is enabled.
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

// Sending on a closed channel panics, after the lock is released.
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

// dequeue yields a waiting receiver's sudog from recvq, or nil if none is queued.
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// send is given a closure that unlocks c.lock.
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// Otherwise, if the buffer is not full, store the value in the buffer.
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// Copy the element from ep into the buffer slot qp.
// NOTE(review): the original note says typedmemmove involves GC write
// barriers; that is not visible here — confirm.
typedmemmove(c.elemtype, qp, ep)
// The buffer is circular: advance sendx and wrap to 0 at dataqsiz.
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// Non-blocking send that could not complete: release the lock taken above
// and report failure.
if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
// No waiting receiver and no buffer space: park the calling goroutine.
gp := getg() // the current g
mysg := acquireSudog()
mysg.releasetime = 0
// When block profiling is active (t0 != 0), mark releasetime with -1.
// closechan replaces any non-zero releasetime with cputicks() when it
// releases the waiter.
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// Queue this sender's sudog on the channel's send wait queue.
c.sendq.enqueue(mysg)
// Park the current goroutine, handing c.lock to goparkunlock.
// Execution continues below once something wakes it.
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

// someone woke us up.
// gp.waiting must still be mysg; anything else means the goroutine's
// waiting list was modified while it was parked.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
// A nil param means the send was not completed. closechan sets
// gp.param = nil when it releases waiters, so the channel must be closed.
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
// Record the block-profile event if a release timestamp was filled in.
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
// Hand the sudog obtained from acquireSudog back.
releaseSudog(mysg)
return true
}


// 这是acquireSudog的注释
//go:nosplit
func acquireSudog() *sudog {
// Delicate dance: the semaphore implementation calls
// acquireSudog, acquireSudog calls new(sudog),
// new calls malloc, malloc can call the garbage collector,
// and the garbage collector calls the semaphore implementation
// in stopTheWorld.
// Break the cycle by doing acquirem/releasem around new(sudog).
// The acquirem/releasem increments m.locks during new(sudog),
// which keeps the garbage collector from being invoked.
// 大意是说semaphore(信号?调用者?)调用acquireSudog直接new(sudog)的话,
// new方法调用了malloc,malloc会调用gc,gc会再次调用semaphore,从而引发循环.打破循环的
// 方法是用acquirem/releasem,因为这个会屏蔽gc
// 这里调用acquirem后获取m,幷从其缓存的sudog列表中取出最后一个
  1. closechan
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    // closechan marks channel c as closed and wakes every goroutine waiting on
    // it. It panics with "close of nil channel" for a nil c and with
    // "close of closed channel" if c is already closed.
    func closechan(c *hchan) {
    if c == nil {
    panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
    }

    if raceenabled {
    callerpc := getcallerpc()
    racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
    racerelease(unsafe.Pointer(c))
    }

    c.closed = 1

    // Collect every waiter into a list linked through g.schedlink: receivers
    // are dequeued first, then senders. Each new g is pushed at the head.
    var glist *g

    // release all readers
    for {
    sg := c.recvq.dequeue()
    if sg == nil {
    break
    }
    // Zero the receiver's destination, if it has one.
    if sg.elem != nil {
    typedmemclr(c.elemtype, sg.elem)
    sg.elem = nil
    }
    if sg.releasetime != 0 { // non-zero: chansend/chanrecv set -1 when block profiling; stamp the release time
    sg.releasetime = cputicks()
    }
    gp := sg.g
    // A nil param tells the woken goroutine that it was released by close.
    gp.param = nil
    if raceenabled {
    raceacquireg(gp, unsafe.Pointer(c))
    }
    gp.schedlink.set(glist)
    glist = gp
    }

    // release all writers (they will panic)
    for {
    sg := c.sendq.dequeue()
    if sg == nil {
    break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
    sg.releasetime = cputicks()
    }
    gp := sg.g
    // chansend panics with "send on closed channel" when it wakes with a nil param.
    gp.param = nil
    if raceenabled {
    raceacquireg(gp, unsafe.Pointer(c))
    }
    gp.schedlink.set(glist)
    glist = gp
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for glist != nil {
    gp := glist
    glist = glist.schedlink.ptr()
    gp.schedlink = 0
    goready(gp, 3) // presumably makes the parked goroutine runnable again — confirm
    }
    }

chanrecv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

if c == nil {
// Non-blocking receive on a nil channel returns (false, false).
if !block {
return
}
// Blocking receive on a nil channel: park with no unlock function and
// no lock. The throw marks a return from gopark as unreachable.
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
// A non-blocking receive fails fast when the channel is
//   1. unbuffered with no waiting sender, or
//   2. buffered with no queued elements,
// and is then observed to be not closed.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

// t0 stays 0 unless block profiling is enabled.
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

// Channel is closed and has no queued elements: report a closed receive.
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
unlock(&c.lock)
if ep != nil {
// Zero *ep so the caller sees the element type's zero value.
typedmemclr(c.elemtype, ep)
}
return true, false
}

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// A queued sender is one that blocked because the channel was
// unbuffered or its buffer was full. recv is given a closure that
// unlocks c.lock.
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// Copy the element out to the caller's destination.
typedmemmove(c.elemtype, ep, qp)
}
// Clear the vacated buffer slot.
typedmemclr(c.elemtype, qp)
// The buffer is circular: advance recvx and wrap to 0 at dataqsiz.
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

// Non-blocking receive that found nothing: release the lock and report
// that the receive was not selected.
if !block {
unlock(&c.lock)
return false, false
}

// no sender available: block on this channel.
// No waiting sender and no queued element. The parking sequence mirrors
// the one in chansend.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
// When block profiling is active (t0 != 0), mark releasetime with -1.
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// Queue this receiver's sudog on the channel's receive wait queue, then park.
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
// Record the block-profile event if a release timestamp was filled in.
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// A nil param means the wakeup came from closechan rather than a sender.
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

chanrecv和chansend的block为false的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// TestChan6 shows that receiving from a nil channel inside a select that has
// a default case does not block: the default branch runs and logs "default".
func TestChan6(t *testing.T) {
	var nilCh chan int
	select {
	case got := <-nilCh:
		t.Log(got)
	default:
		t.Log("default")
	}
}

// TestChan7 shows that sending on a nil channel inside a select that has a
// default case does not block: the default branch runs and logs "default".
func TestChan7(t *testing.T) {
	var nilCh chan int
	select {
	case nilCh <- 1:
		t.Log(1)
	default:
		t.Log("default")
	}
}

select中只有一个case加default时,编译器会把它改写成非阻塞调用(block为false),所以对ch的收发都不会阻塞

补充:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// selectnbsend is the non-blocking send used for a single-case select with a
// default branch. The compiler rewrites
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// into
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
// It reports whether the send case was selected.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
	const block = false // never sleep: a failed send falls through to default
	selected = chansend(c, elem, block, getcallerpc())
	return selected
}

虽然文档说这样子是非阻塞调用,但是在debug的时候并没有见到该函数被调用!!