Go语言中的通道

温馨提示:点击页面下方以展开或折叠目录

摘要:看完这篇,Go语言的通道及Goroutine你就都会了…

文章说明
文章作者:鴻塵
文章链接:https://hwame.top/20220327/channel-and-goroutine-in-go.html
原文链接:https://mp.weixin.qq.com/s/-N1qUEE090wMpI6TWmFTWg
参考资料:

1.前言

在go社区有这样一句话:不要通过共享内存来通信,而是通过通信来共享内存。

go官方是建议使用管道通信的方式来进行并发。
通道 是用于协程间交流的通信载体。严格地来说,通道就是数据传输的管道,数据通过这根管道被 “传入” 或被 “读出”。 因此协程可以发送数据到通道中,而另一个协程可以从该通道中读取数据。

在这里就要引入一个新名词: 协程
将线程再细分为多个协程,比如说是一条流水线上的多人协作。那么就可以减少各个线程内部的等待时间。

2.通道简介

Go 提供一个 chan 关键词去创建一个通道。一个通道只能传入一种类型的数据,其他的数据类型不允许被传输。
picture

将线程再分成更细的协程,使得中间等待时候更少,提高效率!

picture

2.1.声明

1
2
3
4
5
6
7
8
9
10
package main
import "fmt"

func main() {
var channel chan int
//声明了一个可以传入 int 类型数据的通道 channel 。

fmt.Println(channel)
//程序会打印nil, 因为通道的 0 值是 nil。
}

一个 nil 通道是没有用的。你不能向它传递数据或者读取数据。
因此,我们必须使用 make 函数器创建一个可以使用的通道。

1
2
3
4
5
6
7
8
9
10
package main
import "fmt"

func main(){
channel := make(chan int)
//声明了一个可以传入 int 类型数据的通道 channel 。

fmt.Println(channel)
//程序会打印channel的地址。 0xc0000180c0
}

它是一个指针内存地址。通道变量默认是一个指针。多数情况下,当你想要和一个协程沟通的时候,你可以给函数或者方法传递一个通道作为参数。当从协程接收到通道参数后,你不需要再对其进行解引用就可以从通道接收或者发送数据。

2.2.读写

Go 语言提供一个非常简洁的左箭头语法 <- 去从通道读写数据。
有变量接受管道值

1
channel <- data

上面的代码意味着我们想要把 data 数据推入到通道 channel 中,注意看箭头的指向。

它表明是从 data数据 to到 通道 channel。因此我们可以当作我们正在把 data 推入到通道 channel。
无变量接受管道值

1
<- data

这个语句不会把数据传输给任何变量,但是仍然是一个有效的语句。

上面的通道操作默认是阻塞的。
在以前的课程中,我们知道可以使用 time.Sleep 去阻塞一个通道。通道操作本质上是阻塞的。当一些数据被写入通道,对应的协程将阻塞直到有其他的协程可以从此通道接收数据。

通道操作会通知调度器去调度其他的协程,这就是为什么程序不会一直阻塞在一个协程。通道的这些特性在不同的协程沟通的时候非常有用,它避免了我们使用锁或者一些 hack 手段去达到阻塞协程的目的。

2.3.通道详解

2.3.1.例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main
import "fmt"

func Rush(c chan string) {
fmt.Println("Hello "+ <-c + "!")
// 声明一个函数 greet, 这个函数的参数 c 是一个 string 类型的通道。
// 在这个函数中,我们从通道 c 中接收数据并打印到控制台上。
}

func main(){
fmt.Println("Main Start")
// main 函数的第一个语句是打印 main start 到控制台。

channel := make(chan string)
// 在 main 函数中使用 make 函数创建一个 string 类型的通道赋值给 ‘ channel ' 变量

go Rush(channel)
// 把 channel 通道传递给 greet 函数并用 go 关键词以协程方式运行它。

// 此时,程序有两个协程并且正在调度运行的是 main goroutine 主函数

channel <- "DEMO"

// 给通道 channel 传入一个数据 DEMO.

// 此时主线程将阻塞直到有协程接收这个数据. Go 的调度器开始调度 greet 协程接收通道 channel 的数据

fmt.Println("Main Stop")
// 然后主线程激活并且执行后面的语句,打印 main stopped
}

/*
Main Start
Hello DEMO!
Main Stop
*/

2.3.2.死锁

picture

当通道读写数据时,所在协程会阻塞并且调度控制权会转移到其他未阻塞的协程。
如果当前协程正在从一个没有任何值的通道中读取数据,那么当前协程会阻塞并且等待其他协程往此通道写入值。

因此,读操作将被阻塞。类似的,如果你发送数据到一个通道,它将阻塞当前协程直到有其他协程从通道中读取数据。此时写操作将阻塞 。

下面是一个主线程在进行通道操作的时候造成死锁的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package main
import "fmt"

func main() {
fmt.Println("main start")
// main 函数的第一个语句是打印 main start 到控制台。

channel := make(chan string)
// 在 main 函数中使用 make 函数创建一个 string 类型的通道赋值给 ‘ channel ' 变量

channel <- "GoLang"
// 给通道 channel 传入一个数据 DEMO.
// 此时主线程将阻塞直到有协程接收这个数据. Go 的调度器开始调度协程接收通道 channel 的数据
// 但是由于没有协程接受,没有协程是可被调度的。所有协程都进入休眠状态,即是主程序阻塞了。
fmt.Println("main stop")
}

/*
报错
main start
fatal error: all goroutines are asleep - deadlock! //所有协程都进入休眠状态,死锁
goroutine 1 [chan send]:
main.main()
*/

2.3.3.关闭通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main
import "fmt"

func RushChan(c chan string) {
<- c
fmt.Println("1")
<- c
fmt.Println("2")
}
func main() {
fmt.Println("main start")
c := make(chan string, 1)
go RushChan(c)
c <- "Demo1"

close(c)
/*
不能向一个关了的channel发信息
main start
panic: send on closed channel
*/

c <- "Demo2"

//close(c)

/*
close 放这里的话可以
main start
1
2
Main Stop
*/

fmt.Println("Main Stop")
}

第一个操作 c <- "Demo2" 将阻塞协程直到有其他协程从此通道中读取数据,因此 greet 会被调度器调度执行。
第一个操作 <-c 是非阻塞的 因为现在通道 c 有数据可读。
第二个操作 <-c 将被阻塞因为通道 c 已经没数据可读.
此时 main 协程将被激活并且程序执行 close(c) 关闭通道操作。

2.3.4.缓冲区

1
c := make(chan Type, n)

当缓冲区参数不是 0 的时候。协程将不会阻塞除非缓冲区被填满。

当缓冲区满了之后,想要再往缓冲区发送数据只有等到有其他协程从缓冲区接收数据, 此时的发送协程是阻塞的。

有一点需要注意, 读缓冲区的操作是渴望式读取,意味着一旦读操作开始它将读取缓冲区所有数据,直到缓冲区为空。

原理上来说读操作的协程将不会阻塞直到缓冲区为空。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import "fmt"

func RushChan(c chan string) {
for {
val ,_ := <-c
fmt.Println(val)
}
}
func main() {
fmt.Println("Main Start")
c := make(chan string, 1)
go RushChan(c)
c <- "Demo1"
//结果1

//c <- "Demo2" //结果2

fmt.Println("Main Stop")
}

/*
结果1:
Main Start
Main Stop
*/

/*
结果2:
Main Start
Join
Mike
Main Stop
*/

由于这是一个缓冲的通道,当我只有 c <- Demo1 的时候,这里面只是满了,但是是不会阻塞的。所以子协程接受到了这个数据 Demo1 ,但是由于是非阻塞,所以主线程没有被阻塞,并没有等子协程完成就结束了,结果1就是这样出现了。

当加多一个 c <- Demo2 的时候,这时就要等缓冲区空了,也就是等有协程把 Demo1 读取,所以就会导致主线程阻塞,此时的结果就是结果2了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package main
import "fmt"

func RushChan(c chan string) {
for {
val ,_ := <-c
fmt.Println(val)
}
}
func main() {
c := make(chan int,3)
c <- 1
c <- 2
c <- 3
close(c)
for elem := range c {
fmt.Println(elem)
}
}

这里虽然关闭了通道,但是其实数据不仅在通道里面,数据还在缓冲区中的,我们依然可以读取到这个数据。

2.3.5.通道的长度和容量

和切片类似,一个缓冲通道也有长度和容量。
通道的长度是其内部缓冲队列未读的数据量,而通道的容量是缓冲区可最大盛放的数据量。

我们可以使用 len 函数去计算通道的长度,使用 cap 函数去获得通道的容量。和切片用法神似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main
import "fmt"

func RushChan(c chan string) {
for {
val ,_ := <-c
fmt.Println(val)
}
}
func main() {
c := make(chan int,3)
c <- 1
c <- 2

fmt.Println("长度: ",len(c))
fmt.Println(<-c)
fmt.Println("长度: ",len(c))
fmt.Println(<-c)
fmt.Println("长度: ",len(c))
fmt.Println("容量: ",cap(c))
}
/*
结果:
长度: 2
1
长度: 1
2
长度: 0
容量: 3
*/

这个 c 通道容量为 3,但只盛放了 2 个数据。Go 就不用去阻塞主线程去调度其他协程。你也可以在主线程中去读取这些数据,因为虽然通道没有放满,也不会阻止你去从通道读取数据。

2.3.6.单向通道

目前为止,我们已经学习到可以双向传递数据的通道,或者说,我们可以对通道做读操作和写操作。但是事实上我们也可以创建单向通道。比如只读通道只允许读操作,只写通道只允许写操作。
单向通道也可以使用 make 函数创建,不过需要额外加一个箭头语法。

1
2
roc := make(<-chan int)
soc := make(chan<- int)

在上面的程序中, roc 是一个只读通道,<- 在 chan 关键词前面。 soc is 只写通道,<- 在 chan 关键词后面。 他们也算不同的数据类型。

但是单向通道有什么作用呢 ?
使用单向通道可以 提高程序的类型安全性, 使得程序不容易出错。

但是假如你在一个协程中只需要读操作某通道,但是在主线程中却需要读写操作这个通道该怎么办呢?

幸运的是 Go 提供了一个简单的语法去把双向通道转化为单向通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main
import "fmt"

func greet(roc <-chan string) {
fmt.Println("Hello " + <-roc ,"!")
}
func main() {
fmt.Println("Main Start")
c := make(chan string)
go greet(c)
c <- "Demo"

fmt.Println("Main Stop")
}

/*
结果
Main Start
Hello Demo !
Main Stop
*/

我们修改 greet 协程函数,把参数 c 类型从双向通道改成单向接收通道。
现在我们只能从通道中读取数据,通道上的任何写入操作将会发生错误:

“invalid operation: roc <- “Temp” (send to receive-only type <-chan string)”.

picture

2.3.7.Select

select 和 switch 很像,它不需要输入参数,并且仅仅被使用在通道操作上。
Select 语句被用来执行多个通道操作的一个和其附带的 case 块代码。

原理

让我们来看下面的例子,讨论下其执行原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
time.Sleep(3 * time.Second)
c <- "Hello from service 1"
}
func service2(c chan string) {
time.Sleep(5 * time.Second)
c <- "Hello from service 2"
}
func main() {
fmt.Println("main start", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("Response form service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response form service 2", res, time.Since(start))
}

fmt.Println("main stop ",time.Since(start))
}
/*
结果:
main start 0s
Response form service 1 Hello from service 1 3.0018445s
main stop 3.0019815s
*/

从上面的程序来看,我们知道 select 语句和 switch 很像,不同点是用通道读写操作代替了布尔操作。通道将被阻塞,除非它有默认的 default 块 (之后将介绍)。一旦某个 case 条件执行,它将不阻塞。

所以一个 case 条件什么时候执行呢 ?

如果所有的 case 语句(通道操作)被阻塞,那么 select 语句将阻塞直到这些 case 条件的一个不阻塞(通道操作),case 块执行。
如果有多个 case 块(通道操作)都没有阻塞,那么运行时将随机选择一个不阻塞的 case 块立即执行。

为了演示上面的程序,我们开启两个协程并传入对应的通道变量。然后我们写一个带有两个 case 操作的 select 语句。 一个 case 操作从 chan1 读数据,另外一个从 chan2 读数据。这两个通道都是无缓冲的 , 读操作将被阻塞 。所以 select 语句将阻塞。因此 select 将等待,直到有 case 语句不阻塞。

  • 当程序执行到select语句后,主线程将阻塞并开始调度 service1 和service2协程。 service1 休眠 3 秒 后未阻塞的把数据写入通道 chan1 与其类似,service2等待 5 秒 后未阻塞的把数据写入通道chan2
  • 因为 service1 比 service2 早一步执行完毕,case 1 将首先调度执行,其他的 cases 块 (这里指 case 2) 将被忽略。 一旦 case 块执行完毕, main 线程将开始继续执行。

所以并没有输出case2的结果

上述程序真实模拟了一个数百万请求的服务器负载均衡的例子,它从多个有效服务中返回其中一个响应。
使用协程,通道和 select 语句,我们可以向多个服务器请求数据并获取其中最快响应的那个。

为了模拟上面哪个 case 块率先返回数据,我们可以直接去掉 Sleep 函数调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
c <- "Hello from service 1"
}

func service2(c chan string) {
c <- "Hello from service 2"
}

func main() {
fmt.Println("main start", time.Since(start))

chan1 := make(chan string)
chan2 := make(chan string)

go service1(chan1)
go service2(chan2)

select {
case res := <-chan1:
fmt.Println("Response form service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response form service 2", res, time.Since(start))
}

fmt.Println("main stop ",time.Since(start))
}

结果一:
main start 0s
Response form service 1 Hello from service 1 539.3µs
main stop 539.3µs
结果二:
main start 0s
Response form service 2 Hello from service 2 0s
main stop 0s

结果一共有2个不同的结果

为了证明当所有 case 块都是非阻塞的时候,golang 会随机选择一个代码块执行打印 response,我们使用缓冲通道来改造程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
c <- "Hello from service 1"
}
func service2(c chan string) {
c <- "Hello from service 2"
}
func main() {
fmt.Println("main start", time.Since(start))
chan1 := make(chan string,2)
chan2 := make(chan string,2)
chan1 <- "Value 1"
chan1 <- "Value 2"
chan2 <- "Value 1"
chan2 <- "Value 2"
select {
case res := <-chan1:
fmt.Println("Response form service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response form service 2", res, time.Since(start))
}

fmt.Println("main stop ",time.Since(start))
}

上述的程序的结果是有不同的

结果一:
main start 0s
Response form service 1 Value 1 496.2µs
main stop 496.2µs
结果二:
main start 0s
Response form service 2 Value 1 0s
main stop 0s

在上面的程序中,两个通道在其缓冲区中都有两个值。因为我们向容量为 2 的缓冲区通道分别发送了两个值,所以这些通道发送操作不会阻塞并且会执行下面的 select 块。 select 块中的所有 case 操作都不会阻塞,因为每个通道中都有两个值,而我们的 case 操作只需要取出其中一个值。因此,go 运行时会随机选择一个 case 操作并执行其中的代码。

2.3.8.default case 块

像 switch 一样, select 语句也有 default case 块。default case 块 是非阻塞的,不仅如此, default case 块可以使 select 语句永不阻塞,这意味着, 任何通道的 发送 和 接收 操作 (不管是缓冲或者非缓冲) 都不会阻塞当前线程。

如果有 case块的通道操作是非阻塞,那么 select会执行其case 块。如果没有那么 select将默认执行 default块.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
c <- "Hello from service 1"
}
func service2(c chan string) {
c <- "Hello from service 2"
}
func main() {
fmt.Println("main start", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("Response form service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response form service 2", res, time.Since(start))
default:
fmt.Println("No Response received",time.Since(start))
}
fmt.Println("main stop ",time.Since(start))
}
/*
结果:
main start 0s
No Response received 0s
main stop 0s
*/
  • 在上面的程序中,因为通道是非缓冲的,case 块的通道操作都是阻塞的,所有 default 块将被执行。
  • 如果上面的 select 语句没有 default 块,select 将阻塞,没有 response 会被打印出来,知道通道变成非阻塞。
  • 如果带有 default, select 将是非阻塞的,调度器将不会从主线程转而调度其他协程。
  • 但是我们可以使用 time.Sleep 改变这一点。 通过这种方式,主线程将把调度权转移到其他协程,在其他协程执行完毕后,调度权从新回到主线程手里。
  • 当主线程重新执行的时候,通道里面已经有值了,case 操作将不会阻塞。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
fmt.Println("service1 start")
c <- "Hello from service 1"
}
func service2(c chan string) {
fmt.Println("service2 start")
c <- "Hello from service 2"
}
func main() {
fmt.Println("main start", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
time.Sleep(3*time.Second)
select {
case res := <-chan1:
fmt.Println("Response form service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response form service 2", res, time.Since(start))
default:
fmt.Println("No Response received",time.Since(start))
}
fmt.Println("main stop ",time.Since(start))
}
/*
结果不唯一。
main start 0s
service2 start
service1 start
Response form service 1 Hello from service 1 3.0006729s
main stop 3.0006729s
*/

2.3.9.空 select

for{} 这样的空循环很像,空 select{} 语法也是有效的。但是有一点必须要说明。
我们知道 select 将被阻塞除非有 case 块没有阻塞。因为 select{} 没有 case 非阻塞语句,主线程将阻塞并可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main
import "fmt"
func service() {
fmt.Println("Hello from service")
}
func main() {
fmt.Println("main started")
go service()
select {}
fmt.Println("main stop")
}
/*
结果
main started
Hello from service
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [select (no cases)]:
*/

死锁

在上面的程序中我们知道 select 将阻塞 main 线程,调度器将会调度 service 这个协程。在 service 执行完毕后,调度器会再去调度其他可用的协程,但是此时已经没有可用的协程,主线程也正在阻塞,所以最后的结果就是发生死锁.

2.3.10.Deadlock

default 块在通道操作阻塞的时候是非常有用的,他可以避免死锁。 同时由于 default块的非阻塞特性,Go 可以避免在其他协程阻塞的时候去调度其他协程,从而避免死锁。
通道的发送操作也类似,, default 可以在其他协程不能被调度的时候被执行,从而避免死锁。

2.3.11.nil通道

2.4.多协程协同工作

写两个协程,一个用来计算数字的平方,另一个用来计算数字的立方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main
import "fmt"
func square(c chan int) {
fmt.Println("[square] reading")
num := <-c
c <- num * num
}
func cube(c chan int) {
fmt.Println("[cube] reading")
num := <-c
c <- num * num * num
}
func main() {
fmt.Println("[main] main started")
squareChan := make(chan int)
cubeChan := make(chan int)
go square(squareChan)
go cube(cubeChan)
testNum := 3
fmt.Println("[main] send testNum to squareChan")
squareChan <- testNum
fmt.Println("[main] resuming")
fmt.Println("[main] send testNum to cubeChane")
cubeChan <- testNum
fmt.Println("[main] resuming")
fmt.Println("[main] reading from channels")
squareVal,cubeVal := <-squareChan, <-cubeChan
sum := squareVal + cubeVal
fmt.Println("[main] sum of square and cube of",testNum," is",sum)
fmt.Println("[main] main stop")
}
/*
结果:
[main] main started
[main] send testNum to squareChan
[cube] reading
[square] reading
[main] resuming
[main] send testNum to cubeChane
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3 is 36
[main] main stop
*/

流程:

创建两个函数 square 和 cube 作为协程运行。

两个函数都有一个 int 类型通道参数c,从 c 中读取数据到变量num,最后把计算的数据再写入到通道 c 中。

在主线程中使用 make函数创建两个 int类型通道 squareChan and cubeChan然后分别运行square和cube 协程。因为调度权还在主线程,所以执行testNumb 赋值为 3。

然后我们把数据放入通道 squareChan 。主线程将阻塞直到通道的数据被读取。 一旦通道的数据被读取,主线程将继续执行。

在主线程中我们试图从这两个通道中读取数据,此时线程可能阻塞直到有数据写入到通道。这里我们使用:=语法来接收多个通道的值。

一旦这些协程把数据写入到通道,主线程将阻塞。当数据被写入通道中,主线程将继续执行,最后我们计算出数字的总和并打印到控制台。

2.5.WaitGroup

有一种业务场景是你需要知道所有的协程是否已执行完成他们的任务。这个和只需要随机选择一个条件为true 的 select 不同,他需要你满足所有的条件都是 true 才可以激活主线程继续执行。 这里的条件指的是非阻塞的通道操作。

2.5.1.简介

WaitGroup 是一个带着计数器的结构体,这个计数器可以追踪到有多少协程创建,有多少协程完成了其工作。当计数器为 0 的时候说明所有协程都完成了其工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"fmt"
"sync"
"time"
)
func service(wg *sync.WaitGroup, instance int) {
time.Sleep(2 * time.Second)
fmt.Println("Service called on instance",instance)
wg.Done() //协程数-1
}
func main() {
fmt.Println("main started")
var wg sync.WaitGroup
for i:=1;i<= 3; i++{
wg.Add(1)
go service(&wg,i)
}
wg.Wait()//阻塞
fmt.Println("main stop")
}
/*
结果:(结果是不唯一的,一共有3!次可能的结果)
main started
Service called on instance 2
Service called on instance 1
Service called on instance 3
main stop
*/

在上面的程序中,我们创建了一个sync.WaitGroup 类型的空结构体 (带着 0 值字段) wg 。 WaitGroup 结构体有一些像 noCopy, state1 和 sema 这样的内部字段。 这个结构体也有三个公开方法: Add, Wait 和 Done.

  • Add 方法的参数是一个变量名叫 delta 的int 类型参数,主要用来内部计数。 内部计数器默认值为 0. 它用于记录多少个协程在运行。
  • 当 WaitGroup创建后,计数器值为 0,我们可以通过给 Add方法传 int类型值来增加它的数量。 记住, 当协程建立后,计数器的值不会自动递增 ,因此需要我们手动递增它。
  • Wait 方法用来阻塞当前协程。一旦计数器为 0, 协程将恢复运行。 因此,我们需要一个方法去降低计数器的值。
    Done 方法可以降低计数器的值。他不接受任何参数,因此,它每执行一次计数器就减 1。

上面的例子中,我们在创建 wg 变量后,运行了三次 for 循环,每次运行的时候我们创建一个协程并给计数器加 1。

这意味着现在我们有三个协程在等待运行并且 WaitGroup 的计数器值为 3。注意我们传给协程函数的是一个指针,这是因为一旦在协程内部工作完成后,我们需要通过调用Done方法去降低计数器的值。

如果 wg 通过值复制方式传过去, 因为传递的是一个拷贝,主线程中的 wg将不会得到修改。

在 for 循环执行完成后,我们通过调用 wg.Wait()去阻塞当前主线程,并把调度权让给其他协程,直到计数器值为 0 之后,主线程才会被再次调度。

我们在另外三个协程中通过Done方法把计数器值降为 0,此时主线程将再次被调度并开始执行之后的代码。

2.5.2.工作池

顾名思义,一个工作池并发执行某项工作的协程集合。 在上面,我们已经用到的多个协程执行一个任务,但是他们并没有执行特定的工作,只是 sleep 了一下。 如果你向协程中传一个通道,他们可以去完成一些工作,变成一个工作池。

所以工作池其实就是维护了多个工作协程,这些协程的功能是可以收到任务,执行任务并返回结果。他们完成任务后我们就可以收到结果。这些协程使用相同的通道来达到自己的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package main
import (
"fmt"
"time"
)
func sqrWorker(tasks <-chan int, results chan <-int, instance int) {
for num := range tasks {
time.Sleep(time.Millisecond) //阻塞
fmt.Printf("[worker %v ] Sending result by worker %v \n",instance,instance)
results <- num*num
}
}
func main() {
fmt.Println("main started")
tasks := make(chan int,10)
results := make(chan int,10)
for i:=0;i<3;i++{
go sqrWorker(tasks,results,i)
}
for i := 0; i < 5; i++ {
tasks <- i*2
}
fmt.Println("[main] write 5 tasks")
close(tasks)
for i := 0; i < 5; i++ {
result := <-results
fmt.Println("[main] Result" , i , ":", result)
}
fmt.Println("main stop")
}
/*
//结果之一
[main] write 5 tasks
[worker 0 ] Sending result by worker 0
[worker 1 ] Sending result by worker 1
[worker 2 ] Sending result by worker 2
[main] Result 0 : 4
[main] Result 1 : 16
[main] Result 2 : 0
[worker 1 ] Sending result by worker 1
[main] Result 3 : 64
[worker 0 ] Sending result by worker 0
[main] Result 4 : 36
main stop
*/

sqrWorker 是一个带有 tasks 通道,results 通道 和 id 三个参数的协程函数。这个协程函数的任务是把从 tasks 通道接收到的数字的平方发送到 results通道。

在主函数中,我们创建了两个带缓冲区,容量为 10 的通道tasks and result。因此在缓冲区被充满之前,任何操作都是非阻塞的。所以有时候设置一个大点的缓冲区是个好办法。

然后我们循环创建多个 sqrWorker 协程,并传入 tasks 通道, results 通道 和 id 三个参数,用来传递和获取协程执行前后的数据。

接着我们向 tasks 非阻塞通道放入 5 个任务数据。

因为我们已经向任务通道放入的数据,所以我们可以关闭它,虽然这个操作不是必须的,但是如果以后运行中出现错误的话可以防止通道 range 带来的死锁问题。

然后我们开启循环 5 次从 results 通道接收数据,因为目前通道缓冲区没有数据,所以通道读取操作造成主线程阻塞,调度器将调度工作池的协程,直到有数据添加到 results通道。

当前我们有 3 个work 协程在工作,我们使用了 sleep 操作来模拟阻塞操作,所以调度器在某一个阻塞的时候会去调用其他的 work 协程,当某个 work 协程 sleep 完成后会把计算数字的平方的结果数据放入 results 缓冲无阻塞通道。

当 3 个协程依次交替把 task 通道的任务都完成后,for range 循环将完成,并且因为之前我们已经关闭了任务通道,所以协程也不会发生死锁。调度器将继续返回调度主线程。

有时候所有的工作协程可能都在阻塞,此时调度器将去调度主线程,直到 results 通道再次为空。

当所有 work 协程都完成任务退出后,主线程将继续拿到调度权并打印 results 通道剩下的数据,继续之后代码的执行。

2.5.3.Mutex

互斥是 Go 中一个简单的概念。在我解释它之前,先要明白什么是竞态条件。 goroutines 都有自己的独立的调用栈,因此他们之间不分享任何数据。但是有一种情况是数据存放在堆上,并且被多个 goroutines 使用。 多个 goroutines 试图去操作一个内存区域的数据会造成意想不到的后果.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main
import (
"fmt"
"sync"
)
var i int
func worker(wg *sync.WaitGroup) {
i = i+1
wg.Done()
}
func main() {
fmt.Println("main started")
var wg sync.WaitGroup
for i:=0;i<1000;i++{
wg.Add(1)
go worker(&wg)
}
wg.Wait()
fmt.Println("main stop",i)
}
/*
结果是不同的!!
main started
main stop 985
*/

i = i + 1 这个计算有 3 步
(1) 得到 i 的值
(2) 给 i 的值加 1
(3) 更新 i 的值

这里发生很多事情,因为go是协程,这三步里面不一定都是同时顺序执行的。有可能A是顺利执行,使得i=2,但是B是读取的是A没更新的之前的i也就是1,所以就是结果会小于等于1000的,

除非一个协程阻塞,否则其他协程是没有机会获得调度的。那么 i = i + 1 也没有阻塞,为什么 Go 的调度器会去调度其他协程呢?

在任何情况下,都不应该依赖 Go 的调度算法,而应该实现自己的逻辑来同步不同的 goroutine.

实现方法之一就是使用我们上面提到的互斥锁。互斥锁是一个编程概念,它保证了在同一时间只能有一个线程或者协程去操作同一个数据。当一个协程想要操作数据的时候,必须获取该数据的一个锁,操作完成后必须释放锁,如果没有获取到该数据的锁,那么就不能操作这个数据。

在 Go 中,互斥数据结构 ( map) 由 sync 包提供。在 Go 中,多协程去操作一个值都可能会引起竞态条件。我们需要在操作数据之前使用 mutex.Lock() 去锁定它,一旦我们完成操作,比如上面提到的 i = i + 1, 我们就可以使用 mutext.Unlock() 方法解锁。

如果在锁定的时候,有一个协程想要读写 i 的值,那么此协程将阻塞 直到前面的协程完成操作并解锁数据。因此在某一时刻有且仅有一个协程可以操作数据,从而避免竞态条件。记住,任何锁之间的变量在解锁之前对于其他协程都不是可用的。

让我们使用互斥锁修改上面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main
import (
"fmt"
"sync"
)
var i int
func worker(wg *sync.WaitGroup,m *sync.Mutex) {
m.Lock()
i = i+1
m.Unlock()
wg.Done()
}
func main() {
fmt.Println("main started")
var wg sync.WaitGroup
var m sync.Mutex
for i:=0;i<1000;i++{
wg.Add(1)
go worker(&wg,&m)
}
wg.Wait()
fmt.Println("main stop",i)
}
/*结果
main started
main stop 1000
*/

在上面的程序中,我们创建了一个互斥锁变量 m,并把它的指针传递给所有已创建的协程。
在协程内部,当我们要开始操作 i变量的时候,我们先通过 m.Lock()获得锁,操作完成后我们使用 m.Unlock()释放锁。
互斥锁可以帮助我们解决竞态条件。 但首要规则是避免 goroutine 之间共享资源。
所以官方建议不要共享内存并发,而是通过管道通信的方式并发。

3.结语

后部分go并发知识是参考作者summar的go并发以及书上的知识点,非常感谢作者的翻译工作,使得我能更好的理解go的channel并发机制!链接点这里channel

随着业务的不断扩大,并发能更好的发挥服务器的性能。