「AIエージェントを複数動かしたら、お互いの処理が衝突してカオスになった…」
そんな経験、ありませんか?😅 複数のAIエージェントを連携させるマルチエージェントシステムは、単体エージェントでは難しい複雑なタスクをこなせる反面、エージェント同士の「調整」がめちゃくちゃ難しいんですよね。
今、海外の開発者コミュニティで注目されているのが、メッセージバスパターン(Message-Bus Pattern)という設計手法です。この考え方を使うと、複数のエージェントがお互いに干渉せず、スッキリと協調動作できるようになります。
この記事では、メッセージバスパターンの基本概念からPythonでの実装例まで、初〜中級者にもわかりやすく解説します!
🤔 メッセージバスって何?

イメージとしては「社内の掲示板」に近い感覚です。各エージェントは直接やり取りするのではなく、共通のメッセージバス(バス=情報の通り道)に向けてメッセージを投げたり、受け取ったりします。
つまり、エージェントAがエージェントBを直接呼び出す「密結合」ではなく、バスを介した疎結合(loose coupling)の設計になるわけです。これがポイントです!
- ✅ 各エージェントは独立して動ける
- ✅ 新しいエージェントを追加しやすい
- ✅ 障害が発生しても他に波及しにくい
- ✅ デバッグ・トレースがしやすい
📡 密結合 vs 疎結合:何が違うの?
ちょっと具体的に比較してみましょう。
❌ 密結合(直接呼び出し)の場合
エージェントAが「エージェントBのprocess()メソッドを直接呼ぶ」という形では、AはBの存在を知っている必要があります。BのAPI仕様が変わったらAも修正が必要になり、エージェントが増えるほど依存関係がスパゲッティになっていきます。
✅ 疎結合(メッセージバス)の場合
エージェントAは「data.collected というトピックにメッセージを投げる」だけ。誰がそのメッセージを受け取るかは知らなくてOKです。新しいエージェントCを追加したいときは、そのトピックをサブスクライブするだけで済みます。
🐍 Pythonでメッセージバスを実装してみよう
シンプルなメッセージバスのサンプルを見てみましょう。asyncioとQueueを使って、エージェント間の非同期メッセージングを実現しています。
① メッセージクラスとMessageBusの基本実装
import asyncio
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Any
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(name)s] %(message)s')
# メッセージの型定義
@dataclass
class Message:
topic: str # どのトピック宛か
payload: dict # 実際のデータ
sender: str # 送信元エージェント名
class MessageBus:
def __init__(self):
# トピックごとにサブスクライバー(購読者)を管理
self._subscribers: Dict[str, List[Callable]] = {}
self._logger = logging.getLogger('MessageBus')
def subscribe(self, topic: str, handler: Callable):
"""指定トピックのメッセージを受け取るハンドラを登録"""
if topic not in self._subscribers:
self._subscribers[topic] = []
self._subscribers[topic].append(handler)
self._logger.info(f'Subscribed to [{topic}]: {handler.__qualname__}')
async def publish(self, message: Message):
"""メッセージをバスに投げる(該当トピックの全サブスクライバーへ配信)"""
self._logger.info(f'Published [{message.topic}] from {message.sender}')
handlers = self._subscribers.get(message.topic, [])
if not handlers:
self._logger.warning(f'No subscribers for topic: {message.topic}')
return
# 全サブスクライバーを並列実行
await asyncio.gather(*[handler(message) for handler in handlers])
ポイントは2つです。
- subscribe():「このトピックが来たら、この関数を呼んでね」という登録
- publish():「このトピックにメッセージを投げる」という送信。受け取り手が複数いても
asyncio.gatherで並列処理できます
② エージェントを実装する
次に、メッセージバスを使う「エージェント」を作ってみましょう。ここでは「データ収集エージェント」と「データ分析エージェント」の2つを作ります。
class DataCollectorAgent:
"""データを収集して、バスに流すエージェント"""
def __init__(self, bus: MessageBus):
self.bus = bus
self.name = 'DataCollector'
self._logger = logging.getLogger(self.name)
async def run(self):
# 擬似的にデータを収集したとして、バスに流す
data = {'temperature': 25.4, 'humidity': 60.2, 'location': 'Tokyo'}
self._logger.info(f'Collected data: {data}')
await self.bus.publish(Message(
topic='data.collected',
payload=data,
sender=self.name
))
class DataAnalyzerAgent:
"""収集されたデータを分析するエージェント"""
def __init__(self, bus: MessageBus):
self.bus = bus
self.name = 'DataAnalyzer'
self._logger = logging.getLogger(self.name)
# 初期化時に購読登録
self.bus.subscribe('data.collected', self.handle_data)
async def handle_data(self, message: Message):
"""data.collected トピックを受け取ったときの処理"""
payload = message.payload
self._logger.info(f'Analyzing data from {message.sender}: {payload}')
# 簡単な分析:気温が30度超えたら警告
if payload.get('temperature', 0) > 30:
await self.bus.publish(Message(
topic='alert.high_temperature',
payload={'message': '気温が高すぎます!', 'value': payload['temperature']},
sender=self.name
))
else:
self._logger.info('Temperature is normal.')
class AlertAgent:
"""アラートを受け取って通知するエージェント"""
def __init__(self, bus: MessageBus):
self.bus = bus
self.name = 'AlertAgent'
self._logger = logging.getLogger(self.name)
self.bus.subscribe('alert.high_temperature', self.handle_alert)
async def handle_alert(self, message: Message):
self._logger.info(f'🚨 ALERT from {message.sender}: {message.payload["message"]} (value={message.payload["value"]})')
③ 全部まとめて動かしてみよう
async def main():
# メッセージバスを作成
bus = MessageBus()
# エージェントを初期化(サブスクライブもここで行われる)
collector = DataCollectorAgent(bus)
analyzer = DataAnalyzerAgent(bus)
alert_agent = AlertAgent(bus)
print('=== マルチエージェント開始 ===')
# データ収集エージェントを実行
await collector.run()
print()
print('=== 高温シミュレーション ===')
# 今度は高温データをシミュレート
hot_data = {'temperature': 35.0, 'humidity': 80.0, 'location': 'Osaka'}
await bus.publish(Message(
topic='data.collected',
payload=hot_data,
sender='ManualTest'
))
if __name__ == '__main__':
asyncio.run(main())
④ 実行結果イメージ
2024-01-01 12:00:00 [MessageBus] Subscribed to [data.collected]: DataAnalyzerAgent.handle_data
2024-01-01 12:00:00 [MessageBus] Subscribed to [alert.high_temperature]: AlertAgent.handle_alert
=== マルチエージェント開始 ===
2024-01-01 12:00:00 [DataCollector] Collected data: {'temperature': 25.4, ...}
2024-01-01 12:00:00 [MessageBus] Published [data.collected] from DataCollector
2024-01-01 12:00:00 [DataAnalyzer] Analyzing data from DataCollector: {'temperature': 25.4, ...}
2024-01-01 12:00:00 [DataAnalyzer] Temperature is normal.
=== 高温シミュレーション ===
2024-01-01 12:00:00 [MessageBus] Published [data.collected] from ManualTest
2024-01-01 12:00:00 [DataAnalyzer] Analyzing data from ManualTest: {'temperature': 35.0, ...}
2024-01-01 12:00:00 [MessageBus] Published [alert.high_temperature] from DataAnalyzer
2024-01-01 12:00:00 [AlertAgent] 🚨 ALERT from DataAnalyzer: 気温が高すぎます! (value=35.0)
DataCollector → MessageBus → DataAnalyzer → MessageBus → AlertAgent という流れで、各エージェントはお互いを直接知らずに連携できているのがわかりますね!
🔧 実践的なTips:もっと使いやすくするには
ワイルドカードサブスクライブを追加する
「alert.*のトピックは全部受け取りたい」というケースもよくあります。簡単な拡張で対応できます。
import fnmatch
class MessageBus:
# ...(省略)...
async def publish(self, message: Message):
self._logger.info(f'Published [{message.topic}] from {message.sender}')
matched_handlers = []
for pattern, handlers in self._subscribers.items():
# fnmatchでワイルドカードマッチング
if fnmatch.fnmatch(message.topic, pattern):
matched_handlers.extend(handlers)
if not matched_handlers:
self._logger.warning(f'No subscribers for topic: {message.topic}')
return
await asyncio.gather(*[handler(message) for handler in matched_handlers])
# 使い方例
# bus.subscribe('alert.*', some_handler) # alert.xxx系を全部受け取る
エラーハンドリングを入れる
本番運用では、一つのエージェントでエラーが起きても他のエージェントに影響が出ないようにしましょう。
async def publish(self, message: Message):
handlers = self._get_matched_handlers(message.topic)
results = await asyncio.gather(
*[handler(message) for handler in handlers],
return_exceptions=True # 例外が起きても他の処理は続行
)
for handler, result in zip(handlers, results):
if isinstance(result, Exception):
self._logger.error(f'Handler {handler.__qualname__} raised: {result}')
return_exceptions=Trueを使うだけで、一つのハンドラが例外を投げても他のハンドラは正常に動き続けます。これが「障害が波及しにくい」疎結合の強みです。
📊 メッセージバスパターンの使いどころ
どんな場面で使うと効果的なのかを整理しておきましょう。
- ✅ エージェントの数が3つ以上になるとき:直接呼び出しだと依存関係が爆発的に増えます
- ✅ 将来的に機能を追加する予定があるとき:新しいエージェントをサブスクライブするだけで拡張可能
- ✅ エージェントを並列実行したいとき:asyncioとの相性が抜群です
- ✅ 処理フローをログで追跡したいとき:バスを一点集中で通るのでトレースが簡単
逆に、エージェントが2つで固定だったり、レスポンスが必要なリクエスト/レスポンス型の通信には、シンプルな直接呼び出しのほうが適していることもあります。
🚀 まとめ
メッセージバスパターンのポイントをまとめます。
- エージェント同士を直接つなげず、バスを介して疎結合にする
- トピックのpublish/subscribeでシンプルに連携できる
asyncio.gatherと組み合わせれば並列処理もお手軽return_exceptions=Trueで障害の波及を防ぐことができる- ワイルドカードなど拡張も比較的容易
マルチエージェントシステムを作り始めると「エージェントの連携をどう設計するか」が最大の壁になります。最初からメッセージバスパターンを採用しておくと、あとで後悔するケースがグッと減りますよ。ぜひ自分のプロジェクトで試してみてください!💪
📚 関連商品・おすすめ書籍
もしも
ELEGOO Arduino用UNO R3スターターキット レベルアップ チュートリアル付 mega2560 r3 nanoと互換 [並行輸入品]
チュートリアル付きのArduino入門セット
※本記事にはアフィリエイトリンクが含まれます。




