Workflow vs Agent — the most important distinction
Deterministic workflow: you control the flow. The LLM is used as a step within a predetermined graph.
user_input → classify_intent() → if intent == 'refund': process_refund()
                                 if intent == 'question': rag_answer()
Agent: the LLM decides what to do next, including which tools to call, in what order.
user_input → LLM decides: "I need to check order status, then check refund policy, then respond"
           → LLM calls tools in sequence → synthesizes final answer
When to use agents:
- The sequence of steps is unpredictable (depends on what you find)
- Tasks require self-correction (test your output, fix if wrong)
- Open-ended research or multi-hop reasoning
When NOT to use agents:
- The steps are known in advance → use a workflow
- Low latency requirement — agent loops add multiple round trips
- High reliability requirement — agents fail in subtle, hard-to-debug ways
Single tool use
import anthropic, json
client = anthropic.Anthropic()
# Define tools the model can call.
# Each entry carries a tool name, a natural-language description, and a JSON
# Schema ("input_schema") describing the arguments the model must supply.
tools = [
{
"name": "get_weather",
"description": "Get current weather for a city.",
"input_schema": {
"type": "object",
"properties": {
"city": {"type": "string", "description": "City name"},
# "unit" is optional (it is not listed under "required") and is limited
# to the two enum values below.
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]},
},
"required": ["city"],
},
}
]
# Your actual tool implementation
def get_weather(city: str, unit: str = "celsius") -> dict:
    """Return a canned weather report for ``city``.

    This is a placeholder: the temperature (22) and condition ("sunny") are
    hard-coded, and ``unit`` is echoed back without any conversion. A
    production version would call a real weather API instead.
    """
    report = dict(city=city, temp=22, unit=unit, condition="sunny")
    return report
# Dispatch table: maps the tool name the model asks for to the Python
# function that implements it.
TOOL_FUNCTIONS = dict(get_weather=get_weather)
def call_tool(tool_name: str, tool_input: dict) -> str:
    """Run the named tool and return its result serialized as JSON.

    Args:
        tool_name: Key into ``TOOL_FUNCTIONS``.
        tool_input: Keyword arguments forwarded to the tool function.

    Returns:
        The tool's return value passed through ``json.dumps``.

    Raises:
        KeyError: If ``tool_name`` is not registered. The message names the
            unknown tool and lists the registered ones, so that a caller
            forwarding ``str(exc)`` to the model gives it something to
            self-correct with (a bare ``KeyError`` would only say ``'name'``).
        Exception: Whatever the tool function itself raises, e.g.
            ``TypeError`` when ``tool_input`` has unexpected keys.
    """
    try:
        fn = TOOL_FUNCTIONS[tool_name]
    except KeyError:
        known = ", ".join(sorted(TOOL_FUNCTIONS))
        raise KeyError(
            f"Unknown tool {tool_name!r}; available tools: {known}"
        ) from None
    return json.dumps(fn(**tool_input))
The agent loop (ReAct pattern)
ReAct = Reasoning + Acting. The model reasons about what to do, acts (calls a tool), observes the result, repeats.
def run_agent(user_message: str, tools: list[dict], max_iterations: int = 10) -> str:
    """Run a synchronous tool-use loop until the model gives a final answer.

    Each iteration sends the conversation to the model. If the model stops
    with ``"tool_use"``, every requested tool is executed via ``call_tool``
    and the results are appended to the conversation as a user turn. If it
    stops with ``"end_turn"``, the first text block is returned.

    Args:
        user_message: The initial user prompt.
        tools: Tool definitions passed to ``client.messages.create``.
        max_iterations: Hard cap on model round trips.

    Returns:
        The text of the first text block in the final response, or ``""`` if
        the final response contains no text block.

    Raises:
        RuntimeError: If the model stops for a reason other than
            ``"end_turn"`` or ``"tool_use"``, or if ``max_iterations`` round
            trips pass without a final answer.
    """
    messages = [{"role": "user", "content": user_message}]

    for _ in range(max_iterations):
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        # Keep the assistant turn in the history so the next request has the
        # tool_use blocks that the tool results refer to.
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            # Model is done: return the first text block, if any.
            for block in response.content:
                if hasattr(block, "text"):
                    return block.text
            return ""

        if response.stop_reason != "tool_use":
            # Previously this fell through to the "exceeded iterations" error,
            # which misreported why the loop ended (e.g. "max_tokens").
            raise RuntimeError(
                f"Agent stopped with unexpected stop reason: {response.stop_reason!r}"
            )

        # Model wants to call tools: run each one and collect the results.
        tool_results = []
        for block in response.content:
            if block.type != "tool_use":
                continue
            print(f" → Calling {block.name}({block.input})")
            tool_result = {"type": "tool_result", "tool_use_id": block.id}
            try:
                tool_result["content"] = call_tool(block.name, block.input)
            except Exception as e:
                # Report the failure to the model so it can self-correct,
                # flagged as an error like the async variant does.
                tool_result["content"] = json.dumps({"error": str(e)})
                tool_result["is_error"] = True
            tool_results.append(tool_result)

        # Send tool results back to the model as the next user turn.
        messages.append({"role": "user", "content": tool_results})

    raise RuntimeError(f"Agent exceeded {max_iterations} iterations without completing")
Parallel tool calls — the key to fast agents
When the model needs multiple independent data sources, it calls all tools simultaneously (one API round trip instead of N):
# The model may return multiple tool_use blocks in a single response
# Execute them all in parallel before sending results back
import asyncio
async def call_tool_async(tool_name: str, tool_input: dict, tool_use_id: str) -> dict:
    """Run one tool call off the event loop and wrap it as a tool_result block.

    Args:
        tool_name: Name of the tool to run (forwarded to ``call_tool``).
        tool_input: Arguments for the tool (forwarded to ``call_tool``).
        tool_use_id: ID of the ``tool_use`` block this result answers.

    Returns:
        A ``tool_result`` dict. On success ``content`` is the string returned
        by ``call_tool``. On any exception ``content`` is a JSON object with
        an ``"error"`` message and ``is_error`` is ``True``. This coroutine
        never raises for tool failures, so it is safe to ``gather``.
    """
    try:
        # call_tool is synchronous; asyncio.to_thread runs it in the default
        # thread pool so the event loop is not blocked. This replaces the
        # get_event_loop()/run_in_executor pair, which is discouraged inside
        # coroutines.
        result = await asyncio.to_thread(call_tool, tool_name, tool_input)
    except Exception as e:
        return {
            "type": "tool_result",
            "tool_use_id": tool_use_id,
            "content": json.dumps({"error": str(e)}),
            "is_error": True,
        }
    return {"type": "tool_result", "tool_use_id": tool_use_id, "content": result}
async def run_agent_async(
    user_message: str, tools: list[dict], max_iterations: int = 10
) -> str:
    """Run the tool-use loop, executing each turn's tool calls concurrently.

    Args:
        user_message: The initial user prompt.
        tools: Tool definitions passed to ``client.messages.create``.
        max_iterations: Hard cap on model round trips. The loop was
            previously unbounded.

    Returns:
        The text of the first text block in the final response, or ``""`` if
        the final response contains no text block.

    Raises:
        RuntimeError: If the model stops for a reason other than
            ``"end_turn"`` or ``"tool_use"``, or if ``max_iterations`` round
            trips pass without a final answer.
    """
    messages = [{"role": "user", "content": user_message}]

    for _ in range(max_iterations):
        # client.messages.create is a blocking call; run it in a worker
        # thread so other tasks on the event loop keep making progress.
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-sonnet-4-6",
            max_tokens=4096,
            tools=tools,
            messages=messages,
        )
        messages.append({"role": "assistant", "content": response.content})

        if response.stop_reason == "end_turn":
            # Default to "" so a response without text does not raise
            # StopIteration (which surfaces as RuntimeError in a coroutine).
            return next((b.text for b in response.content if hasattr(b, "text")), "")

        if response.stop_reason != "tool_use":
            # Without this guard the loop would re-send the same history
            # forever on e.g. "max_tokens".
            raise RuntimeError(
                f"Agent stopped with unexpected stop reason: {response.stop_reason!r}"
            )

        tool_calls = [b for b in response.content if b.type == "tool_use"]
        # Run all tool calls of this turn in parallel; gather returns the
        # results in the same order as the calls.
        tool_results = await asyncio.gather(
            *(call_tool_async(tc.name, tc.input, tc.id) for tc in tool_calls)
        )
        messages.append({"role": "user", "content": list(tool_results)})

    raise RuntimeError(f"Agent exceeded {max_iterations} iterations without completing")
Real example: research agent with 4 tools
# Tool definitions for a research agent: search, read, take notes, finish.
# NOTE(review): only the schemas are defined here; the dispatch table shown
# earlier registers get_weather only, so implementations for these four tools
# presumably live elsewhere — confirm before running.
research_tools = [
{
# Search tool: "query" is required, "num_results" is optional.
"name": "web_search",
"description": "Search the web for current information.",
"input_schema": {
"type": "object",
"properties": {
"query": {"type": "string"},
# NOTE(review): "default" is only a schema annotation; no visible code
# applies it, so the implementation presumably needs its own default.
"num_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
{
# Fetch tool: takes a single required "url" string.
"name": "read_url",
"description": "Read the full text content of a URL.",
"input_schema": {
"type": "object",
"properties": {"url": {"type": "string"}},
"required": ["url"],
},
},
{
# Note-taking tool: both "key" and "value" are required strings.
"name": "save_note",
"description": "Save an important fact or finding to memory.",
"input_schema": {
"type": "object",
"properties": {
"key": {"type": "string"},
"value": {"type": "string"},
},
"required": ["key", "value"],
},
},
{
# Completion tool: "report" is required, "sources" is an optional list of strings.
# NOTE(review): the agent loops shown in this file end on
# stop_reason == "end_turn" and do not treat a "finish" call specially —
# confirm how the report is meant to be extracted.
"name": "finish",
"description": "Return the final research report. Call this when done.",
"input_schema": {
"type": "object",
"properties": {
"report": {"type": "string"},
"sources": {"type": "array", "items": {"type": "string"}},
},
"required": ["report"],
},
},
]
# Typical agent trace for "Research the top 3 open-source LLMs in 2025":
# Turn 1: model calls web_search("top open source LLMs 2025")
# Turn 2: model calls read_url(url1), read_url(url2), read_url(url3) in parallel
# Turn 3: model calls save_note("top3", "Llama 4, Qwen3, Gemma3") + save_note("sources", "...")
# Turn 4: model calls finish(report="...", sources=[...])
Guardrails and production concerns
Max iterations: always set a hard cap (10–20 iterations) to prevent infinite loops.
Token budget: track cumulative token usage; abort if approaching limit:
# Running total of input + output tokens across all turns of the agent.
total_tokens = 0
# NOTE(review): assumes agent_loop() yields one API response per turn, each
# exposing .usage.input_tokens / .usage.output_tokens — confirm.
# The loop variable was previously named `turn` while the body read an
# undefined `response`.
for response in agent_loop():
    usage = response.usage
    total_tokens += usage.input_tokens + usage.output_tokens
    if total_tokens > 100_000:
        # BudgetExceededError is an application-defined exception (not shown here).
        raise BudgetExceededError("Agent exceeded token budget")
Tool timeouts: wrap every tool call with a timeout:
import asyncio
# Bound a single tool call to 30 seconds; asyncio.wait_for raises a timeout
# error if the awaited call has not finished by then.
# `await` means this line belongs inside a coroutine, and `...` stands in for
# the real (tool_name, tool_input, tool_use_id) arguments.
# NOTE(review): call_tool_async runs the tool in an executor thread, which a
# timeout presumably does not stop — confirm the tool itself also enforces a limit.
result = await asyncio.wait_for(call_tool_async(...), timeout=30.0)
Prompt injection: if tools return external content (web pages, user input), it may contain instructions like “ignore previous instructions and…”. Sanitize or use a sandboxed prompt:
# NOTE(review): raw_result is interpolated verbatim, so content that itself
# contains "</tool_output>" would close the wrapper early — consider escaping
# or stripping that tag, and treat the wrapper as a mitigation, not a guarantee.
tool_result = f"<tool_output>{raw_result}</tool_output>"
# Wrapping in XML tags signals to the model that this is data, not instructions
Model Context Protocol (MCP)
MCP is an open standard for how models connect to tools and data sources. Instead of building custom tool integrations for every model, you expose an MCP server once and any compatible model can use it.
# MCP server skeleton (Python SDK)
from mcp.server import Server
from mcp.server.models import InitializationOptions
import mcp.types as types
# Server instance named "my-tools"; the @server.list_tools() and
# @server.call_tool() decorators below attach their handlers to it.
server = Server("my-tools")
@server.list_tools()
async def list_tools():
    """Advertise the tools this server offers.

    Returns a one-element list describing ``get_order_status``, whose input
    is an object with a single required string field, ``order_id``.
    """
    order_id_schema = {
        "type": "object",
        "properties": {"order_id": {"type": "string"}},
        "required": ["order_id"],
    }
    order_status_tool = types.Tool(
        name="get_order_status",
        description="Get the status of an order by ID.",
        inputSchema=order_id_schema,
    )
    return [order_status_tool]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    """Handle a tool invocation sent to this server.

    Args:
        name: Name of the requested tool; only ``"get_order_status"`` is
            supported.
        arguments: Tool arguments; must contain ``"order_id"``.

    Returns:
        A one-element list holding a ``TextContent`` whose text is the
        JSON-serialized result of ``db.get_order``.

    Raises:
        ValueError: If ``name`` is not a tool this server implements.
            Previously an unknown name fell through and returned ``None``.
        KeyError: If ``arguments`` lacks ``"order_id"``.
    """
    if name != "get_order_status":
        raise ValueError(f"Unknown tool: {name}")

    # NOTE(review): `db` is not defined in this snippet; it is presumably an
    # application-level data-access object — confirm.
    order = db.get_order(arguments["order_id"])
    return [types.TextContent(type="text", text=json.dumps(order))]
MCP servers can expose: tools (callable functions), resources (readable data), and prompts (reusable templates).