go design (五) channel
go channel 的设计与实现
golang 中推崇的金句就是 不要通过共享内存来通信,要通过通信的方式来共享内存,其通信的载体就是 channel , golang 特有的关键字(数据结构),在 golang 中要实现并发编程成本很低, 一个 go 关键词 就可以启动一个 goroutine ,那么多个 goroutine 之间的数据传输该怎么处理呢?就有了 channel 通道,这种数据类型 来帮助在 多个 goroutine 进行信息传输。
从实例开始 channel 介绍之旅
创建与使用
缓存与不带缓存的 channel
1 | |
make出来的chan为 实际结构地址的引用, 而 声明的channel 为 nil 则永久性的读写堵塞, 且不能被 close 不然会 panic- 不带缓存的
channel- 进行读操作的时候,如果无数据 会进入堵塞状态,直到协程内有数据被写入
- 进行写操作的时候,如果无协程在读取数据,也会进入堵塞状态,直到数据被读取
- 带缓存 的
channel- 进行写操作的时候,如果缓存还有空间则不会被堵塞,否则也会堵塞
- 进行读操作的时候,如果无数据会进入堵塞状态,直到协程内有数据被写入
1 | |
1 | |
channel 的两个属性
1 | |
len()为当前channel已经使用的缓存量cap()为 当前channel最大的缓存量
select 为多 channel 处理而生
在上述的使用中 如果在 一个协程中 使用多个 channel,如果一个 channel堵塞 ,那么代码就没法 执行到 下一个channel` ,从而导致运行时 死锁
比如:
1 | |
这种情况下如果不知道 c,d 谁会先发送数据的情况 就会 直接报错,相互死锁,程序中断。
通过 select 来对多个channel 进行收发控制 应运而生
select 的特性
select无case属性时,会直接堵塞代码执行,切记勿在主协程中使用,不然直接死锁select只有一个case channel时,会一直堵塞,直到有数据进入
1
2
3
4
5
6
7
8
9c := make(chan int) // 不带缓存的 channel
go func() {
select {
case ok := <-c :
fmt.Println(ok)
}
}()
c <- 1
time.Sleep(1 * time.Second)select能在channel上进行非阻塞的收发操作;如果 存在case default无数据直接进入default case
1
2
3
4
5
6
7
8
9
10
11
12c := make(chan int) // 不带缓存的 channel
go func() {
select {
case ok := <-c:
fmt.Println(ok)
default:
fmt.Println("未接受到数据")
}
}()
c <- 1
time.Sleep(1 * time.Second)- select 在遇到多个 channel 同时响应时,会随机执行一种情况; 这里用 同一个
channel不同channel的同一时刻,效果也是类似,只会有一个分支被读取与执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15c := make(chan int) // 不带缓存的 channel
go func() {
select {
case ok := <-c:
fmt.Println("case 1", ok)
case ok := <-c:
fmt.Println("case 2", ok)
case ok := <-c:
fmt.Println("case 3", ok)
}
}()
c <- 1
time.Sleep(1 * time.Second)
// 多次响应 的结果不一致
for range 与 channel 的配合使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
c := make(chan int, 2) // 带缓存的 channel
go func() {
for val := range c {
fmt.Println(val)
}
}()
for i := 0; i < 10; i++ {
c <- i
if i == 5 {
close(c)
break
}
}
time.Sleep(time.Second * 1)
}
- 对于
for range进行迭代 的channel除非channel被关闭,不然会一直堵塞下去 - 对于 关闭后
channel内还存在未处理完的数据情况,也会被读取出来
close() 函数与 channel 的关闭
close()只是相对于 发送方的概念,close之后的channel不能进行发送操作,但是从channel中读取数据的行为是允许的,存在数据时,拿到的是数据,不存在时 ,拿到的是 零值,且不会堵塞但是
close函数不能针对与一个channel执行多次, 不然 就会 出现panic: close of closed channel的运行时错误close之后的channel,再进行发送会出现panic: send on closed channel1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17func main() {
c := make(chan int) // 带缓存的 channel
go func() {
for val := range c {
fmt.Println(val)
}
value, ok := <-c
fmt.Println(value, ok) // 0 false
}()
c <- 0
c <- 1
close(c)
time.Sleep(time.Second * 2)
}
用法-超时控制
使用
time.After()来实现 延时的channel信息发送1
2
3
4
5
6
7
8
9
10
11
12
13func main() {
c := make(chan int) // 不带缓存的 channel
go func() {
select {
case ok := <-c:
fmt.Println(ok)
case <-time.After(time.Second * 1):
fmt.Println("一秒后超时")
}
}()
time.Sleep(2 * time.Second)
}
用法-控制并发执行协程数量(协程池)
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func main() {
limit := make(chan int, 3) // 控制同时执行的 goroutine 个数
work := [100]int{}
for k := range work {
go func(k int) {
limit <- 1
w(k)
<-limit
}(k)
}
time.Sleep(time.Second * 20)
}
func w(i int) {
time.Sleep(time.Second * 1)
fmt.Println("deal task", i)
}
用法-生产消费模型
用于解耦合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23func main() {
s := make(chan int)
// 消费
go func() {
for v := range s {
go work(v)
}
}()
// 生产
for i := 0; i < 10; i++ {
s <- i
}
time.Sleep(time.Second * 20)
}
func work(taskId int) {
time.Sleep(time.Second * 1)
fmt.Println("deal task ", taskId)
}
golang 中的 设计与实现
到目前为止,我们把 channel 的基础功能 过了一遍, 感觉上确实是挺复杂与强大的,接下来会去看看 channel 的数据结构,跟写入,读取的流程,通过这些流程,更好的理解为什么,channel 会有这样的特性.
channel 实现的数据结构
- 源码中的定义
1 | |
可以看出来底层的循环队列主要由
qcountdataqsiz、buf、sendx、recvx构建qcountchannel的总长度dataqsiz循环队列的长度buf缓冲区数据指针sendx发送操作处理到的位置recvx接收操作处理到的位置
recvqsendq代表 等待中的goroutine链表runtime.makechan()为创建channel的函数,缓存与非缓存 一个是底层分配空间大小的区别,二是对应的elemsize qcount dataqsiz属性值不同
channel 的发送
- 发送的三种情况
- 如果当前
channel的recvq上存在已经被阻塞的goroutine,那么会直接将数据发送给当前goroutine并将其设置成 可运行状态; - 如果
channel存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区sendx所在的位置上; - 如果不满足上面的两种情况,会创建一个
runtime.sudog结构并将其加入channel的sendqlist中,当前goroutine也会陷入阻塞等待其他的协程从channel接收数据;
- 如果当前
- 发送数据时两个会触发
goroutine调度的时机:recvq上存在已经被阻塞的goroutine,立刻设置处理器的runnext属性,但是并不会立刻触发调度;- 找到接收方并且缓冲区已经满了,这时会将自己加入
channel的sendq队列并调用runtime.goparkunlock触发goroutine的调度让出处理器的使用权;
channel 的接受
接受主要就是 调用
runtime.chanrecv接受的几种情况
从一个
空(nil) channel接收数据时会直接调用runtime.gopark让出处理器的使用权channel被关闭, 且缓冲区中不存在任何数据,那么会清除数据并立刻返回。channel 正常时
sendq队列中存在挂起的goroutine,会将recvx索引所在的数据拷贝到接收变量所在的内存空间上,并将sendq队列中goroutine的数据拷贝到缓冲区;缓冲区中存在数据,直接读取
recvx索引对应的数据;默认情况下会挂起当前的
goroutine,将runtime.sudog结构加入recvq队列并陷入休眠等待调度器的唤醒;
调度时机
- 当 channel 为空(nil) 时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
close
close函数先上一把大锁,接着把所有挂在这个 channel 上的recvq和sendq全都连成一个sudog链表,再解锁。最后,再将所有的sudog全都唤醒- 所以 当 存在有
sendq被唤醒而chan本身被关闭时候,会直接被panic