LaravelのBatchで大量処理を並列実行する方法
大量のデータ処理を順番に実行していたら、いつまで経っても終わらない…ということがありました。LaravelにはBus::batchという仕組みがあって、複数のジョブを並列で実行できます。今回はこのBatch機能を使って、大量処理を効率よくさばく方法をまとめてみました。
目次
バッチテーブルのマイグレーション
まず前提として、Batchを使うにはバッチ情報を保存するテーブルが必要になります。以下のArtisanコマンドでマイグレーションファイルを生成します。
php artisan queue:batches-table
php artisan migrate
これでjob_batchesテーブルが作成されます。バッチの進捗状況やエラー情報はすべてこのテーブルに記録される仕組みですね。
Jobクラスの作成(Batchable trait)
Batchで使うJobクラスにはBatchableトレイトを追加する必要があります。通常のJobクラスに1行追加するだけなので簡単です。
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessCsvRow implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(
private array $row
) {}
public function handle(): void
{
// バッチがキャンセルされていたら処理を中断
if ($this->batch()->cancelled()) {
return;
}
// 実際の処理
\App\Models\Product::updateOrCreate(
["sku" => $this->row["sku"]],
$this->row
);
}
}
ポイントはuse Batchableを追加することと、handle()メソッド内で$this->batch()->cancelled()をチェックすること。バッチがキャンセルされた場合に不要な処理を続けないようにするのが定番パターンです。
Bus::batchの基本的な使い方
Jobクラスができたら、Bus::batch()でバッチを作成して実行します。
use Illuminate\Support\Facades\Bus;
use App\Jobs\ProcessCsvRow;
$rows = // CSVから読み込んだデータ
$jobs = collect($rows)->map(function ($row) {
return new ProcessCsvRow($row);
});
$batch = Bus::batch($jobs)
->name("CSVインポート")
->dispatch();
// バッチIDを保存しておくと後で進捗確認できる
$batchId = $batch->id;
name()メソッドでバッチに名前をつけておくと、後で管理画面やHorizonから確認するときにわかりやすいです。
then/catch/finallyコールバック
バッチの実行結果に応じて処理を分岐できるのがコールバック機能です。
use Illuminate\Bus\Batch;
$batch = Bus::batch($jobs)
->then(function (Batch $batch) {
// すべてのジョブが成功した場合
Log::info("バッチ {$batch->name} が正常に完了しました");
// 完了通知を送信
Notification::send($admin, new BatchCompleted($batch));
})
->catch(function (Batch $batch, \Throwable $e) {
// 最初のジョブが失敗した時点で1回だけ呼ばれる
Log::error("バッチ {$batch->name} でエラー: {$e->getMessage()}");
})
->finally(function (Batch $batch) {
// 成功・失敗にかかわらず必ず呼ばれる
Log::info("バッチ {$batch->name} が終了しました");
})
->dispatch();
then(): すべてのジョブが成功したときに実行されるcatch(): ジョブが失敗したときに実行される(最初の失敗時に1回だけ)finally(): 成功・失敗にかかわらず、バッチ終了時に必ず実行される
この3つを組み合わせれば、バッチの完了通知やエラーハンドリングが簡単に実装できますね。
バッチの進捗監視
バッチの進捗状況はバッチIDを使っていつでも確認できます。APIエンドポイントを作ってフロントエンドからポーリングするのが一般的なパターンです。
use Illuminate\Support\Facades\Bus;
Route::get("/batch/{batchId}", function (string $batchId) {
$batch = Bus::findBatch($batchId);
if (!$batch) {
return response()->json(["error" => "Batch not found"], 404);
}
return response()->json([
"name" => $batch->name,
"totalJobs" => $batch->totalJobs,
"processedJobs" => $batch->processedJobs(),
"failedJobs" => $batch->failedJobs,
"progress" => $batch->progress(), // 0〜100の整数
"finished" => $batch->finished(), // 完了したか
"cancelled" => $batch->cancelled(), // キャンセルされたか
"hasFailures" => $batch->hasFailures(), // 失敗があるか
]);
});
主なプロパティ・メソッドをまとめるとこんな感じです。
progress(): 進捗をパーセント(0〜100)で返すfinished(): バッチが完了したかどうかcancelled(): バッチがキャンセルされたかどうかtotalJobs: 総ジョブ数processedJobs(): 処理済みジョブ数failedJobs: 失敗したジョブ数
バッチをキャンセルしたい場合は$batch->cancel()を呼ぶだけでOKです。
allowFailuresでエラー耐性を高める
デフォルトでは、バッチ内のジョブが1つでも失敗すると、バッチ全体がキャンセルされます。でも「一部失敗しても他のジョブは続行したい」というケースは多いですよね。そんなときはallowFailures()を使います。
$batch = Bus::batch($jobs)
->allowFailures()
->then(function (Batch $batch) {
if ($batch->hasFailures()) {
Log::warning("一部のジョブが失敗しました: {$batch->failedJobs}件");
}
})
->dispatch();
allowFailures()をつけると、失敗したジョブがあっても残りのジョブは実行され続けます。失敗件数は$batch->failedJobsで確認できるので、then()コールバック内でハンドリングすればOKです。
チェーン(chain)との違い
Laravelにはジョブを順番に実行するBus::chain()もあります。Batchとの違いを整理してみました。
- chain: ジョブを順番に1つずつ実行する。前のジョブが終わってから次のジョブが始まる。1つでも失敗するとそこで停止
- batch: ジョブを並列に実行する。進捗監視やキャンセル、エラーハンドリングが柔軟にできる
さらに、Batch内でチェーンを使うこともできます。これが結構便利なんですよね。
use Illuminate\Bus\Queueable;
$batch = Bus::batch([
// この3つのチェーンは並列に実行される
[
new DownloadImage($url1),
new ResizeImage($url1),
new UploadImage($url1),
],
[
new DownloadImage($url2),
new ResizeImage($url2),
new UploadImage($url2),
],
[
new DownloadImage($url3),
new ResizeImage($url3),
new UploadImage($url3),
],
])->dispatch();
配列でジョブを囲むと、その中のジョブはチェーン(順次実行)になり、配列同士は並列で実行されます。「画像ごとにダウンロード→リサイズ→アップロードの順序は守りたいけど、複数画像は同時に処理したい」といった場面で活躍しますね。
実践例
CSVインポート
大量のCSVデータをDBにインポートする定番パターンです。チャンクに分割してバッチ処理します。
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\LazyCollection;
$jobs = LazyCollection::make(function () {
$handle = fopen(storage_path("app/products.csv"), "r");
$header = fgetcsv($handle);
while ($row = fgetcsv($handle)) {
yield array_combine($header, $row);
}
fclose($handle);
})
->chunk(100)
->map(function ($chunk) {
return new ImportCsvChunk($chunk->toArray());
});
Bus::batch($jobs)
->name("商品CSVインポート")
->allowFailures()
->dispatch();
LazyCollectionを使ってメモリ効率よくCSVを読み込み、100件ずつのチャンクに分割してジョブにしています。
メール一斉送信
会員全員にメールを送るようなケースです。
$users = User::where("newsletter", true)->get();
$jobs = $users->map(function ($user) use ($campaign) {
return new SendNewsletterEmail($user, $campaign);
});
$batch = Bus::batch($jobs)
->name("ニュースレター送信: {$campaign->title}")
->allowFailures()
->then(function (Batch $batch) use ($campaign) {
$campaign->update([
"sent_at" => now(),
"failed_count" => $batch->failedJobs,
]);
})
->dispatch();
allowFailures()をつけることで、一部のメール送信が失敗しても他のユーザーへの送信は続行されます。完了後に失敗件数を記録しておけば、後からリトライもできますね。
画像変換
アップロードされた画像を複数サイズにリサイズする処理です。先ほどのチェーンとバッチの組み合わせパターンですね。
$images = Image::where("status", "pending")->get();
$jobs = $images->map(function ($image) {
return [
new OptimizeImage($image),
new GenerateThumbnail($image, "sm", 150, 150),
new GenerateThumbnail($image, "md", 300, 300),
new GenerateThumbnail($image, "lg", 800, 800),
new MarkImageProcessed($image),
];
});
Bus::batch($jobs)
->name("画像変換処理")
->onQueue("image-processing")
->dispatch();
各画像ごとに「最適化→サムネイル生成(複数サイズ)→完了マーク」をチェーンで順次実行しつつ、複数画像を並列で処理しています。onQueue()で専用のキューを指定すれば、他の処理に影響を与えずに済みます。
Horizonでの監視
Laravel Horizonを使っていれば、バッチの状態をダッシュボードから視覚的に確認できます。
composer require laravel/horizon
php artisan horizon:install
php artisan horizon
Horizonをインストールして起動したら、/horizonにアクセスするとダッシュボードが表示されます。ここでバッチのジョブ実行状況、失敗したジョブの詳細、リトライなどが管理できます。
本番環境ではconfig/horizon.phpでワーカー数やメモリ制限を調整しておくと良いですね。
"environments" => [
"production" => [
"supervisor-1" => [
"maxProcesses" => 10,
"balanceMaxShift" => 1,
"balanceCooldown" => 3,
"queue" => ["default", "image-processing"],
],
],
],
バッチ処理で大量のジョブを投入する場合は、maxProcessesを適切に設定してサーバーリソースとのバランスを取ることが大事です。
まとめ
LaravelのBatch機能を使えば、大量処理を並列実行しながら、進捗監視やエラーハンドリングまで簡単に実装できます。ポイントをおさらいしておきます。
- Jobクラスに
Batchableトレイトを追加する Bus::batch()でジョブをまとめて実行then/catch/finallyでコールバック処理progress()で進捗を監視allowFailures()で一部失敗しても続行- 配列で囲めばバッチ内でチェーンも使える
- Horizonと組み合わせると監視が楽になる
CSVインポートやメール一斉送信、画像変換など、大量データを扱う処理にはBatchがめちゃくちゃ有効です。
今回は以上です!