Token economics — know before you build
Every LLM call has a cost: (input_tokens × input_price) + (output_tokens × output_price).
Practical numbers (Claude Sonnet 4 as reference):
1 token ≈ 4 characters of English
1,000 tokens ≈ 750 words ≈ 1.5 pages
Claude Sonnet 4: ~$3 / 1M input, ~$15 / 1M output
GPT-4o: ~$5 / 1M input, ~$15 / 1M output
A short conversation (5K tokens in, 1K out):
Cost ≈ $0.015 + $0.015 = $0.03
At 10K users/day × 5 turns = $1,500/day
To count tokens before calling:
import anthropic
client = anthropic.Anthropic()

# Dry run: ask the API how many input tokens this request would consume,
# without generating a reply (so there is no generation cost).
token_count = client.messages.count_tokens(
    model="claude-sonnet-4-6",
    messages=[{"role": "user", "content": "Hello, world!"}],
)
print(token_count.input_tokens)  # e.g. 10
Basic API call pattern
import anthropic
from anthropic import RateLimitError, APIStatusError
import time
client = anthropic.Anthropic() # reads ANTHROPIC_API_KEY from env
def call_llm(prompt: str, system: str = "") -> str:
    """Send one user prompt to the model and return the reply text.

    Args:
        prompt: Text sent as the single user message.
        system: System prompt, forwarded to the API unchanged (default "").

    Returns:
        The ``text`` attribute of the first content block of the response.
    """
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=system,
        messages=[{"role": "user", "content": prompt}],
    )
    first_block = response.content[0]
    return first_block.text
Key parameters:
- max_tokens — hard limit on output length; set conservatively (saves money, avoids runaway outputs)
- temperature — 0 for deterministic/factual, 0.7–1 for creative; default is 1
- system — sets model behavior/persona; invisible to the user
- stop_sequences — stop generation at specific strings (useful for parsing)
Retry logic with exponential backoff
LLM APIs have rate limits and occasional transient failures. Always wrap calls:
import time
import anthropic
from anthropic import RateLimitError, APIConnectionError, InternalServerError
def _retry_after_seconds(error, fallback: float) -> float:
    """Return the wait suggested by the error's Retry-After header, else *fallback*.

    The header may be missing, may be an HTTP-date rather than a number, or
    may be malformed; in all of those cases the caller's own backoff is used.
    """
    response = getattr(error, "response", None)
    headers = getattr(response, "headers", None)
    raw = headers.get("retry-after") if headers is not None else None
    if raw is None:
        return fallback
    try:
        seconds = float(raw)
    except (TypeError, ValueError):
        return fallback
    # The chained comparison is False for NaN, negatives and infinity, none of
    # which are safe to hand to time.sleep().
    return seconds if 0 <= seconds < float("inf") else fallback


def call_with_retry(
    prompt: str,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> str:
    """Call the model, retrying rate limits and transient failures with backoff.

    Args:
        prompt: Text sent as the single user message.
        max_retries: Total number of attempts (not "extra" attempts).
        base_delay: Seconds to wait after the first failure; doubles per attempt.

    Returns:
        The text of the first content block of the successful response.

    Raises:
        RateLimitError, APIConnectionError, InternalServerError: the error from
            the final attempt, re-raised unchanged once attempts are exhausted.
        Exception: "Max retries exceeded" when ``max_retries`` is not positive,
            i.e. no attempt was made at all.

    NOTE(review): the anthropic SDK client appears to retry some failures on
    its own by default; if so, these retries multiply with it — confirm
    against the SDK docs and consider disabling one of the two layers.
    """
    client = anthropic.Anthropic()
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
        except (RateLimitError, APIConnectionError, InternalServerError) as e:
            # Out of attempts: surface the real error instead of sleeping once
            # more and then raising a generic exception that hides the cause.
            if attempt == max_retries - 1:
                raise
            backoff = base_delay * (2 ** attempt)
            if isinstance(e, RateLimitError):
                # Rate limited — respect the Retry-After header if present
                retry_after = _retry_after_seconds(e, backoff)
                print(f"Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)
            else:
                # Transient error — exponential backoff
                delay = backoff
                print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
        else:
            return response.content[0].text
    # Only reachable when the loop body never ran (max_retries <= 0).
    raise Exception("Max retries exceeded")
Streaming for real-time UX
Without streaming, the user stares at a spinner until the full response is generated. With streaming, tokens appear as they're generated — like watching someone type.
import anthropic
client = anthropic.Anthropic()
# Streaming generator
def stream_response(prompt: str):
    """Yield the model's reply piece by piece while it is being generated.

    Args:
        prompt: Text sent as the single user message.

    Yields:
        Each text chunk produced by the stream's ``text_stream``, in order.
    """
    request = {
        "model": "claude-sonnet-4-6",
        "max_tokens": 1024,
        "messages": [{"role": "user", "content": prompt}],
    }
    # The context manager keeps the stream open for as long as the consumer
    # keeps pulling chunks, and closes it afterwards.
    with client.messages.stream(**request) as stream:
        yield from stream.text_stream
# FastAPI SSE endpoint
from fastapi import Body, FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


def _sse_event(data: str) -> str:
    """Frame *data* as one Server-Sent Event.

    Every line of the payload gets its own ``data:`` field, because a raw
    newline inside a single ``data:`` line would end the field early and the
    rest of the chunk would be lost. A blank line terminates the event.
    """
    lines = data.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


@app.post("/chat")
async def chat(prompt: str = Body(..., embed=True)):
    """Stream the model's reply to ``prompt`` as Server-Sent Events.

    The request body is JSON of the form ``{"prompt": "..."}``. ``embed=True``
    is what makes FastAPI read ``prompt`` from the JSON body; a bare
    ``prompt: str`` parameter would be treated as a query parameter instead.

    The response is a ``text/event-stream`` whose events each carry one text
    chunk, followed by a final ``[DONE]`` event.
    """
    def generate():
        for chunk in stream_response(prompt):
            yield _sse_event(chunk)  # SSE format
        yield _sse_event("[DONE]")

    return StreamingResponse(generate(), media_type="text/event-stream")
Client-side (React):
/**
 * POST `prompt` to /chat and pass the streamed reply to `onChunk` as it arrives.
 *
 * The response is read as Server-Sent Events: events are separated by a blank
 * line, and multiple `data:` lines within one event are joined with "\n".
 * Reading stops at the `[DONE]` event or when the stream ends.
 *
 * @param prompt  Text sent to the server as `{ prompt }` in a JSON body.
 * @param onChunk Called once per received text chunk, in order.
 * @throws Error when the server responds with a non-2xx status or no body.
 */
async function streamChat(prompt: string, onChunk: (text: string) => void) {
  const response = await fetch('/chat', {
    method: 'POST',
    body: JSON.stringify({ prompt }),
    headers: { 'Content-Type': 'application/json' },
  });
  if (!response.ok || !response.body) {
    throw new Error(`Chat request failed with status ${response.status}`);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  // Network reads do not line up with event boundaries, so keep whatever
  // has not yet formed a complete event.
  let buffer = '';

  // Handles one complete event; returns true once [DONE] has been seen.
  const handleEvent = (rawEvent: string): boolean => {
    const dataLines = rawEvent
      .split('\n')
      .filter((line) => line.startsWith('data:'))
      .map((line) => line.slice(5).replace(/^ /, ''));
    if (dataLines.length === 0) return false;
    const text = dataLines.join('\n');
    if (text === '[DONE]') return true;
    onChunk(text);
    return false;
  };

  // Consumes every complete event in the buffer; returns true on [DONE].
  const drain = (): boolean => {
    let boundary = buffer.indexOf('\n\n');
    while (boundary !== -1) {
      const rawEvent = buffer.slice(0, boundary);
      buffer = buffer.slice(boundary + 2);
      if (handleEvent(rawEvent)) return true;
      boundary = buffer.indexOf('\n\n');
    }
    return false;
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps a multi-byte character that is split across two
    // reads from being decoded as garbage.
    buffer += decoder.decode(value, { stream: true });
    if (drain()) {
      await reader.cancel();
      return;
    }
  }

  // Flush any bytes the decoder was still holding, then any final event that
  // arrived without its terminating blank line.
  buffer += decoder.decode();
  if (!drain() && buffer.length > 0) handleEvent(buffer);
}
Structured outputs — the most important technique
Getting a model to return parseable JSON reliably requires more than just “return JSON”. Use tool use (function calling):
import anthropic, json
client = anthropic.Anthropic()
# Define the schema as a "tool": the tool's input schema doubles as the shape
# of the structured output we want back.
extract_order_tool = {
    "name": "extract_order",
    "description": "Extract order details from the user's message.",
    "input_schema": {
        "type": "object",
        "properties": {
            "product_name": {"type": "string"},
            "quantity": {"type": "integer", "minimum": 1},
            # Closed set of sizes; anything else is outside the schema.
            "size": {"type": "string", "enum": ["S", "M", "L", "XL"]},
            "color": {"type": "string"},
        },
        # size and color are optional.
        "required": ["product_name", "quantity"],
    },
}
def extract_order(user_message: str) -> dict:
    """Extract order details from free text by forcing an ``extract_order`` tool call.

    Args:
        user_message: The customer's message, sent as the single user turn.

    Returns:
        The ``input`` of the first ``extract_order`` tool-use block in the
        response. It is already a parsed object, so no ``json.loads`` is needed.

    Raises:
        ValueError: if the response contains no ``extract_order`` tool-use block.
    """
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=256,
        tools=[extract_order_tool],
        # Force the model to use this specific tool
        tool_choice={"type": "tool", "name": "extract_order"},
        messages=[{"role": "user", "content": user_message}],
    )
    # Pick the first block that is a call to our tool, if any.
    tool_call = next(
        (
            block
            for block in response.content
            if block.type == "tool_use" and block.name == "extract_order"
        ),
        None,
    )
    if tool_call is None:
        raise ValueError("Model did not call the tool")
    return tool_call.input
# Usage
result = extract_order("I'd like 2 large blue t-shirts")
# → { "product_name": "t-shirt", "quantity": 2, "size": "L", "color": "blue" }
Why tool use instead of “return JSON”? The model is trained to output valid tool inputs. The API validates the schema. You never need to json.loads() with a try/except.
Prompt caching — cut costs by 90% on repeated context
If your prompt has a large static section (system prompt, docs, RAG context), you pay for those tokens on every call. Prompt caching avoids re-processing them.
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    system=[
        # Short instructions: not marked for caching.
        {
            "type": "text",
            "text": "You are a helpful assistant for Acme Corp.",
        },
        # Large static context: marked as a cache breakpoint.
        {
            "type": "text",
            "text": LARGE_PRODUCT_CATALOG,  # 50K tokens of product data
            "cache_control": {"type": "ephemeral"},  # ← cache this section
        },
    ],
    messages=[{"role": "user", "content": user_question}],
)

# First call: the 50K tokens are processed in full and written to the cache.
# NOTE(review): cache writes may be billed above the plain input price —
# confirm against current pricing.
# Subsequent calls (within 5 min): 50K tokens charged at cache read price (90% cheaper)
print(response.usage.cache_creation_input_tokens)  # 50K on first call
print(response.usage.cache_read_input_tokens)  # 50K on subsequent calls
Conversation history management
LLMs are stateless — you must send the full conversation history every time:
class ConversationManager:
    """Keep a chat transcript and resend it to the model on every turn.

    The transcript is stored in ``self.messages`` as ``{"role", "content"}``
    dicts. Before each request the oldest turns are dropped so the estimated
    size stays within ``max_tokens``. No system prompt is stored or sent here.
    """

    def __init__(self, max_tokens: int = 40_000):
        """Start an empty conversation.

        Args:
            max_tokens: Approximate token budget for the resent history.
        """
        self.messages = []
        self.max_tokens = max_tokens

    def chat(self, user_input: str) -> str:
        """Send ``user_input`` with the trimmed history and return the reply.

        Both the user turn and the assistant reply are recorded on success.
        If the API call raises, the user turn is removed again so that a retry
        does not send it twice; the exception is then re-raised unchanged.
        """
        self.messages.append({"role": "user", "content": user_input})
        # Trim history to stay within the budget, keeping the most recent turns.
        self._trim_to_token_limit()
        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=self.messages,
            )
        except Exception:
            # Trimming only removes from the front, so the last entry is still
            # the user turn appended above.
            self.messages.pop()
            raise
        assistant_reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_reply})
        return assistant_reply

    def _trim_to_token_limit(self):
        """Drop the oldest turns until the estimated size fits the budget.

        Size is estimated as 4 characters per token. At least the most recent
        message is always kept, even if it alone exceeds the budget. When
        anything is dropped, trimming continues until the history starts with
        a user turn, since a history that opens with an assistant reply is
        rejected by the API.
        """
        budget_chars = self.max_tokens * 4  # Estimate: 4 chars ≈ 1 token
        total_chars = sum(len(m["content"]) for m in self.messages)

        # Count how many leading messages must go, then delete them in one
        # slice instead of popping from the front repeatedly.
        drop = 0
        while total_chars > budget_chars and len(self.messages) - drop > 2:
            total_chars -= len(self.messages[drop]["content"])
            drop += 1

        if drop:
            last_index = len(self.messages) - 1
            while drop < last_index and self.messages[drop].get("role") != "user":
                drop += 1
            del self.messages[:drop]
ReadableStream reader on the client. For large static context (RAG docs, system instructions), use prompt caching — first call pays full price, subsequent calls within 5 minutes are ~90% cheaper.”