使用channel做goroutine同步

channel和goroutine是golang CSP并发模型的承载体,是goroutine同步绝对主角。channel的使用场景远比MutexWaitGroup多得多。

实现和WaitGroup一样的功能

func main() { ch := make(chan struct{}, 2) go func() { defer func() { ch <- struct{}{} }() time.Sleep(1e9) fmt.Println("after 1 second") }() go func() { defer func() { ch <- struct{}{} }() time.Sleep(2e9) fmt.Println("after 2 second") }() i := 0 for _ = range ch { i++ if i == 2 { close(ch) } } fmt.Println("the end") }

生产消费模型

这是个常用的1对N的生产消费模型,常常用于消费redis队列。

package main import ( "os" "fmt" "os/signal" "syscall" "sync" ) var quit bool = false const THREAD_NUM = 5 func main() { sigs := make(chan os.Signal, 1) //signal.Notify 注册这个给定的通道用于接收特定信号。 signal.Notify(sigs, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2) quitChan := make(chan bool) rowkeyChan := make(chan string, THREAD_NUM) go func() { <-sigs //等待信号 quit = true close(rowkeyChan) }() var wg sync.WaitGroup for i := 0; i < THREAD_NUM;i++ { wg.Add(1) go func(n int) { for { rowkey,ok := <- rowkeyChan if !ok { break } //do something with rowkey fmt.Println(rowkey) } wg.Done() }(i) } go func() { wg.Wait() quitChan <- true }() for !quit { //rowkey 可能来着redis的队列 rowkey := "" rowkeyChan <- rowkey } <- quitChan }

上面的代码稍微修改下很容易支持N:M的生产消费模型,这边就不再赘述。

赛道模型

在现实很多场景我们需要并发的做个任务,我们想知道他们的先后顺序,或者只想知道最快的那个。channel的特性很容易做到这个需求

package main import ( "fmt" "net/http" ) func main() { ch := make(chan string) go func() { http.Head("https://www.taobao.com") ch <- "taobao" }() go func() { http.Head("https://www.jd.com") ch <- "jd" }() go func() { http.Head("https://www.vip.com") ch <- "vip" }() //只想知道最快的 dm := <- ch fmt.Println(dm) /* 知道它们的排名 for dm:= range ch { fmt.Println(dm) } */ }

如何控制并发数

虽然golang新开一个goroutine占用的资源很小,但是无谓的goroutine开销对golang的调度性能很有影响,同时也会浪费cpu的资源。那么golang中如何控制并发数。

方式1——使用带缓冲的channel

package main import ( "fmt" "time" ) type Task struct { Id int } func work(task *Task,limit chan struct{}) { time.Sleep(1e9) fmt.Println(task.Id) <- limit } func main() { ch := make(chan *Task,10) limit := make(chan struct{},3) go func() { for i:=0;i<10;i++ { ch <- &Task{i} } close(ch) }() for { task ,ok :=<- ch if ok { limit <- struct{}{} go work(task,limit) } } }

方式2——使用master-worker模型,worker数量固定

package main import ( "fmt" "time" ) const WorkerNum = 3 type Task struct { Id int } func work(ch chan *Task) { for {//worker中循环消费任务 task,ok :=<- ch if ok { time.Sleep(1e9) fmt.Println(task.Id) } } } func main() { ch := make(chan *Task,3) exit := make(chan struct{}) go func() { for i:=0;i<10;i++ { ch <- &Task{i} } close(ch) }() for i:=0;i<WorkerNum;i++{ //控制worker数量 go work(ch) } <- exit }

控制goroutine优雅退出

除了我们需要控制goroutine的数量之外,我们还需要控制goroutine的生命周期同样以防止不不要的资源和性能消耗。那么接下来我们介绍下控制goroutine生命周期的办法。

方法1——goroutine超时控制

package main import ( "fmt" "time" ) func main() { ch := make(chan struct{}) task := func() { time.Sleep(3e9) ch <- struct{}{} } go task() select { case <-ch: //在3s内正常完成 fmt.Println("task finish") case <-time.After(3*time.Second): //超过3秒 fmt.Println("timeout") } }
$ go run main.go timeout

我们使用了 slecettime.After 来控制goroutine超时。

方法2——使用context.Context

package main import ( "context" "fmt" "time" ) func main() { ctx ,cancel:= context.WithCancel(context.Background()) //使用带cancel函数的context task := func(ctx context.Context) { for { select { case <-ctx.Done(): //cancel函数已经被执行了 default: time.Sleep(1e9) fmt.Println("hello") } } } go task(ctx) time.Sleep(3e9) //等待3s cancel() //让goroutine退出 }
$ go run main.go hello hello

当然我们还可以使用 context.WithTimeout的方式

package main import ( "context" "fmt" "time" ) func main() { exit := make(chan struct{}) ctx,_ := context.WithTimeout(context.Background(),3*time.Second) //使用带超时的context task := func(ctx context.Context) { HE: for { select { case <-ctx.Done(): //cancel函数已经被执行了 break HE //退出for循环 default: time.Sleep(1e9) fmt.Println("hello") } } exit<- struct{}{} } go task(ctx) <-exit }
$ go run main.go hello hello hello

这样的用法和上面的用法的区别是可以不需要手动去调用cancel函数。

总结

本小节我们介绍了使用channel同步goroutine的方法,介绍了channel的常见使用场景,已经如果控制goroutine的几种方法。这些方法在很是实际开发中都会用到。