使用带缓冲channel和goroutine实现并发队列消费,定义Task结构体并创建缓冲channel,启动多个worker从channel读取并处理任务,利用channel的并发安全性实现高效任务分发与执行。

在Go语言中实现并发队列消费,核心是结合 routine 和 channel 来安全高效地处理任务。你可以使用带缓冲的channel作为任务队列,多个消费者goroutine并行从channel中读取任务进行处理,从而实现并发消费。
1. 使用带缓冲Channel作为任务队列
定义一个结构体表示任务,然后创建一个带缓冲的channel存放任务实例。多个消费者同时监听这个channel,Go的channel本身是并发安全的,无需额外加锁。
示例:
type Task struct { ID int Data string } <p>taskQueue := make(chan Task, 100) // 缓冲大小为100的任务队列
2. 启动多个消费者Goroutine
启动固定数量的goroutine,每个都从channel中接收任务并处理。当任务队列关闭且所有任务被消费后,goroutine会自动退出。
示例代码:
func worker(id int, tasks <-chan Task) { for task := range tasks { fmt.Printf("Worker %d processing task %d: %sn", id, task.ID, task.Data) // 模拟处理耗时 time.Sleep(time.Second) } fmt.Printf("Worker %d stopped.n", id) } <p>// 启动3个消费者 for i := 1; i <= 3; i++ { go worker(i, taskQueue) }
3. 生产任务并关闭队列
生产者将任务发送到channel中。所有任务发送完成后,关闭channel以通知消费者不再有新任务。
立即学习“”;
AI应用商店,提供即时交付、按需付费的人工智能应用服务
56 for i := 1; i <= 10; i++ { taskQueue <- Task{ID: i, Data: fmt.Sprintf("data-%d", i)} } close(taskQueue)
注意:只有生产者需要调用 close(taskQueue),消费者不能关闭channel。
4. 等待所有消费者完成
使用sync.WaitGroup等待所有worker结束,避免主程序提前退出。
var wg sync.WaitGroup for i := 1; i <= 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() worker(id, taskQueue) }(i) } wg.Wait()
基本上就这些。这种方式简单、安全、高效,适用于大多数并发任务处理场景,比如消息消费、批量数据处理等。关键是合理设置channel缓冲大小和消费者数量,避免资源浪费或阻塞。不复杂但容易忽略细节。
以上就是如何使用Golang实现并发队列消费的详细内容,更多请关注php中文网其它相关文章!
微信扫一扫打赏
支付宝扫一扫打赏
