令牌桶的简单实现
| package utils
import ( "sync" "time" )
// TokenBucket is a token-bucket rate limiter whose tokens are replenished by
// a background goroutine.
//
// It contains a sync.Mutex by value, so it must be used through a pointer and
// never copied.
type TokenBucket struct {
	// mutex guards tokens, lastRefill and isRunning.
	mutex sync.Mutex

	// Configuration, assigned when the bucket is constructed.
	capacity     int64         // upper bound on tokens
	refillRate   int64         // tokens added per second
	refillPeriod time.Duration // interval between refill ticks

	// Mutable state.
	tokens     int64     // tokens currently available
	lastRefill time.Time // time a token was last added

	// Lifecycle of the background refill goroutine.
	stopCh    chan struct{} // closed to tell the goroutine to exit
	isRunning bool          // true while the goroutine is considered active
}
func NewTokenBucket(capacity int64, refillRate int64) *TokenBucket { bucket := &TokenBucket{ capacity: capacity, tokens: capacity, refillRate: refillRate, refillPeriod: time.Second / time.Duration(refillRate), lastRefill: time.Now(), stopCh: make(chan struct{}), isRunning: false, }
bucket.start() return bucket }
func (tb *TokenBucket) start() { tb.mutex.Lock() if tb.isRunning { tb.mutex.Unlock() return } tb.isRunning = true tb.mutex.Unlock()
go func() { ticker := time.NewTicker(tb.refillPeriod) defer ticker.Stop()
for { select { case <-ticker.C: tb.refill() case <-tb.stopCh: return } } }() }
func (tb *TokenBucket) refill() { tb.mutex.Lock() defer tb.mutex.Unlock()
if tb.tokens < tb.capacity { tb.tokens++ tb.lastRefill = time.Now() } }
// Allow reports whether one token could be taken from the bucket right now,
// consuming it if so. It is shorthand for AllowN(1).
func (tb *TokenBucket) Allow() bool {
	const oneToken int64 = 1
	return tb.AllowN(oneToken)
}
func (tb *TokenBucket) AllowN(n int64) bool { tb.mutex.Lock() defer tb.mutex.Unlock()
if tb.tokens >= n { tb.tokens -= n return true } return false }
func (tb *TokenBucket) WaitN(n int64, timeout time.Duration) bool { deadline := time.Now().Add(timeout)
ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop()
for { if tb.AllowN(n) { return true } if time.Now().After(deadline) { return false } <-ticker.C } }
func (tb *TokenBucket) GetStatus() (current int64, capacity int64) { tb.mutex.Lock() defer tb.mutex.Unlock()
return tb.tokens, tb.capacity }
func (tb *TokenBucket) Stop() { tb.mutex.Lock() defer tb.mutex.Unlock()
if tb.isRunning { close(tb.stopCh) tb.isRunning = false } }
|
Reference
图解各类限流算法|固定窗口/计数器、滑动窗口、漏桶算法、令牌桶算法
token_bucket.go