任务池

三种角色:Task,Worker(执行 Task 的 Goroutine),Pool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main

import (
"fmt"
"sync"
"sync/atomic"
)

type Task struct {
f func() error
}

func NewTask(f func() error) *Task {
return &Task{f: f}
}

type Pool struct {
RunningWorkers int64 // 运行的 worker 数量
Capacity int64 // 协程池 worker 容量
Jobch chan *Task // 用于 worker 取任务
sync.Mutex
}

func NewPool(capacity int64, taskNum int) *Pool {
return &Pool{
Capacity: capacity,
Jobch: make(chan *Task, taskNum),
}
}

func (p *Pool) IncRunning() {
atomic.AddInt64(&p.RunningWorkers, 1)
}

func (p *Pool) DecRunning() {
atomic.AddInt64(&p.RunningWorkers, -1)
}

func (p *Pool) GetRunning() int64 {
return atomic.LoadInt64(&p.RunningWorkers)
}

func (p *Pool) Run() {
p.IncRunning()

go func() {
defer func() {
p.DecRunning()
}()

for task := range p.Jobch {
task.f()
}
}()
}

func (p *Pool) AddTask(task *Task) {
// 防止启动多个 worker
p.Lock()
defer p.Unlock()

// 如果任务池满了,就不再创建 worker
if p.GetRunning() < p.Capacity {
p.Run()
}
p.Jobch <- task // 将任务放入任务队列,等待消费
}

func main() {
pool := NewPool(5, 10)

for i := range 15 {
j := i
// 模拟任务处理
task := NewTask(func() error {
fmt.Println("Processing task", j)
return nil
})
pool.AddTask(task)
}

select {} // 阻塞执行
}

Reference

协程池 https://golangstar.cn/go_series/go_advanced/goroutine_pool.html