任务池
任务池由三种角色组成:Task(待执行的任务)、Worker(执行 Task 的 Goroutine)和 Pool(按需启动 Worker 并通过 channel 分发 Task)。
package main
import ( "fmt" "sync" "sync/atomic" )
// Task is a unit of work for the pool. It wraps the function that a worker
// goroutine calls when it picks the task up.
type Task struct {
	// f is the work to perform; it reports failure through its error result.
	f func() error
}
// NewTask allocates a Task that wraps f and returns a pointer to it.
// f is stored as given; it is not called here.
func NewTask(f func() error) *Task {
	task := new(Task)
	task.f = f
	return task
}
type Pool struct { RunningWorkers int64 Capacity int64 Jobch chan *Task sync.Mutex }
// NewPool builds a Pool that may run at most capacity workers and whose task
// queue is a channel buffered for taskNum tasks. No worker is started here.
func NewPool(capacity int64, taskNum int) *Pool {
	pool := &Pool{Capacity: capacity}
	pool.Jobch = make(chan *Task, taskNum)
	return pool
}
// IncRunning atomically adds one to the running-worker count.
func (p *Pool) IncRunning() {
	const step int64 = 1
	atomic.AddInt64(&p.RunningWorkers, step)
}
// DecRunning atomically subtracts one from the running-worker count.
func (p *Pool) DecRunning() {
	const step int64 = -1
	atomic.AddInt64(&p.RunningWorkers, step)
}
// GetRunning returns the running-worker count, read atomically.
func (p *Pool) GetRunning() int64 {
	counter := &p.RunningWorkers
	return atomic.LoadInt64(counter)
}
// Run starts one worker goroutine. The worker receives tasks from Jobch and
// executes them one at a time; it returns once Jobch is closed and drained.
//
// The running-worker count is incremented before the goroutine is launched,
// so the new worker is already counted when Run returns, and it is
// decremented when the worker returns.
//
// A nil task, or a task without a function, is skipped instead of crashing
// the worker. An error returned by a task is printed rather than discarded.
func (p *Pool) Run() {
	p.IncRunning()
	go func() {
		defer p.DecRunning()
		for task := range p.Jobch {
			// Jobch is exported, so anything may have been sent on it.
			if task == nil || task.f == nil {
				continue
			}
			if err := task.f(); err != nil {
				fmt.Println("task failed:", err)
			}
		}
	}()
}
func (p *Pool) AddTask(task *Task) { p.Lock() defer p.Unlock()
if p.GetRunning() < p.Capacity { p.Run() } p.Jobch <- task }
func main() { pool := NewPool(5, 10)
for i := range 15 { j := i task := NewTask(func() error { fmt.Println("Processing task", j) return nil }) pool.AddTask(task) }
select {} }
Reference
协程池 https://golangstar.cn/go_series/go_advanced/goroutine_pool.html