Orchestrator¶
The Orchestrator routes messages to the appropriate agent and manages multi-agent workflows.
Quick Start¶
from flowagents import Orchestrator, OpenAIClient, StandardAgent, flowagent, InputField, AgentStatus
@flowagent(triggers=["book table", "reservation"])
class BookingAgent(StandardAgent):
guests = InputField("How many guests?")
async def on_running(self, msg):
return self.make_result(status=AgentStatus.COMPLETED, raw_message="Booked!")
@flowagent(triggers=["leave request", "time off"])
class LeaveAgent(StandardAgent):
start_date = InputField("Start date?")
async def on_running(self, msg):
return self.make_result(status=AgentStatus.COMPLETED, raw_message="Submitted!")
# Create orchestrator
llm = OpenAIClient(api_key="sk-xxx", model="gpt-4o-mini")
orchestrator = Orchestrator(llm_client=llm)
await orchestrator.initialize()
# Routes automatically based on triggers
result = await orchestrator.handle_message("user_1", "I need to book a table") # → BookingAgent
result = await orchestrator.handle_message("user_1", "Request time off") # → LeaveAgent
Routing Strategies¶
Trigger-based (Default)¶
Routes based on triggers in @flowagent:
LLM-based¶
Uses LLM to understand intent:
Custom Routing¶
class MyOrchestrator(Orchestrator):
async def route_message(self, message: str, context: dict):
if "urgent" in message.lower():
return RoutingDecision(agent_type="UrgentAgent", confidence=1.0)
return await super().route_message(message, context)
Context Hooks¶
class MyOrchestrator(Orchestrator):
async def prepare_context(self, message: str, context: dict) -> dict:
"""Add context before processing"""
context["user_prefs"] = await get_user_prefs(context["tenant_id"])
return context
async def should_process(self, message: str, context: dict) -> bool:
"""Guard: rate limiting, content filtering"""
return not await is_rate_limited(context["tenant_id"])
async def post_process(self, result, context: dict):
"""After processing: logging, save to history"""
await save_to_history(context["tenant_id"], result)
return result
Multi-tenant¶
Streaming¶
async for event in orchestrator.process_stream(message, tenant_id="user_123"):
if event.type == "message_chunk":
print(event.content, end="")
elif event.type == "state_change":
print(f"\n[State: {event.new_state}]")
Agent Pool¶
# Get active agents
agents = orchestrator.get_active_agents(tenant_id="user_123")
# Cleanup old agents
orchestrator.cleanup_completed_agents(max_age_seconds=3600)
Error Handling¶
class MyOrchestrator(Orchestrator):
async def handle_no_match(self, message: str, context: dict):
"""No agent matched"""
return AgentResult(
status=AgentStatus.COMPLETED,
raw_message="I can help with bookings and leave requests."
)
Best Practices¶
- Use specific triggers - Better routing accuracy
- Set timeouts - Prevent runaway agents
- Implement guards - Rate limiting, content filtering
- Clean up - Remove completed agents from pool