AIエージェントの実行フローを「途中でつかまえたい」と思ったこと、ありませんか?🤔
ストリーミング処理って便利な反面、「どこで何が起きているのか追いかけにくい」という悩みがつきものですよね。そんな課題に正面から向き合ったアップデートが、SoloEngine v0.3.0としてリリースされました!
🚀 SoloEngine v0.3.0 とは?

SoloEngineは、LLM(大規模言語モデル)を使ったエージェント処理を手軽に組み立てられるPythonフレームワークです。ReActパターン(Reasoning+Acting:推論しながら行動するアーキテクチャ)を採用していて、ツール呼び出しを含む複雑なエージェントループをシンプルに記述できるのが特徴です。
今回のv0.3.0では、大きく2つの機能が追加されました。それぞれ順番に見ていきましょう!
✅ 追加機能①:チェックポイント機構(Checkpoint Mechanism)
イメージとしては、電車の「通過駅」のようなものです。ストリーミング処理が流れる中で、特定のタイミングに「ここを通過した」という記録を残せる仕組みです。
これまでのSoloEngineでは、エージェントが途中でエラーを起こしたり、予期せず停止してしまったりした場合、最初からやり直しになっていました。長い処理ほどそのコストは大きく、「せめてどこまで進んでいたかだけでも知りたい!」という声が多かったのです。
チェックポイント機構の使い方
実際のコードで確認してみましょう。
from solo_engine import Agent, CheckpointManager
# チェックポイントマネージャーを初期化
cp_manager = CheckpointManager(storage="local", path="./checkpoints")
# エージェントにチェックポイントマネージャーを渡す
agent = Agent(
model="gpt-4o",
tools=[search_tool, calc_tool],
checkpoint_manager=cp_manager
)
# 実行(途中経過が自動的にチェックポイントへ保存される)
result = agent.run(
task="東京の天気を調べて、明日の気温を摂氏と華氏で教えて",
checkpoint_interval=2 # ステップ2回ごとにチェックポイント保存
)
print(result)
checkpoint_interval で何ステップごとに保存するかを指定できます。デフォルトは 1(毎ステップ保存)ですが、処理が軽い場合は 2〜3 に設定すると I/O コストを抑えられます。
途中から再開する方法
エラーが起きても、チェックポイントから処理を再開できます。
from solo_engine import Agent, CheckpointManager
cp_manager = CheckpointManager(storage="local", path="./checkpoints")
# 最新のチェックポイントを読み込んで再開
checkpoint = cp_manager.load_latest()
agent = Agent(
model="gpt-4o",
tools=[search_tool, calc_tool],
checkpoint_manager=cp_manager
)
# resume=True で途中から再スタート
result = agent.run(
task="東京の天気を調べて、明日の気温を摂氏と華氏で教えて",
resume=True
)
print(result)
resume=True を渡すだけで、最後に保存されたチェックポイントから自動的に処理が再開されます。長時間かかるタスクでも安心して実行できますね!
チェックポイントのストレージ種別
v0.3.0では、以下の3種類のストレージに対応しています。
| 種別 | 用途 | 指定方法 |
|---|---|---|
| ローカルファイル | 開発・テスト向け | storage="local" |
| Redis | マルチプロセス・本番環境向け | storage="redis" |
| メモリ(インメモリ) | テスト・短期タスク向け | storage="memory" |
本番環境では Redis を使うと複数ワーカー間でチェックポイントを共有できるので、スケールアウトにも対応できます。
✅ 追加機能②:メッセージキュー(Message Queue)
もう一つの目玉機能がメッセージキューです。エージェントの処理中に発生するイベント(思考ログ・ツール呼び出し・結果など)を、非同期で受け取れる仕組みです。
従来はエージェントの実行が完了するまで結果を待つしかありませんでした。しかしメッセージキューを使えば、処理の途中経過をリアルタイムで受け取りながら、UIに表示したり、ログに書き込んだりできるようになります。
メッセージキューの基本的な使い方
import asyncio
from solo_engine import Agent, MessageQueue
async def main():
queue = MessageQueue()
agent = Agent(
model="gpt-4o",
tools=[search_tool],
message_queue=queue
)
# エージェントの実行を非同期タスクとして開始
task = asyncio.create_task(
agent.arun(task="最新のPythonバージョンを調べて")
)
# キューからメッセージをリアルタイムで受け取る
while not task.done():
try:
message = await asyncio.wait_for(queue.get(), timeout=0.5)
print(f"[{message.type}] {message.content}")
except asyncio.TimeoutError:
continue
result = await task
print(f"\n最終結果: {result}")
asyncio.run(main())
実行するとこんな感じのログが流れてきます。
[thinking] 最新のPythonバージョンを検索します...
[tool_call] search_tool(query="Python latest version 2025")
[tool_result] Python 3.13.2 がリリースされています
[thinking] 結果をまとめます
[final] 最新のPythonバージョンは 3.13.2 です(2025年時点)
エージェントが「今何を考えているか」「どのツールを呼んだか」が手に取るようにわかりますね!デバッグはもちろん、チャットUIへのリアルタイム表示にもそのまま使えます。
メッセージの種類(MessageType)
v0.3.0で扱えるメッセージタイプは以下の通りです。
| メッセージタイプ | 発生タイミング |
|---|---|
thinking | LLMが思考中のとき |
tool_call | ツールを呼び出したとき |
tool_result | ツールの結果が返ってきたとき |
checkpoint_saved | チェックポイントが保存されたとき |
error | エラーが発生したとき |
final | エージェントの処理が完了したとき |
特定のメッセージだけを拾いたい場合は、フィルタを設定することもできます。
# tool_call と error だけを受け取りたい場合
queue = MessageQueue(filter_types=["tool_call", "error"])
🔗 チェックポイント × メッセージキューの組み合わせ
この2つの機能を組み合わせると、さらに強力になります。たとえば「チェックポイントが保存されたタイミングをUIに通知する」という処理がたった数行で書けます。
import asyncio
from solo_engine import Agent, CheckpointManager, MessageQueue
async def main():
cp_manager = CheckpointManager(storage="local", path="./checkpoints")
queue = MessageQueue(filter_types=["checkpoint_saved", "error", "final"])
agent = Agent(
model="gpt-4o",
tools=[search_tool, calc_tool],
checkpoint_manager=cp_manager,
message_queue=queue,
)
task = asyncio.create_task(
agent.arun(task="複数の都市の天気を調べて表にまとめて")
)
while not task.done():
try:
message = await asyncio.wait_for(queue.get(), timeout=0.5)
if message.type == "checkpoint_saved":
print(f"💾 チェックポイント保存: ステップ {message.content['step']}")
elif message.type == "error":
print(f"❌ エラー発生: {message.content}")
elif message.type == "final":
print(f"✅ 完了!")
except asyncio.TimeoutError:
continue
result = await task
print(result)
asyncio.run(main())
これで「エラー発生 → チェックポイントから再開」という流れを自動化することもできます。本番環境での信頼性が一気に上がりますね。
📦 v0.3.0 へのアップグレード方法
既存の環境への導入は pip 一発です。
pip install solo-engine --upgrade
バージョンを確認しておきましょう。
python -c "import solo_engine; print(solo_engine.__version__)"
0.3.0 と表示されればOKです!
v0.2.x からの移行時の注意点
基本的な後方互換性は維持されていますが、以下の点に注意してください。
Agent.run()の戻り値の型がstrからAgentResultオブジェクトに変わりました。文字列として使いたい場合はresult.textでアクセスしてください。checkpoint_managerを渡さない場合は従来通り動作します(チェックポイント機能はオプション扱い)。- Python 3.9以上が必要です(asyncio の仕様変更への対応のため)。
📝 v0.3.0 変更点まとめ
| 機能 | v0.2.x | v0.3.0 |
|---|---|---|
| 途中経過の保存 | ❌ なし | ✅ チェックポイント機構 |
| リアルタイムイベント取得 | ❌ なし | ✅ メッセージキュー |
| 処理の途中再開 | ❌ 最初からやり直し | ✅ resume=True で再開 |
| 非同期実行 | △ 部分対応 | ✅ arun() で完全対応 |
| ストレージ選択 | ❌ なし | ✅ local / redis / memory |
🎯 どんな人におすすめ?
今回のアップデートが特に刺さるのは、こういった用途を考えている方です。
- 🔍 エージェントのデバッグが大変な方:メッセージキューで思考ログが可視化され、どこでつまずいているか一目瞭然になります。
- ⏱️ 長時間タスクを扱う方:チェックポイントで途中再開できるので、APIエラーや通信断が起きても怖くありません。
- 🖥️ チャットUIを作っている方:メッセージキューをそのままストリーミング表示に流用でき、ユーザー体験が向上します。
- 🏭 本番環境にエージェントを載せたい方:Redisストレージ対応でマルチプロセス・スケールアウトにも耐えられます。
まとめ
SoloEngine v0.3.0 の2大新機能をまとめると、こうなります。
- ✅ チェックポイント機構:処理の途中経過を保存して、エラー時でも再開できる
- ✅ メッセージキュー:エージェントの思考・ツール呼び出しをリアルタイムで受け取れる
この2つが合わさることで、「ブラックボックス」だったエージェントの内部が透明になり、デバッグしやすく・本番環境でも安心して使えるフレームワークへと進化しました。
LLMエージェント開発に興味がある方は、ぜひ v0.3.0 を試してみてください。小さなサイドプロジェクトからでも、チェックポイントとメッセージキューの便利さをすぐに実感できると思います!💪





