Phase 1: LLM APIs & Prompt Engineering

Call LLM APIs correctly, treat tokens as money, implement streaming, and make structured outputs reliable with tool use.

must · medium · ⏱ 35 min · Tags: llm, apis, prompting, structured-outputs, streaming, tokens, claude, openai
Why interviewers ask this
Every AI engineering role starts here: calling APIs reliably, managing cost, and getting deterministic structured outputs from non-deterministic models.

Token economics: know before you build

Every LLM call has a cost: (input_tokens × input_price) + (output_tokens × output_price).

Practical numbers (Claude Sonnet 4 as reference):

1 token ≈ 4 characters of English
1,000 tokens ≈ 750 words ≈ 1.5 pages

Claude Sonnet 4: ~$3 / 1M input, ~$15 / 1M output
GPT-4o:         ~$2.50 / 1M input, ~$10 / 1M output

A short conversation (5K tokens in, 1K out):
  Cost ≈ $0.015 + $0.015 = $0.03 per call
  At 10K users/day × 5 turns = 50K calls/day ≈ $1,500/day
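
The arithmetic above can be sketched as a small helper. This is a rough estimate under the assumptions already stated (about 4 characters per token, list prices per million tokens); `estimate_tokens`, `call_cost`, and the default prices are illustrative, not part of any SDK.

```python
import math

def estimate_tokens(text: str) -> int:
    """Rough token estimate using the ~4 characters per token rule of thumb."""
    return math.ceil(len(text) / 4)

def call_cost(input_tokens: int, output_tokens: int,
              input_price_per_m: float = 3.0,
              output_price_per_m: float = 15.0) -> float:
    """Cost in USD of one call; prices are USD per 1M tokens (assumed list prices)."""
    return (input_tokens * input_price_per_m
            + output_tokens * output_price_per_m) / 1_000_000

per_call = call_cost(5_000, 1_000)   # 0.015 input + 0.015 output
per_day = per_call * 10_000 * 5      # 10K users/day, 5 turns each
print(f"${per_call:.2f} per call, ${per_day:,.0f} per day")
```

For billing-grade numbers, prefer the provider's own token counter over the character heuristic.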

To count tokens before calling:

import anthropic
client = anthropic.Anthropic()

# Dry-run token count (no generation cost)
token_count = client.messages.count_tokens(
    model="claude-sonnet-4-6",
    messages=[{"role": "user", "content": "Hello, world!"}]
)
print(token_count.input_tokens)  # exact count depends on the model's tokenizer

Basic API call pattern

import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from env

def call_llm(prompt: str, system: str = "") -> str:
    messages = [{"role": "user", "content": prompt}]
    
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        system=system,
        messages=messages,
    )
    return response.content[0].text

Key parameters:

  • max_tokens: hard limit on output length; set conservatively (saves money, avoids runaway outputs). A reply that hits the limit is cut off and reports stop_reason "max_tokens"
  • temperature: 0 for the most consistent, factual output (still not strictly deterministic), 0.7 to 1 for creative; default is 1
  • system: sets model behavior/persona; invisible to the user
  • stop_sequences: stop generation at specific strings (useful for parsing)
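
To make the parameter list concrete, here is a minimal sketch that only assembles the keyword arguments and makes no network call. `build_request` and `was_truncated` are hypothetical helpers, and the 512-token cap and 0.8 temperature are arbitrary illustrative values; the parameter names and the "max_tokens" stop reason are the ones the Messages API uses.

```python
from typing import Optional

def build_request(prompt: str, system: str = "", *, factual: bool = True,
                  stop: Optional[list] = None) -> dict:
    """Assemble kwargs intended for client.messages.create(**kwargs)."""
    kwargs = {
        "model": "claude-sonnet-4-6",
        "max_tokens": 512,                       # conservative output cap
        "temperature": 0.0 if factual else 0.8,  # low for consistency, higher for variety
        "messages": [{"role": "user", "content": prompt}],
    }
    if system:
        kwargs["system"] = system                # only sent when a system prompt is given
    if stop:
        kwargs["stop_sequences"] = stop
    return kwargs

def was_truncated(stop_reason: str) -> bool:
    """True when generation ended because it hit the max_tokens cap."""
    return stop_reason == "max_tokens"

request = build_request("List three colors.", stop=["\n\n"])
print(sorted(request))
```

Checking `response.stop_reason` with something like `was_truncated` is a cheap way to notice that `max_tokens` was set too low.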

Retry logic with exponential backoff

LLM APIs have rate limits and occasional transient failures. Always wrap calls:

import time
import anthropic
from anthropic import RateLimitError, APIConnectionError, InternalServerError

def call_with_retry(
    prompt: str,
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> str:
    client = anthropic.Anthropic(max_retries=0)  # turn off the SDK's built-in retries so they don't stack with this loop
    
    for attempt in range(max_retries):
        try:
            response = client.messages.create(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
            
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the original error
            # Rate limited: respect the Retry-After header if present
            retry_after = float(e.response.headers.get("retry-after", base_delay * (2 ** attempt)))
            print(f"Rate limited. Waiting {retry_after}s...")
            time.sleep(retry_after)

        except (APIConnectionError, InternalServerError) as e:
            # Transient error: exponential backoff
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt+1} failed: {e}. Retrying in {delay}s...")
            time.sleep(delay)

    raise RuntimeError("Max retries exceeded")  # only reached if max_retries <= 0
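
One common refinement, not used in the loop above, is to add random jitter so that many clients rate limited at the same moment do not all retry together. Below is a minimal sketch of the "full jitter" variant; `backoff_delay` and its `cap` parameter are illustrative names.

```python
import random
from typing import Optional

def backoff_delay(attempt: int, base_delay: float = 1.0, cap: float = 30.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter backoff: a random delay in [0, min(cap, base_delay * 2**attempt)]."""
    source = rng or random
    ceiling = min(cap, base_delay * (2 ** attempt))
    return source.uniform(0.0, ceiling)

# Upper bounds grow 1, 2, 4, 8, 16 seconds and then stay at the cap
ceilings = [min(30.0, 1.0 * 2 ** attempt) for attempt in range(7)]
print(ceilings)
```

In `call_with_retry`, `time.sleep(backoff_delay(attempt))` would take the place of the fixed `delay`.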

Streaming for real-time UX

Without streaming, the user stares at a spinner until the full response is generated. With streaming, tokens appear as they're generated, like watching someone type.

import anthropic

client = anthropic.Anthropic()

# Streaming generator
def stream_response(prompt: str):
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for text in stream.text_stream:
            yield text  # yields each text delta as it arrives (often a few tokens, not exactly one)

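# FastAPI SSE endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI()

class ChatRequest(BaseModel):
    prompt: str  # read from the JSON body, which is what the client sends

@app.post("/chat")
async def chat(request: ChatRequest):
    def generate():
        for chunk in stream_response(request.prompt):
            # SSE format; a chunk containing "\n" would need escaping first
            yield f"data: {chunk}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")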
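
One caveat with hand-built SSE frames: a chunk that contains a newline would end the `data:` line early. The sketch below shows one way around this, JSON-encoding each chunk on the server and decoding it on the receiving side. `sse_event` and `parse_sse_event` are hypothetical helper names, and a browser client would need the matching `JSON.parse` step.

```python
import json

PREFIX = "data: "

def sse_event(chunk: str) -> str:
    """Wrap a text chunk as one SSE event; JSON encoding escapes any newlines."""
    return f"{PREFIX}{json.dumps(chunk)}\n\n"

def parse_sse_event(event: str) -> str:
    """Inverse of sse_event, shown in Python only to check the round trip."""
    payload = event[len(PREFIX):].rstrip("\n")
    return json.loads(payload)

event = sse_event("line one\nline two")
print(repr(event))
print(parse_sse_event(event) == "line one\nline two")
```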

Client-side (React):

async function streamChat(prompt: string, onChunk: (text: string) => void) {
  const response = await fetch('/chat', {
    method: 'POST',
    body: JSON.stringify({ prompt }),
    headers: { 'Content-Type': 'application/json' },
  });
  
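  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';  // holds a partial line when an event spans two reads

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters split across reads intact
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? '';  // last element may be an incomplete line

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const text = line.slice(6);
      if (text !== '[DONE]') onChunk(text);
    }
  }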
}

Structured outputs: the most important technique

Getting a model to return parseable JSON reliably requires more than just "return JSON". Use tool use (function calling):

import anthropic

client = anthropic.Anthropic()

# Define the schema as a "tool"
extract_order_tool = {
    "name": "extract_order",
    "description": "Extract order details from the user's message.",
    "input_schema": {
        "type": "object",
        "properties": {
            "product_name": { "type": "string" },
            "quantity": { "type": "integer", "minimum": 1 },
            "size": { "type": "string", "enum": ["S", "M", "L", "XL"] },
            "color": { "type": "string" },
        },
        "required": ["product_name", "quantity"],
    }
}

def extract_order(user_message: str) -> dict:
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=256,
        tools=[extract_order_tool],
        # Force the model to use this specific tool
        tool_choice={"type": "tool", "name": "extract_order"},
        messages=[{"role": "user", "content": user_message}],
    )
    
    # The model is forced to call the tool; read its input
    for block in response.content:
        if block.type == "tool_use" and block.name == "extract_order":
            return block.input  # already parsed into a dict; validate it before trusting it
    
    raise ValueError("Model did not call the tool")

# Usage
result = extract_order("I'd like 2 large blue t-shirts")
# → e.g. { "product_name": "t-shirt", "quantity": 2, "size": "L", "color": "blue" }

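Why tool use instead of "return JSON"? The model is trained to emit tool inputs that follow the schema, and the SDK hands you block.input already parsed into a dict, so there is no json.loads() on free text. Schema conformance is very reliable but not guaranteed by default, so still check required fields and types before acting on the result.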
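
A defensive check on the tool input is cheap insurance before acting on it. Here is a minimal stdlib-only sketch for the dict returned by `extract_order`. `validate_order` is a hypothetical helper that mirrors the schema above by hand; a JSON Schema or Pydantic validator would serve the same purpose.

```python
ALLOWED_SIZES = {"S", "M", "L", "XL"}

def validate_order(data: dict) -> dict:
    """Check a tool input against the extract_order schema; raise ValueError if it does not fit."""
    for field in ("product_name", "quantity"):
        if field not in data:
            raise ValueError(f"missing required field: {field}")
    if not isinstance(data["product_name"], str):
        raise ValueError("product_name must be a string")
    quantity = data["quantity"]
    # bool is a subclass of int in Python, so exclude it explicitly
    if isinstance(quantity, bool) or not isinstance(quantity, int) or quantity < 1:
        raise ValueError("quantity must be an integer >= 1")
    if "size" in data and data["size"] not in ALLOWED_SIZES:
        raise ValueError(f"size must be one of {sorted(ALLOWED_SIZES)}")
    if "color" in data and not isinstance(data["color"], str):
        raise ValueError("color must be a string")
    return data

order = validate_order({"product_name": "t-shirt", "quantity": 2, "size": "L", "color": "blue"})
print(order["quantity"])
```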


Prompt caching: cut input costs by up to ~90% on repeated context

If your prompt has a large static section (system prompt, docs, RAG context), you pay for those tokens on every call. Prompt caching avoids re-processing them.

response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a helpful assistant for Acme Corp.",
        },
        {
            "type": "text",
            "text": LARGE_PRODUCT_CATALOG,  # 50K tokens of product data
            "cache_control": {"type": "ephemeral"},  # ← cache the prompt prefix up to and including this block
        },
    ],
    messages=[{"role": "user", "content": user_question}],
)

# First call: the 50K tokens are written to the cache (billed at a premium over the base input price, about 1.25x for the 5-minute cache)
# Subsequent calls (within 5 min; each hit refreshes the timer): 50K tokens charged at cache read price (~90% cheaper)
print(response.usage.cache_creation_input_tokens)  # ~50K on first call
print(response.usage.cache_read_input_tokens)       # ~50K on subsequent calls
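
To see when caching pays off, here is a back-of-the-envelope sketch. The multipliers are assumptions based on Anthropic's published pricing for the 5-minute cache (writes at about 1.25x the base input price, reads at about 0.1x); check current pricing before relying on them. `uncached_cost` and `cached_cost` are illustrative names.

```python
def uncached_cost(static_tokens: int, calls: int, price_per_m: float = 3.0) -> float:
    """Input cost in USD of resending the static section on every call."""
    return calls * static_tokens * price_per_m / 1_000_000

def cached_cost(static_tokens: int, calls: int, price_per_m: float = 3.0,
                write_mult: float = 1.25, read_mult: float = 0.10) -> float:
    """One cache write, then cache reads, assuming every call lands within the cache lifetime."""
    if calls == 0:
        return 0.0
    per_call = static_tokens * price_per_m / 1_000_000
    return per_call * write_mult + (calls - 1) * per_call * read_mult

for calls in (1, 2, 100):
    print(calls, round(uncached_cost(50_000, calls), 4), round(cached_cost(50_000, calls), 4))
```

Under these assumptions a single call costs slightly more with caching, and the cache pays for itself from the second call onward.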

Conversation history management

LLM APIs are stateless, so you must send the full conversation history every time:

class ConversationManager:
    def __init__(self, max_tokens: int = 40_000):
        self.messages = []
        self.max_tokens = max_tokens
    
    def chat(self, user_input: str) -> str:
        self.messages.append({"role": "user", "content": user_input})
        
        # Trim history to stay within the context budget:
        # drop the oldest turns, keep the most recent messages
        self._trim_to_token_limit()
        
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=self.messages,
        )
        
        assistant_reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_reply})
        return assistant_reply
    
    def _trim_to_token_limit(self):
        # Rough estimate: 4 chars ≈ 1 token
        total_chars = sum(len(m["content"]) for m in self.messages)
        # Drop the oldest user/assistant pair so the history still starts with a user turn
        while total_chars > self.max_tokens * 4 and len(self.messages) > 2:
            for removed in (self.messages.pop(0), self.messages.pop(0)):
                total_chars -= len(removed["content"])
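
The trimming rule can be exercised without any API call. In this minimal standalone sketch, `trim_history` and the pluggable `count` function are hypothetical. This variant drops the oldest user/assistant pair at a time so the remaining history still begins with a user message, and `count` can be swapped for a real tokenizer.

```python
from typing import Callable

def trim_history(messages: list, max_tokens: int,
                 count: Callable[[str], int] = lambda text: len(text) // 4) -> list:
    """Return a trimmed copy of messages, removing the oldest pairs first."""
    trimmed = list(messages)  # copy so the caller's list is left untouched
    total = sum(count(m["content"]) for m in trimmed)
    while total > max_tokens and len(trimmed) > 2:
        for removed in (trimmed.pop(0), trimmed.pop(0)):
            total -= count(removed["content"])
    return trimmed

history = [
    {"role": "user", "content": "a" * 400},
    {"role": "assistant", "content": "b" * 400},
    {"role": "user", "content": "c" * 400},
]
kept = trim_history(history, max_tokens=150)
print([m["role"] for m in kept])
```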

Say it out loud
"The three things that matter most in production: token counting (know your cost before you ship), retry logic with exponential backoff (LLM APIs have rate limits and transient failures), and tool use for structured output (never parse free-text JSON; force the model to call a typed tool). For streaming, use SSE on the server and a ReadableStream reader on the client. For large static context (RAG docs, system instructions), use prompt caching: the first call pays a small cache-write premium, and subsequent calls within 5 minutes read the cached tokens ~90% cheaper."

Likely follow-up questions
  • How do you implement streaming for a chat UI?
  • How do you get structured JSON output reliably?
  • How do you implement retry logic with exponential backoff?
  • What is prompt caching and when does it save money?
