当前位置:  首页>> 技术小册>> Golang并发编程实战

18 | 分组操作:处理一组子任务,该用什么并发原语?

在Golang的并发编程实践中,处理一组子任务是一个常见的需求,它要求程序能够高效地并行执行多个独立或相互依赖的任务,同时管理这些任务的执行流程,确保资源的有效利用和任务的正确完成。为了实现这一目标,Golang提供了一系列强大的并发原语,包括goroutines、channels、sync包中的工具(如WaitGroup、Mutex、RWMutex、Once等),以及更高级的并发控制结构如context包和errors包中的新特性。本章将深入探讨在处理一组子任务时,如何选择和应用这些并发原语。

1. 理解需求:分组操作的核心

分组操作的核心在于将一组任务分配给多个goroutines并行执行,同时提供一种机制来等待所有任务完成、收集结果或处理错误。这种模式在Web服务器处理多个请求、批量数据处理、并行测试等场景中尤为常见。

2. Goroutines与Channels:基础构建块

Goroutines 是Golang中实现并发的核心机制,它们轻量级且易于创建。每个goroutine独立执行其函数,与主goroutine并行运行。然而,仅凭goroutines本身并不足以有效管理并发任务,特别是当需要等待多个任务完成时。

Channels 作为goroutines之间的通信管道,是实现任务同步和结果传递的关键。通过channels,一个goroutine可以发送数据到另一个goroutine,从而协调它们的执行。

3. sync.WaitGroup:等待所有goroutines完成

sync.WaitGroup 是处理一组子任务时最常用的并发原语之一。它允许goroutines等待一组其他goroutines的完成。WaitGroup内部维护了一个计数器,每当一个goroutine启动时,通过调用Add(1)增加计数器;每当一个goroutine完成时,通过调用Done()(实际上是Add(-1)的封装)减少计数器。主goroutine通过调用Wait()方法阻塞,直到计数器归零,即所有子goroutines都已完成。

示例代码

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. )
  7. func worker(id int, wg *sync.WaitGroup) {
  8. defer wg.Done() // 确保在函数退出时调用Done()
  9. fmt.Printf("Worker %d starting\n", id)
  10. time.Sleep(time.Second)
  11. fmt.Printf("Worker %d done\n", id)
  12. }
  13. func main() {
  14. var wg sync.WaitGroup
  15. for i := 1; i <= 5; i++ {
  16. wg.Add(1)
  17. go worker(i, &wg)
  18. }
  19. wg.Wait() // 等待所有worker完成
  20. fmt.Println("All workers have finished")
  21. }

4. Channels与Range:收集子任务结果

当需要收集每个子任务的结果时,可以使用channels。每个goroutine将结果发送到channel中,而主goroutine则从channel中读取结果。使用range语句可以方便地遍历channel,直到其被关闭,从而自动处理所有发送的数据。

示例代码

  1. package main
  2. import (
  3. "fmt"
  4. "time"
  5. )
  6. func worker(id int, results chan<- int) {
  7. time.Sleep(time.Second)
  8. results <- id * 2 // 假设每个worker都计算其ID的两倍
  9. }
  10. func main() {
  11. results := make(chan int, 5) // 缓冲channel,防止阻塞
  12. for i := 1; i <= 5; i++ {
  13. go worker(i, results)
  14. }
  15. // 等待所有worker发送结果
  16. for i := 1; i <= 5; i++ {
  17. result := <-results
  18. fmt.Println("Result:", result)
  19. }
  20. close(results) // 关闭channel,通知range循环结束
  21. }

注意:在上面的例子中,虽然使用了固定次数的循环来读取结果,但在实际应用中,更常见的做法是使用range遍历channel,直到其关闭。

5. 错误处理与Context

在并发程序中,错误处理尤为重要。由于多个goroutine可能同时运行,一个goroutine中的错误可能会影响到其他goroutine或整个程序的执行。context包提供了一种机制,允许goroutines之间传递取消信号、超时时间以及其他请求相关的值,从而优雅地处理错误和中断。

使用Context

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. )
  7. func worker(ctx context.Context, id int) {
  8. select {
  9. case <-time.After(time.Second):
  10. fmt.Printf("Worker %d done\n", id)
  11. case <-ctx.Done():
  12. fmt.Printf("Worker %d cancelled\n", id)
  13. }
  14. }
  15. func main() {
  16. ctx, cancel := context.WithCancel(context.Background())
  17. time.AfterFunc(2*time.Second, cancel) // 2秒后取消
  18. for i := 1; i <= 5; i++ {
  19. go worker(ctx, i)
  20. }
  21. // 等待足够的时间以观察结果
  22. time.Sleep(3 * time.Second)
  23. }

在这个例子中,我们使用context.WithCancel创建了一个可取消的context,并通过cancel函数在2秒后触发取消操作。每个worker都监听context的Done channel,以便在收到取消信号时提前退出。

6. 并发控制:避免资源竞争

在处理并发任务时,还需注意资源竞争和同步问题。如果多个goroutines需要访问共享资源(如全局变量、文件句柄等),必须使用适当的同步机制,如sync.Mutexsync.RWMutex,来确保一次只有一个goroutine可以访问该资源。

7. 总结

处理一组子任务的并发编程在Golang中是一个复杂但强大的领域。通过合理使用goroutines、channels、sync包中的工具(如WaitGroup)以及context包,可以构建出高效、可靠、易于维护的并发程序。在选择并发原语时,应充分考虑任务的具体需求,如是否需要等待所有任务完成、是否需要收集任务结果、是否需要处理错误和中断等。通过不断实践和探索,你将能够更加熟练地运用这些并发原语,编写出更加优秀的并发程序。


该分类下的相关小册推荐: