Go言語でAsynqにレート制限を実装する
タスクキューは多くのWebアプリケーションで必要不可欠な機能ですが、大量のタスクを処理する際にはレート制限が重要になります。特に外部API呼び出しやメール送信などのタスクでは、適切なレート制限なしに処理すると、サービス制限に引っかかったり、相手先のサーバーに負荷をかけてしまう可能性があります。
今回は、Go言語の人気タスクキューライブラリであるAsynqにレート制限を実装する方法を、複数のアプローチとともに解説します。
プロジェクト構成
このサンプルプロジェクトは以下の構成になっています。
1 2 3 4 5 6 7 8 9 10 11 12 13
| asynq-late-limiter/ ├── go.mod ├── tasks/ │ └── tasks.go # タスクの定義と処理ロジック ├── limiter/ │ ├── limiter.go # Limiterインターフェース │ ├── rate_limiter.go # golang.org/x/time/rateを使った実装 │ ├── redis_rate_limiter.go # Redisベースの実装(シンプル版) │ ├── redis_rate_limiterv2.go # Redisベースの実装(改良版) ├── producer/ │ └── main.go # タスクを生成するプロデューサー └── worker/ └── main.go # タスクを処理するワーカー
|
1. インターフェース設計
まず、レート制限の基盤となるインターフェースを定義します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| package limiter
import ( "fmt" "time" )
type Limiter interface { Check() error }
type RateLimitError struct { RetryIn time.Duration }
func (e *RateLimitError) Error() string { return fmt.Sprintf("rate limited (retry in %v)", e.RetryIn) }
|
このインターフェース設計により、異なるレート制限の実装を簡単に差し替えることができます。
2. レート制限の実装方式
2.1 golang.org/x/time/rateを使った実装
標準的なトークンバケットアルゴリズムを使った実装です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| type RateLimiter struct { limiter *rate.Limiter limit int burst int retryIn time.Duration }
func NewRateLimiter(limit int, burst int, retryIn time.Duration) *RateLimiter { return &RateLimiter{ limiter: rate.NewLimiter(rate.Limit(limit), burst), limit: limit, burst: burst, retryIn: retryIn, } }
func (rl *RateLimiter) Check() error { if !rl.limiter.Allow() { return &RateLimitError{ RetryIn: rl.retryIn, } } return nil }
|
メリット
デメリット
2.2 Redisベースの実装(改良版)
複数のワーカープロセス間でレート制限を共有する場合に適しています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| type RedisRateLimiterV2 struct { client *redis.Client key string limit int windowSize time.Duration }
func (r *RedisRateLimiterV2) Allow(taskType string) (bool, int64, error) { ctx := context.Background() key := fmt.Sprintf("rate_limit:%s", taskType) now := time.Now().Unix()
script := ` local key = KEYS[1] local limit = tonumber(ARGV[1]) local window = tonumber(ARGV[2]) local now = tonumber(ARGV[3])
-- 古いエントリを削除 redis.call("ZREMRANGEBYSCORE", key, "-inf", now - window)
-- 現在のリクエスト数を取得 local count = redis.call("ZCOUNT", key, "-inf", "+inf")
if count >= limit then -- 最も古いリクエストのタイムスタンプを取得 local oldest = redis.call("ZRANGE", key, 0, 0, "WITHSCORES") local retryIn = 0.0 if #oldest > 0 then retryIn = math.max(0, oldest[2] + window - now) end return {0, retryIn} end
-- 新しいリクエストを追加 redis.call("ZADD", key, now, tostring(now) .. "-" .. redis.call("INCR", "request_counter")) redis.call("EXPIRE", key, window)
return {1, 0} `
result, err := r.client.Eval(ctx, script, []string{key}, r.limit, int(r.windowSize.Seconds()), now).Result() }
|
メリット
- 分散環境での動作
- 複数のワーカー間でレート制限を共有
- 正確な待機時間の算出
デメリット
- Redisへの依存
- わずかなパフォーマンスオーバーヘッド
3. Asynqとの統合
タスクの定義
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func EmailNotificationTask(ctx context.Context, t *asynq.Task, limit limiter.Limiter) error { if err := limit.Check(); err != nil { return err }
var payload map[string]string if err := json.Unmarshal(t.Payload(), &payload); err != nil { return err } log.Println("Sending Email to:", payload["email"], "with subject:", payload["subject"]) return nil }
|
ワーカーの設定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func main() { emailRateLimiter := limiter.NewRedisRateLimiterV2("email", 1, time.Second)
srv := asynq.NewServer( asynq.RedisClientOpt{Addr: ":6379"}, asynq.Config{ Concurrency: 5, Queues: map[string]int{ "default": 7, "email": 3, }, IsFailure: func(err error) bool { return !IsRateLimitError(err) }, RetryDelayFunc: retryDelay, DelayedTaskCheckInterval: 100 * time.Millisecond, }, )
mux := asynq.NewServeMux() mux.HandleFunc(tasks.TypeEmailTask, func(ctx context.Context, t *asynq.Task) error { return tasks.EmailNotificationTask(ctx, t, emailRateLimiter) })
if err := srv.Run(mux); err != nil { log.Fatalf("サーバーを起動できませんでした: %v", err) } }
|
エラーハンドリングとリトライ戦略
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func IsRateLimitError(err error) bool { _, ok := err.(*limiter.RateLimitError) return ok }
func retryDelay(n int, err error, task *asynq.Task) time.Duration { var ratelimitErr *limiter.RateLimitError if errors.As(err, &ratelimitErr) { return ratelimitErr.RetryIn } return asynq.DefaultRetryDelayFunc(n, err, task) }
|
4. 実行方法
1. 依存関係のインストール
2. Redisの起動
1 2 3 4 5
| docker run -d -p 6380:6379 redis:latest
redis-server --port 6380
|
3. ワーカーの起動
4. タスクの投入
別のターミナルで以下のコマンドを実行。
5. 実装のポイント
レート制限の適切な設定
- 外部API呼び出し: APIの制限に合わせて設定(例:100リクエスト/分)
- メール送信: メール送信サービスの制限に合わせて設定(例:10通/秒)
- データベース操作: データベースの負荷を考慮して設定
エラーハンドリング
1 2 3 4 5 6 7 8 9 10 11
| srv := asynq.NewServer( redisOpt, asynq.Config{ IsFailure: func(err error) bool { return !IsRateLimitError(err) }, RetryDelayFunc: retryDelay, }, )
|
まとめ
このサンプルコードでは、Asynqにレート制限を実装する複数の方法を紹介しました。どの実装を選ぶかは、システムの要件によって決まります:
- 単一プロセス環境:
golang.org/x/time/rate
ベースの実装
- 分散環境: Redisベースの実装
インターフェースベースの設計により、後から実装を切り替えることも容易です。適切なレート制限により、安定したタスク処理システムを構築できます。
参考リンク