Build With AgentCore Challenge
AGENT
FORGE
A hands-on curriculum covering all eight enterprise AI agent categories across AWS, Azure, and Google Cloud — with real exercises, deployment scripts, and production-grade patterns.
08
Categories
40+
Services
32
Exercises
03
Cloud Providers
Filter by cloud Click any card to expand exercises →
AGENT CATEGORIES
8 PATTERNS · 3 CLOUDS
01💬
CONVERSATIONAL & COPILOT ASSISTANTS
Enterprise knowledge assistants and workflow copilots across all three major clouds — from internal helpdesk bots to fully embedded Copilot experiences.
Amazon Q Business Amazon Lex Amazon Connect Microsoft Copilot Azure AI Bot Service Gemini for GCP Google Workspace Gemini Dialogflow
Architecture Pattern
CONV AGENT FLOW
User query → Amazon Lex / Dialogflow [NLU intent]
Intent routing → Lambda orchestrator [conditional]
Context fetch → DynamoDB session store [memory]
LLM call → Bedrock / Azure OpenAI [generation]
Response → channel (Slack / Web / Connect) [delivery]
Key Concepts
→ Intent classification + slot filling
→ Session persistence across turns
→ Omnichannel routing (web / voice / email)
→ Fallback escalation to human agents
→ Tone guardrails + content filtering
Implementation Steps
PREREQUISITES
  • AWS CLI v2 configured with AdministratorAccess (scope down before production)
  • Python 3.11+, boto3, aws-cdk-lib installed
  • S3 bucket with at least 20 FAQ documents (PDF or TXT)
  • DynamoDB table: AgentForgeSessions (PK: sessionId, SK: timestamp)
  • Bedrock model access enabled: Claude 3 Sonnet in your region
1Design Your Intent Taxonomy
Write a domain analysis document before touching the console. List every user intent your bot must handle. Group them into primary intents (direct answer), secondary intents (clarification needed), and fallback intents (hand-off to human). Define 3–5 slot types per intent.
json
# taxonomy.json
{
  "intents": [
    {
      "name": "CheckOrderStatus",
      "slots": ["orderId", "email"],
      "confidence_threshold": 0.75,
      "fallback_action": "escalate_human"
    },
    {
      "name": "ActivateRoaming",
      "slots": ["destination_country", "plan_type"],
      "confidence_threshold": 0.80,
      "fallback_action": "send_guide_link"
    }
  ]
}
VALIDATE: Review with a domain expert. Every intent must map to a measurable success outcome before you build.
PITFALL: Teams that skip taxonomy and go straight to the console create bots with overlapping intents and 40%+ misclassification rates.
2Create the Lex V2 Bot and Configure Locales
Create the bot shell using AWS CLI. Set confidence score threshold at 0.70 — below this, Lex returns FALLBACK_INTENT. Enable multi-locale from day one.
bash
aws lexv2-models create-bot \
  --bot-name "AgentForgeCopilot" \
  --description "Enterprise FAQ and workflow copilot" \
  --role-arn "arn:aws:iam::ACCOUNT_ID:role/LexBotRole" \
  --data-privacy '{"childDirected": false}' \
  --idle-session-ttl-in-seconds 900

aws lexv2-models create-bot-locale \
  --bot-id $BOT_ID --bot-version "DRAFT" \
  --locale-id "en_US" \
  --nlu-intent-confidence-threshold 0.70 \
  --voice-settings '{"voiceId":"Joanna","engine":"neural"}'
VALIDATE: Run aws lexv2-models describe-bot --bot-id $BOT_ID and confirm status is "Available."
PITFALL: Forgetting to set idle-session-ttl-in-seconds causes sessions to expire after 5 minutes by default, breaking multi-turn conversations.
3Build Custom Slot Types Programmatically
Define slot types via JSON files rather than the console — this makes them version-controllable. Create one slot type per entity category.
bash
{
  "slotTypeName": "ProductType",
  "valueSelectionSetting": {
    "resolutionStrategy": "OriginalValue"
  },
  "slotTypeValues": [
    {"sampleValue": {"value": "enterprise"},
     "synonyms": [{"value": "business"}]},
    {"sampleValue": {"value": "starter"},
     "synonyms": [{"value": "free tier"}]},
    {"sampleValue": {"value": "professional"}}
  ]
}

# Then create:
aws lexv2-models create-slot-type \
  --bot-id $BOT_ID --bot-version "DRAFT" \
  --locale-id "en_US" \
  --cli-input-json file://slot_type_product.json
VALIDATE: Run aws lexv2-models list-slot-types --bot-id $BOT_ID --bot-version DRAFT --locale-id en_US and confirm all types appear.
PITFALL: Using AMAZON.AlphaNumeric for business entities loses synonym resolution. Always define custom slot types for domain-specific terms.
4Build the Lambda Fulfillment Handler
The Lambda receives a structured event from Lex containing intent name, slot values, session attributes, and confidence scores. Structure the handler as a router — one function per intent.
python
import boto3, json, os

bedrock = boto3.client("bedrock-runtime",
    region_name=os.environ["AWS_REGION"])
dynamodb = boto3.resource("dynamodb")
sessions = dynamodb.Table(os.environ["SESSIONS_TABLE"])

INTENT_HANDLERS = {
    "CheckOrderStatus": handle_order_status,
    "ActivateRoaming":  handle_roaming,
    "FallbackIntent":   handle_escalation,
}

def lambda_handler(event, context):
    intent = event["sessionState"]["intent"]["name"]
    slots  = event["sessionState"]["intent"]["slots"]
    sid    = event["sessionId"]
    conf   = event["sessionState"]["intent"] \
        .get("nluConfidence", {}).get("score", 0)

    sessions.put_item(Item={
        "sessionId": sid,
        "timestamp": int(context.get_remaining_time_in_millis()),
        "intent": intent,
        "slots": json.dumps(slots),
        "confidence": str(conf)
    })
    handler = INTENT_HANDLERS.get(intent, handle_escalation)
    return handler(slots, sid, event)
VALIDATE: Deploy and invoke with aws lambda invoke --function-name AgentForgeLex --payload file://test_event.json out.json. Confirm valid Lex V2 dialogAction structure.
PITFALL: Returning a plain string from Lambda breaks Lex. The response MUST be a fully structured Lex V2 response object with sessionState and messages.
5Implement RAG-Powered Knowledge Retrieval
Embed the user query, search your OpenSearch Serverless collection, retrieve top-5 chunks, and pass them as context to Bedrock Claude. Never send the raw user message to the LLM without retrieved context.
python
def rag_query(query_text: str, top_k: int = 5) -> str:
    embed_resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": query_text})
    )
    vector = json.loads(
        embed_resp["body"].read())["embedding"]

    search_body = {
        "size": top_k,
        "query": {"knn": {
            "embedding": {"vector": vector, "k": top_k}
        }},
        "_source": ["text", "source", "chunk_id"]
    }
    resp = requests.post(
        f"{os.environ['OPENSEARCH_ENDPOINT']}"
        f"/knowledge-index/_search",
        headers={"Content-Type": "application/json"},
        json=search_body)
    hits = resp.json()["hits"]["hits"]
    return "\n\n".join(
        [h["_source"]["text"] for h in hits])
VALIDATE: Run a known question and confirm the retrieved chunks contain the expected source document name in the source field.
PITFALL: Embedding at query time with no caching adds 200–400ms per turn. Cache embeddings for the 100 most common queries in ElastiCache.
6Add DynamoDB Multi-Turn Session Management
Store the full conversation window in DynamoDB with a ring-buffer pattern: keep the last 10 turns per session. Inject this history into every Bedrock prompt as a "conversation so far" block.
python
def get_session_history(session_id, window=10):
    resp = sessions.query(
        KeyConditionExpression="sessionId = :sid",
        ExpressionAttributeValues={":sid": session_id},
        ScanIndexForward=False, Limit=window)
    turns = list(reversed(resp["Items"]))
    return [{"role": t["role"],
             "content": t["content"]} for t in turns]

def build_prompt(history, context, user_msg):
    messages = history.copy()
    messages.append({"role": "user", "content":
        f"Context from knowledge base:\n{context}"
        f"\n\nUser question: {user_msg}"})
    return messages
VALIDATE: Send 5 sequential messages in the same session. Query DynamoDB and confirm 5 items with the same sessionId.
PITFALL: Using Lex session attributes for memory limits you to 25KB and breaks on long conversations. Use DynamoDB for all state.
7Wire Amazon Comprehend Sentiment Analysis
Before every Bedrock call, run the user message through Comprehend Sentiment. Track sentiment across turns. If sentiment is NEGATIVE for two consecutive turns, set an escalation flag.
python
comprehend = boto3.client("comprehend")

def analyze_sentiment(text: str) -> dict:
    resp = comprehend.detect_sentiment(
        Text=text, LanguageCode="en")
    return {
        "sentiment": resp["Sentiment"],
        "score": resp["SentimentScore"]["Negative"]
    }

def should_escalate(session_id, current_score):
    history = get_session_history(session_id, window=2)
    neg = sum(1 for t in history
        if float(t.get("negative_score", 0)) > 0.6)
    return current_score > 0.6 and neg >= 1
VALIDATE: Send "This is absolutely terrible, nothing works" and confirm the sentiment score exceeds 0.6 and the escalation flag is set.
PITFALL: Escalating on a single negative turn generates false positives. Always require two consecutive high-negative-score turns before escalating.
8Build the Omnichannel Adapter Layer
A single Lambda adapter handles messages from Slack, Web Widget, and Amazon Connect by normalizing the channel-specific envelope into a canonical message format.
python
CHANNEL_SCHEMAS = {
    "slack": lambda e: {
        "text": e["event"]["text"],
        "userId": e["event"]["user"]},
    "connect": lambda e: {
        "text": e["Details"]["ContactData"]
                 ["Attributes"]["query"],
        "userId": e["Details"]["ContactData"]
                   ["ContactId"]},
    "web": lambda e: {
        "text": e["body"]["message"],
        "userId": e["body"]["userId"]},
}

def normalize_event(raw_event, channel):
    parser = CHANNEL_SCHEMAS.get(channel)
    if not parser:
        raise ValueError(f"Unknown channel: {channel}")
    canonical = parser(raw_event)
    canonical["channel"] = channel
    return canonical
VALIDATE: Invoke the adapter with a mocked Slack event and confirm the canonical message structure matches {"text": ..., "userId": ..., "channel": "slack"}.
PITFALL: Amazon Connect passes the user message as a Contact Attribute, not in the event body. The attribute key must be configured in the contact flow.
9Implement Human Hand-Off via Amazon Connect
When escalation is triggered, use the Amazon Connect Start Task API to create a task with the full conversation summary pre-loaded. The human agent sees the bot summary before the customer connects.
python
connect_client = boto3.client("connect")

def escalate_to_human(session_id, summary,
                      contact_flow_id):
    resp = connect_client.start_task_contact(
        InstanceId=os.environ["CONNECT_INSTANCE_ID"],
        ContactFlowId=contact_flow_id,
        Name=f"Bot Escalation - {session_id[:8]}",
        Description=summary,
        Attributes={
            "sessionId": session_id,
            "escalation_reason": "sentiment_negative",
            "bot_summary": summary[:4000]
        },
        TaskTemplateId=os.environ["TASK_TEMPLATE_ID"])
    return resp["ContactId"]
VALIDATE: Trigger an escalation and confirm a new task appears in the Amazon Connect agent workspace with the correct summary.
PITFALL: Connect tasks have a 4000-character attribute limit. Always truncate bot_summary and store the full transcript in DynamoDB instead.
10Add Bedrock Guardrails for Content Safety
Wrap every outbound Bedrock call with a Guardrail configuration. Define denied topics, word filters, and PII redaction for both input and output.
python
guardrail = bedrock_client.create_guardrail(
    name="AgentForgeCopilotGuardrail",
    topicPolicyConfig={"topicsConfig": [
        {"name": "LegalAdvice",
         "definition": "Advice about lawsuits or liability",
         "examples": ["Can I sue them?"],
         "type": "DENY"},
        {"name": "CompetitorMention",
         "definition": "References to competing products",
         "type": "DENY"}
    ]},
    sensitiveInformationPolicyConfig={
        "piiEntitiesConfig": [
            {"type": "EMAIL", "action": "ANONYMIZE"},
            {"type": "PHONE", "action": "ANONYMIZE"},
            {"type": "NAME",  "action": "ANONYMIZE"}
    ]},
    contentPolicyConfig={"filtersConfig": [
        {"type": "HATE",
         "inputStrength": "HIGH",
         "outputStrength": "HIGH"},
        {"type": "VIOLENCE",
         "inputStrength": "HIGH",
         "outputStrength": "HIGH"}
    ]})
VALIDATE: Send a message containing a competitor name and confirm the response is blocked with "GUARDRAIL_INTERVENED" in response metadata.
PITFALL: Guardrails add 80–150ms of latency per call. Measure p99 latency in a load test before enabling in production.
11Instrument Full Observability
Every invocation must emit a structured JSON log to CloudWatch with: sessionId, intent, confidence, sentiment, rag_chunks_used, bedrock tokens, latency_ms, escalated, guardrail_triggered.
python
import time, json, logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def emit_telemetry(session_id, intent, confidence,
    sentiment, chunks, in_tok, out_tok,
    start_ms, escalated, guardrail):
    logger.info(json.dumps({
        "event": "agent_turn_complete",
        "sessionId": session_id,
        "intent": intent,
        "nluConfidence": confidence,
        "sentiment": sentiment,
        "rag_chunks_used": chunks,
        "bedrock_input_tokens": in_tok,
        "bedrock_output_tokens": out_tok,
        "latency_ms": int(
            (time.time()*1000) - start_ms),
        "escalated": escalated,
        "guardrail_triggered": guardrail,
        "estimated_cost_usd": round(
            (in_tok*0.000003)
            + (out_tok*0.000015), 6)
    }))
VALIDATE: Run aws logs tail /aws/lambda/AgentForgeLex --since 1m --format json and confirm all 11 fields are present.
PITFALL: Using print() instead of logger.info() in Lambda loses structured JSON — CloudWatch metric filters fail.
12Build CloudWatch Dashboards and Alarms
Create a monitoring dashboard with four widgets: Invocations/min, Escalation Rate %, p50/p95/p99 Latency, Cost/hour. Set alarms for escalation rate, latency, error rate, and daily cost.
bash
aws cloudwatch put-metric-alarm \
  --alarm-name "AgentForge-EscalationRateHigh" \
  --metric-name "EscalationRate" \
  --namespace "AgentForge/Copilot" \
  --statistic Average --period 300 \
  --threshold 15 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 2 \
  --alarm-actions \
    "arn:aws:sns:us-east-1:ACCOUNT:AgentForgeAlerts" \
  --treat-missing-data notBreaching
VALIDATE: Manually trigger alarm state with aws cloudwatch set-alarm-state and confirm SNS notification arrives in Slack within 60 seconds.
PITFALL: Setting evaluation-periods to 1 causes alarm flapping on transient spikes. Always require 2 consecutive breach periods.
Azure Implementation Path

Replace Amazon Lex with Azure AI Bot Service + Microsoft Copilot Studio for intent classification and dialogue management. Use Azure AI Language (CLU) for custom NLU models with the same intent taxonomy. Session state moves from DynamoDB to Cosmos DB (partition key: sessionId). RAG retrieval uses Azure AI Search (vector + hybrid) with Azure OpenAI Embeddings. LLM generation swaps to Azure OpenAI GPT-4. Sentiment analysis via Azure AI Language Sentiment. Omnichannel delivery through Teams, Power Virtual Agents web chat, and Dynamics 365 Contact Center for voice + human hand-off. Guardrails via Azure AI Content Safety.

Azure AI Bot Service Copilot Studio Azure AI Language (CLU) Cosmos DB Azure AI Search Azure OpenAI Azure AI Content Safety Dynamics 365 Contact Center
bash
az bot create --resource-group agentforge-rg \
  --name AgentForgeCopilot --kind webapp \
  --sku S1 --location eastus

az cognitiveservices account create \
  --name agentforge-language \
  --resource-group agentforge-rg \
  --kind TextAnalytics --sku S \
  --location eastus

az search service create \
  --name agentforge-search \
  --resource-group agentforge-rg \
  --sku standard --partition-count 1
GCP Implementation Path

Replace Amazon Lex with Dialogflow CX for intent classification with advanced flow-based conversation design. Session state in Firestore (document: sessions/{sessionId}). RAG retrieval via Vertex AI Search (vector + keyword hybrid) with Vertex AI Embeddings. LLM generation with Gemini Pro via Vertex AI. Sentiment analysis through Cloud Natural Language API. Omnichannel: Dialogflow CX Messenger (web), Dialogflow CX Phone Gateway (voice), and Google Chat integration. Content filtering via Vertex AI Safety Filters. Human hand-off via Contact Center AI (CCAI).

Dialogflow CX Firestore Vertex AI Search Vertex AI (Gemini Pro) Cloud Natural Language Contact Center AI Google Chat
bash
gcloud dialogflow cx agents create \
  --display-name="AgentForgeCopilot" \
  --location=us-central1 \
  --default-language-code=en \
  --time-zone="America/New_York"

gcloud ai endpoints create \
  --display-name=agentforge-embedding \
  --region=us-central1

gcloud alpha contact-center-insights \
  operations list --location=us-central1
PRODUCTION CHECKLIST — NODE 01
Estimated Lab Time: 6–8 hours (Intermediate to Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 1.1
Build a multi-turn FAQ bot with Amazon Lex + Lambda
Create a Lex bot with 5 custom intents, wire a Lambda fulfillment function, and add DynamoDB session persistence. Test with 10 conversation turns.
aws lex-models put-bot --name AgentForgeFAQ \
  --locale en-US --child-directed false
Intermediate Exercise 1.2
Wire Amazon Q Business to your internal knowledge base
Connect Amazon Q Business to an S3 document corpus. Configure IAM identity-aware retrieval. Measure response accuracy against 20 known Q&A pairs.
Advanced Exercise 1.3
Omnichannel copilot: Slack + Web + Amazon Connect
Deploy the same agent across three channels using a unified Lambda adapter. Maintain cross-channel session state. Add graceful human hand-off via Amazon Connect.
Stretch Exercise 1.4
Tone analysis + escalation classifier
Use Amazon Comprehend sentiment analysis on every user turn. Auto-escalate to a human agent when sentiment score drops below -0.6 for two consecutive turns.
02🤖
AUTONOMOUS TASK AGENTS
Multi-step task execution agents with LLM-powered orchestration and event-driven triggers — capable of planning, tool-calling, and self-correcting over long horizons.
Bedrock Agents Step Functions + LLM Lambda Event Agents Azure AI Agent Service Logic Apps Orchestrators Vertex AI Agent Builder
Architecture Pattern
REACT LOOP
Goal input → Bedrock Agent [plan]
Thought → Tool selection [reason]
Tool call → Action Group Lambda [act]
Observation → model context [observe]
Loop until stopReason = end_turn [terminate]
Key Concepts
→ ReAct: Reason + Act + Observe loop
→ Action Groups as typed tool schemas
→ Knowledge Bases for grounded context
→ Step Functions for deterministic branching
→ Guardrails for output validation
Implementation Steps
PREREQUISITES
  • Bedrock Agent access enabled; model access for Claude 3 Sonnet confirmed
  • IAM role for Bedrock Agent with lambda:InvokeFunction, s3:GetObject, s3:PutObject, dynamodb:PutItem, bedrock:InvokeModel
  • OpenSearch Serverless collection created (type: VECTORSEARCH)
  • AgentCore CLI installed: pip install amazon-bedrock-agentcore-sdk
1Define Action Group Schemas with OpenAPI
Every tool your agent can call must be defined as an OpenAPI 3.0 schema. Write one YAML file per Action Group. The schema drives the LLM tool-calling decisions — a poorly written schema leads to wrong tool selection.
yaml
# action_group_calendar.yaml
openapi: 3.0.0
info:
  title: CalendarActions
  description: "Tools for creating and querying
    calendar events."
  version: 1.0.0
paths:
  /create_event:
    post:
      operationId: create_event
      description: "Creates a new calendar event.
        Call ONLY after start_time and attendees
        are confirmed."
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              required:
                - title
                - start_time
                - duration_minutes
                - attendees
              properties:
                title:
                  type: string
                start_time:
                  type: string
                  format: date-time
                duration_minutes:
                  type: integer
                  minimum: 15
                  maximum: 480
                attendees:
                  type: array
                  items:
                    type: string
                    format: email
VALIDATE: Run npx @redocly/openapi-cli lint action_group_calendar.yaml and confirm no errors.
PITFALL: Vague operationId descriptions (e.g., "manages events") cause the LLM to guess when to call the tool. Write descriptions as decision rules.
2Build Lambda Action Group Handlers
Each Action Group maps to a Lambda function. The function must return a response body that matches the OpenAPI schema — Bedrock Agents validate the structure. Wrap all handlers in try/except that returns structured error.
python
import json

def lambda_handler(event, context):
    action_group = event["actionGroup"]
    function     = event["function"]
    parameters   = {p["name"]: p["value"]
        for p in event.get("parameters", [])}

    dispatch = {
        "create_event":     handle_create_event,
        "get_availability": handle_get_availability,
        "send_email":       handle_send_email,
    }
    handler = dispatch.get(function)
    if not handler:
        return build_response(action_group, function,
            {"error": f"Unknown: {function}"})
    try:
        result = handler(parameters)
        return build_response(
            action_group, function, result)
    except Exception as e:
        return build_response(action_group, function,
            {"error": str(e), "retry_safe": True})

def build_response(ag, fn, body):
    return {
        "actionGroup": ag, "function": fn,
        "functionResponse": {"responseBody": {
            "TEXT": {"body": json.dumps(body)}}}
    }
VALIDATE: Invoke Lambda directly with a mock Bedrock Agent event and confirm functionResponse.responseBody.TEXT.body is valid JSON.
PITFALL: Raising exceptions from Action Group Lambdas causes Bedrock to treat the tool call as a hard failure. Always return a structured error body.
3Create the Bedrock Agent and Attach Action Groups
Create the agent via Boto3 rather than the console for reproducibility. Set the instruction carefully — this is the agent’s constitution. It must define identity, capabilities, tools, and behavioral rules.
python
import boto3, os
bedrock_agent = boto3.client("bedrock-agent")

agent = bedrock_agent.create_agent(
    agentName="AgentForgeTaskAgent",
    foundationModel=
        "anthropic.claude-3-sonnet-20240229-v1:0",
    agentResourceRoleArn=
        os.environ["AGENT_ROLE_ARN"],
    instruction="""You are an enterprise task agent.
Rules:
1. Confirm high-impact actions before executing.
2. If a required parameter is missing, ask.
3. After each tool call, explain what you did.
4. If a tool returns retry_safe=true, retry once.
5. If a tool fails twice, stop and explain.
6. Never reveal internal tool names to the user.
7. Maximum 8 tool calls per request.""",
    idleSessionTTLInSeconds=1800)
AGENT_ID = agent["agent"]["agentId"]
VALIDATE: Describe the agent and confirm foundationModel is set, instruction length > 300 chars, and status is "NOT_PREPARED".
PITFALL: Using a single-sentence instruction produces an unfocused agent. The instruction must be a complete policy document — typically 300–800 words.
4Attach a Bedrock Knowledge Base
Create a Knowledge Base backed by OpenSearch Serverless. The agent uses this to retrieve facts before deciding on tool calls. Configure chunking at 512 tokens with 20% overlap.
python
kb = bedrock_agent.create_knowledge_base(
    name="AgentForgeKB",
    roleArn=os.environ["KB_ROLE_ARN"],
    knowledgeBaseConfiguration={
        "type": "VECTOR",
        "vectorKnowledgeBaseConfiguration": {
            "embeddingModelArn":
              "arn:aws:bedrock:us-east-1::foundation-model/"
              "amazon.titan-embed-text-v2:0"
        }
    },
    storageConfiguration={
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn":
                os.environ["OPENSEARCH_COLLECTION_ARN"],
            "vectorIndexName": "agentforge-kb-index",
            "fieldMapping": {
                "vectorField": "embedding",
                "textField": "text",
                "metadataField": "metadata"
            }
        }
    })
KB_ID = kb["knowledgeBase"]["knowledgeBaseId"]

bedrock_agent.associate_agent_knowledge_base(
    agentId=AGENT_ID, agentVersion="DRAFT",
    knowledgeBaseId=KB_ID,
    description="Company policy docs and FAQ",
    knowledgeBaseState="ENABLED")
VALIDATE: Trigger a sync job and poll until status is "COMPLETE".
PITFALL: Associating a Knowledge Base before the agent is prepared causes an "AGENT_NOT_PREPARED" error. Always prepare after all resources are attached.
5Configure Bedrock Guardrails and Attach to Agent
Apply guardrails to both input processing and output generation. Define denied topics, add word policy for internal codenames, and block sensitive credentials.
python
guardrail = boto3.client("bedrock").create_guardrail(
    name="TaskAgentGuardrail",
    blockedInputMessaging=
        "I can't help with that specific request.",
    blockedOutputsMessaging=
        "I can't share that information.",
    topicPolicyConfig={"topicsConfig": [
        {"name": "FinancialAdvice",
         "definition": "Advice on investments or trading",
         "type": "DENY"}
    ]},
    sensitiveInformationPolicyConfig={
        "piiEntitiesConfig": [
            {"type": "AWS_ACCESS_KEY", "action": "BLOCK"},
            {"type": "PASSWORD", "action": "BLOCK"},
            {"type": "EMAIL", "action": "ANONYMIZE"}
    ]},
    wordPolicyConfig={
        "managedWordListsConfig": [
            {"type": "PROFANITY"}],
        "wordsConfig": [
            {"text": "INTERNAL_CODENAME_HERMES"}]}
)
VALIDATE: Send a message containing your blocked word and confirm the agent responds with blockedInputMessaging, not an error trace.
PITFALL: Guardrail version must be published (not DRAFT) before attaching to an agent. Call create_guardrail_version after creating.
6Prepare and Deploy the Agent
Preparation compiles action group schemas, validates Lambda ARNs, and creates the internal routing graph. Alias the agent after preparation — always invoke via alias, never directly via DRAFT.
python
bedrock_agent.prepare_agent(agentId=AGENT_ID)

import time
for _ in range(30):
    status = bedrock_agent.get_agent(
        agentId=AGENT_ID
    )["agent"]["agentStatus"]
    if status == "PREPARED":
        break
    time.sleep(10)
else:
    raise RuntimeError("Agent preparation timed out")

alias = bedrock_agent.create_agent_alias(
    agentId=AGENT_ID,
    agentAliasName="production-v1",
    description="Production alias - stable")
ALIAS_ID = alias["agentAlias"]["agentAliasId"]
VALIDATE: Invoke via alias and confirm a multi-turn reasoning trace appears in the response stream.
PITFALL: Invoking DRAFT version in production means every schema change immediately affects live users. Always create a versioned alias.
7Wire EventBridge for Event-Driven Invocation
The agent should be triggered by system events (S3 uploads, database changes, scheduled jobs). Create an EventBridge rule that routes events to a Lambda that invokes the agent.
bash
# eventbridge_rule.json
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {"name": ["agentforge-inbox"]},
    "object": {"key": [{"suffix": ".pdf"}]}
  }
}

aws events put-rule \
  --name "AgentForge-S3-Trigger" \
  --event-pattern file://eventbridge_rule.json \
  --state ENABLED

# In the trigger Lambda:
def build_agent_prompt(s3_event):
    bucket = s3_event["detail"]["bucket"]["name"]
    key = s3_event["detail"]["object"]["key"]
    return (
        f"New document: s3://{bucket}/{key}. "
        f"Classify type, extract metadata, "
        f"route to DynamoDB, send Slack alert.")
VALIDATE: Upload a PDF to the S3 bucket and confirm EventBridge delivers the event to the trigger Lambda within 5 seconds.
PITFALL: EventBridge delivers events at-least-once. Your Lambda must be idempotent — check if already processed (S3 object tag "processed=true") before invoking.
8Implement Retry Logic and Plan Revision
Build a middleware wrapper that intercepts tool failures, classifies them as retryable vs. fatal, and injects a plan-revision directive into the agent’s next message when a retryable failure occurs.
python
class AgentMiddleware:
    MAX_RETRIES = 2
    RETRYABLE = ["ThrottlingException",
        "ServiceUnavailableException",
        "timeout", "connection refused"]

    def __init__(self, agent_id, alias_id):
        self.agent_id = agent_id
        self.alias_id = alias_id
        self.runtime = boto3.client(
            "bedrock-agent-runtime")
        self.retry_count = 0

    def invoke_with_retry(self, session_id, prompt):
        try:
            return self._stream_response(
                session_id, prompt)
        except Exception as e:
            err = str(e)
            retryable = any(
                r in err for r in self.RETRYABLE)
            if retryable and \
                self.retry_count < self.MAX_RETRIES:
                self.retry_count += 1
                return self.invoke_with_retry(
                    session_id,
                    f"Previous attempt failed: {err}."
                    f" Revise plan and retry.")
            raise
VALIDATE: Introduce a deliberate 500 error in one Action Group Lambda and confirm the agent retries and revises its plan.
PITFALL: Unlimited retries create infinite loops and runaway costs. Hard-cap retries at 2–3 and always log every retry with the original error.
9Deploy with AgentCore CLI
AgentCore provides managed invocation, automatic scaling, built-in observability, and Memory Store integration. Use it for production deployments.
yaml
# bedrock_agentcore.yaml
entrypoint: agent_handler.py
handler: lambda_handler
runtime: python3.11
memory: 512
timeout: 60
environment:
  AGENT_ID: ${AGENT_ID}
  AGENT_ALIAS_ID: ${AGENT_ALIAS_ID}
  OPENSEARCH_ENDPOINT: ${OPENSEARCH_ENDPOINT}
  SESSIONS_TABLE: AgentForgeSessions
tools:
  - name: create_event
    schema: ./schemas/action_group_calendar.yaml
  - name: send_email
    schema: ./schemas/action_group_email.yaml
memory:
  session_store: dynamodb://AgentForgeSessions
observability:
  cloudwatch: enabled
  xray: enabled

# Deploy:
agentcore configure -e agent_handler.py
agentcore launch
agentcore invoke '{"prompt": "Schedule a Q3 review"}'
VALIDATE: Confirm agentcore status shows "RUNNING" and invoke response contains a tool call trace.
PITFALL: agentcore launch without --env flags for secrets will cause the runtime to fail on the first tool call that requires external credentials.
10Build an Evaluation Harness
Before production, evaluate agent quality against a curated dataset. Create 50 test cases: input prompt, expected tool calls (in order), and expected response characteristics.
python
from dataclasses import dataclass

@dataclass
class TestCase:
    id: str
    prompt: str
    expected_tools: list  # ordered
    expected_contains: list  # keywords
    expected_excludes: list

def evaluate_agent(test_cases):
    results = {"passed": 0, "failed": 0,
               "tool_accuracy": []}
    for tc in test_cases:
        response, trace = invoke_agent_with_trace(
            tc.prompt)
        actual = [t["name"] for t in trace
                  if t["type"] == "tool_call"]
        tool_ok = actual == tc.expected_tools
        content_ok = all(
            kw in response
            for kw in tc.expected_contains)
        content_bad = any(
            kw in response
            for kw in tc.expected_excludes)
        passed = tool_ok and content_ok \
            and not content_bad
        results["passed" if passed
                else "failed"] += 1
        results["tool_accuracy"].append(
            1 if tool_ok else 0)
    results["accuracy_pct"] = sum(
        results["tool_accuracy"]
    ) / len(test_cases) * 100
    return results
VALIDATE: Run evaluation and confirm tool_accuracy_pct >= 80% before approving any production deployment.
PITFALL: Evaluating only happy-path prompts masks failure modes. Include at least 10 adversarial test cases in every evaluation run.
Azure Implementation Path

Replace Bedrock Agents with Azure AI Agent Service (formerly Azure AI Foundry Agents) for ReAct-style autonomous task execution. Action Groups map to Azure Functions with OpenAPI tool definitions. Knowledge Base equivalent: Azure AI Search with Azure OpenAI Embeddings. Orchestration for deterministic branching via Azure Logic Apps or Durable Functions. Guardrails via Azure AI Content Safety + Responsible AI tools. Event-driven triggers through Event GridAzure Functions. Session memory in Cosmos DB.

Azure AI Agent Service Azure Functions Azure AI Search Azure OpenAI Logic Apps Event Grid Azure AI Content Safety Cosmos DB
python
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential

client = AIProjectClient(
    credential=DefaultAzureCredential(),
    endpoint="https://agentforge.openai.azure.com")

agent = client.agents.create_agent(
    model="gpt-4",
    name="AgentForgeTaskAgent",
    instructions="You are an enterprise task agent...",
    tools=[{"type": "function", "function": {
        "name": "create_event",
        "parameters": {...}}}])
GCP Implementation Path

Replace Bedrock Agents with Vertex AI Agent Builder for autonomous agent creation with tool-use. Action Groups map to Cloud Functions (2nd gen) with function declarations. Knowledge Base via Vertex AI Search (RAG engine). Orchestration via Cloud Workflows for deterministic steps. Event-driven triggers through EventarcCloud Functions. Guardrails via Vertex AI Safety Settings. Session state in Firestore.

Vertex AI Agent Builder Cloud Functions Vertex AI Search Vertex AI (Gemini) Cloud Workflows Eventarc Firestore
python
from vertexai.preview import reasoning_engines

agent = reasoning_engines.LangchainAgent(
    model="gemini-1.5-pro",
    tools=[create_event, get_availability],
    agent_executor_kwargs={
        "return_intermediate_steps": True})

remote_agent = reasoning_engines.ReasoningEngine.create(
    agent, display_name="AgentForgeTaskAgent",
    requirements=["google-cloud-aiplatform"])
PRODUCTION CHECKLIST — NODE 02
Estimated Lab Time: 5–7 hours (Intermediate to Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 2.1
First ReAct agent using Bedrock Agents + Action Group
Create a Bedrock Agent with three Action Groups: get_weather, create_calendar_event, and send_email. Trigger a multi-step workflow: "Schedule a meeting about the storm tomorrow."
Intermediate Exercise 2.2
Event-driven agent triggered by S3 uploads
Wire an S3 event notification to a Lambda that invokes a Bedrock Agent. The agent must classify the document, extract metadata, and route it to the correct DynamoDB table.
aws s3api put-bucket-notification-configuration \
  --bucket agentforge-inbox --notification-config file://notify.json
Advanced Exercise 2.3
Long-horizon task: code review → PR → Jira ticket pipeline
Build an agent that reviews a GitHub PR diff, generates inline comments via the GitHub API, creates a Jira ticket for each critical finding, and posts a Slack summary — all autonomously.
Stretch Exercise 2.4
Self-healing agent with retry and plan revision
Introduce deliberate tool failures (20% error rate). Instrument the agent to detect failures, revise its plan, and retry with an alternative strategy. Log plan revisions to CloudWatch.
03⚙️
DEVELOPER & DEVOPS AGENTS
AI coding and pipeline automation with IaC auto-remediation across all major platforms — from intelligent code review to fully autonomous infrastructure repair.
Amazon Q Developer CodeWhisperer GitHub Copilot Azure DevOps AI Gemini Code Assist Cloud Build AI Terraform Auto-Remediation
Architecture Pattern
DEVOPS AGENT FLOW
Git push → GitHub Actions / CodePipeline [trigger]
Code scan → Amazon Q Developer [analysis]
Issue detected → Bedrock agent [remediation]
Patch generated → PR auto-created [fix]
tf plan → drift detected → auto-apply [IaC]
Key Concepts
→ CI/CD pipeline as agent trigger surface
→ Diff-aware code generation
→ Terraform plan parsing + remediation
→ Security scanning + auto-patch PRs
→ Rollback guards and change approvals
Implementation Steps
PREREQUISITES
  • GitHub repo with Actions enabled, PAT with repo + pull_request scope
  • AWS CLI v2, Terraform 1.6+, Python 3.11+, boto3
  • Bedrock model access: Claude 3 Sonnet
  • IAM role with SecurityAudit + SSMAutomationRole policies
  • Jira API token for ticket creation
  • Security Hub enabled in target account
1Set Up GitHub Actions PR Trigger Workflow
Create a reusable workflow triggered on pull_request events. Extract the diff using GitHub API and prepare it for LLM analysis. Store the diff as a workflow artifact for downstream steps.
yaml
# .github/workflows/ai-review.yml
name: AI Code Review
on:
  pull_request:
    types: [opened, synchronize]

jobs:
  review:
    runs-on: ubuntu-latest
    permissions:
      pull-requests: write
      contents: read
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - name: Get PR diff
        id: diff
        run: |
          gh pr diff ${{ github.event.number }} \
            > /tmp/pr_diff.txt
          echo "lines=$(wc -l < /tmp/pr_diff.txt)" \
            >> $GITHUB_OUTPUT
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      - name: Run AI Review
        run: python review_agent.py \
          ${{ github.event.number }}
        env:
          BEDROCK_REGION: us-east-1
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
VALIDATE: Open a test PR and confirm the workflow triggers within 30 seconds. Check Actions tab for successful diff extraction.
PITFALL: Using fetch-depth: 1 (default) prevents diff calculation. Always set fetch-depth: 0 for full git history.
2Build the AI Code Review Agent
Send the PR diff to Bedrock Claude with a structured prompt that requests severity-tagged issues. Parse the JSON response into actionable review comments.
python
import boto3, json

bedrock = boto3.client("bedrock-runtime",
    region_name="us-east-1")

def review_diff(diff_text: str) -> list:
    prompt = f"""Review this code diff. Return JSON:
[{{"file": "path", "line": N,
   "severity": "CRITICAL|HIGH|MEDIUM|LOW",
   "issue": "description",
   "suggestion": "fix"}}]

Rules:
- Flag security issues as CRITICAL
- Flag performance issues as HIGH
- Flag style issues as LOW
- Include line numbers from the diff

Diff:
{diff_text[:12000]}"""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [{"role": "user",
                          "content": prompt}]
        }))
    body = json.loads(resp["body"].read())
    return json.loads(
        body["content"][0]["text"])
VALIDATE: Run the review agent against a known-bad diff containing an SQL injection vulnerability and confirm it returns a CRITICAL finding.
PITFALL: Sending the entire diff without truncation hits token limits. Always cap diff size at 12K tokens and split large PRs into batches.
3Post Inline Review Comments via GitHub API
Map LLM-identified issues to specific file paths and line numbers. Post inline review comments using the GitHub Pulls API. Create a review summary with severity counts.
python
import requests, os

def post_review_comments(pr_number, findings):
    token = os.environ["GITHUB_TOKEN"]
    repo  = os.environ["GITHUB_REPOSITORY"]
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json"
    }

    comments = []
    for f in findings:
        comments.append({
            "path": f["file"],
            "line": f["line"],
            "body": f"**[{f['severity']}]** "
                    f"{f['issue']}\n\n"
                    f"**Suggestion:** {f['suggestion']}"
        })

    critical = sum(
        1 for f in findings
        if f["severity"] == "CRITICAL")
    event = "REQUEST_CHANGES" if critical > 0 \
        else "COMMENT"

    resp = requests.post(
        f"https://api.github.com/repos/{repo}"
        f"/pulls/{pr_number}/reviews",
        headers=headers,
        json={"body": f"AI Review: {len(findings)} "
              f"issues ({critical} critical)",
              "event": event,
              "comments": comments})
    return resp.status_code
VALIDATE: Open a PR and confirm inline comments appear on the correct files and lines. Verify CRITICAL findings trigger "Request Changes" status.
PITFALL: Posting comments on lines not in the diff causes a 422 error. Always validate line numbers against the actual diff hunks before posting.
4Create the Terraform Drift Detection Pipeline
Schedule terraform plan runs via GitHub Actions cron. Parse the plan JSON output to identify resource drift. Route drift findings to a Bedrock agent for analysis.
yaml
# .github/workflows/drift-detect.yml
name: Terraform Drift Detection
on:
  schedule:
    - cron: '0 6 * * *'  # Daily at 6 AM UTC

jobs:
  drift:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: hashicorp/setup-terraform@v3
        with:
          terraform_version: 1.6.0
      - name: Terraform Plan
        run: |
          cd infrastructure/terraform/environments/prod
          terraform init -backend-config=prod.hcl
          terraform plan -detailed-exitcode \
            -json -out=plan.json \
            > plan_output.json 2>&1 || true
      - name: Analyze Drift
        if: steps.plan.outcome == 'failure'
        run: python drift_agent.py plan_output.json
VALIDATE: Run the workflow manually and confirm it detects a known drift (e.g., manually changed security group rule).
PITFALL: Using -detailed-exitcode without handling exit code 2 (changes present) causes the workflow to fail. Use || true and check exit code separately.
5Build the IaC Remediation Agent
Agent receives Terraform drift JSON, generates corrective HCL patches, validates with terraform validate, and creates a Jira approval ticket before applying.
python
def handle_drift(plan_json: dict) -> dict:
    drift_resources = [
        r for r in plan_json.get(
            "resource_changes", [])
        if r["change"]["actions"] != ["no-op"]]

    prompt = f"""Analyze this Terraform drift:
{json.dumps(drift_resources[:5], indent=2)}

For each drifted resource, provide:
1. What changed and likely root cause
2. Corrective HCL code to fix the drift
3. Risk assessment (LOW/MEDIUM/HIGH)
4. Whether auto-apply is safe

Return JSON array."""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    remediation = json.loads(
        resp["body"].read()
    )["content"][0]["text"]
    return json.loads(remediation)
VALIDATE: Introduce a manual SG rule change, run the drift pipeline, and confirm the agent generates valid corrective HCL code.
PITFALL: Auto-applying remediation without human review can cause outages. Always require Jira approval for HIGH-risk changes.
6Wire Security Hub Findings to Auto-Remediation
Subscribe to Security Hub findings via EventBridge. For HIGH/CRITICAL findings, invoke a Bedrock agent that generates Terraform remediation snippets and opens a PR.
bash
# EventBridge rule for Security Hub
aws events put-rule \
  --name "SecurityHub-HighFindings" \
  --event-pattern '{
    "source": ["aws.securityhub"],
    "detail-type": ["Security Hub Findings"],
    "detail": {
      "findings": {
        "Severity": {"Label": ["HIGH","CRITICAL"]}
      }
    }
  }' --state ENABLED

# Lambda handler:
def handle_security_finding(event, context):
    finding = event["detail"]["findings"][0]
    resource = finding["Resources"][0]
    prompt = (
        f"Security Hub finding: "
        f"{finding['Title']}\n"
        f"Resource: {resource['Id']}\n"
        f"Severity: {finding['Severity']['Label']}\n"
        f"Generate Terraform remediation code."
    )
    remediation = invoke_bedrock(prompt)
    create_pr(remediation, finding["Id"])
    return {"statusCode": 200}
VALIDATE: Create a test Security Hub finding and confirm the Lambda triggers, generates a remediation PR, and posts to Slack.
PITFALL: Not filtering by severity floods the agent with LOW findings. Always filter to HIGH and CRITICAL only in the EventBridge pattern.
7Implement Rollback Guards and Approval Gates
Add human approval gates before any production terraform apply. Use Step Functions with a manual approval state backed by SNS + callback token.
json
# step_function_definition.json
{
  "StartAt": "PlanReview",
  "States": {
    "PlanReview": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCT:function:TfPlan",
      "Next": "WaitForApproval"
    },
    "WaitForApproval": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
      "Parameters": {
        "QueueUrl": "${ApprovalQueueUrl}",
        "MessageBody": {
          "taskToken.$": "$$.Task.Token",
          "plan.$": "$.plan_summary"
        }
      },
      "TimeoutSeconds": 86400,
      "Next": "Apply"
    },
    "Apply": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCT:function:TfApply",
      "End": true
    }
  }
}
VALIDATE: Submit a plan and confirm the Step Function pauses at WaitForApproval. Send the callback token to resume and verify apply completes.
PITFALL: Setting TimeoutSeconds too low causes legitimate approvals to expire. Use 86400 (24 hours) for production change windows.
8Build Incident-to-Patch Automation
PagerDuty webhook triggers Lambda. Agent reads CloudWatch logs, identifies root cause, generates code fix, and opens a PR automatically.
python
def handle_pagerduty_webhook(event, context):
    incident = json.loads(event["body"])
    service = incident["event"]["data"]["service"]["name"]

    # Fetch recent error logs
    logs_client = boto3.client("logs")
    resp = logs_client.filter_log_events(
        logGroupName=f"/aws/lambda/{service}",
        startTime=int((time.time() - 3600) * 1000),
        filterPattern="ERROR",
        limit=50)

    error_logs = "\n".join(
        [e["message"] for e in resp["events"]])

    prompt = (
        f"Service: {service}\n"
        f"Recent errors:\n{error_logs[:8000]}\n\n"
        f"1. Identify root cause\n"
        f"2. Generate a code fix\n"
        f"3. Explain the fix"
    )
    fix = invoke_bedrock(prompt)
    pr_url = create_pr_with_fix(service, fix)
    notify_oncall(pr_url, incident["event"]["data"])
    return {"statusCode": 200}
VALIDATE: Simulate a PagerDuty webhook and confirm the agent reads logs, generates a fix, and opens a PR within 2 minutes.
PITFALL: Reading unlimited log events causes Lambda timeout. Always limit to 50 events and filter by ERROR pattern.
9Add Blast Radius Estimation
Before applying any IaC change, estimate the blast radius by analyzing resource dependencies. Block changes affecting >10 resources without explicit approval.
python
def estimate_blast_radius(plan_json: dict) -> dict:
    changes = plan_json.get("resource_changes", [])
    affected = [r for r in changes
        if r["change"]["actions"] != ["no-op"]]

    # Check dependency graph
    destroyed = [r for r in affected
        if "delete" in r["change"]["actions"]]
    modified = [r for r in affected
        if "update" in r["change"]["actions"]]

    risk_score = (
        len(destroyed) * 3 +
        len(modified) * 1)

    return {
        "total_affected": len(affected),
        "destroyed": len(destroyed),
        "modified": len(modified),
        "risk_score": risk_score,
        "requires_approval": risk_score > 10,
        "summary": f"{len(affected)} resources "
            f"affected, {len(destroyed)} destroyed"
    }
VALIDATE: Run a plan that destroys 4 resources and confirm the blast radius score exceeds the threshold and requires_approval is True.
PITFALL: Not counting transitive dependencies underestimates blast radius. A deleted VPC cascades to all subnets, routes, and ENIs.
10Instrument CI/CD Observability
Log every agent action (review posted, drift detected, remediation generated) as structured JSON to CloudWatch. Track mean-time-to-remediation.
python
def log_agent_action(action_type, metadata):
    logger.info(json.dumps({
        "event": "devops_agent_action",
        "action_type": action_type,
        "timestamp": datetime.utcnow().isoformat(),
        "pr_number": metadata.get("pr_number"),
        "findings_count": metadata.get("findings", 0),
        "severity_breakdown": metadata.get("severity"),
        "drift_resources": metadata.get("drift_count"),
        "remediation_generated": metadata.get("remediated"),
        "latency_ms": metadata.get("latency_ms"),
        "estimated_cost_usd": metadata.get("cost")
    }))

# Usage:
log_agent_action("code_review", {
    "pr_number": 42,
    "findings": 5,
    "severity": {"CRITICAL": 1, "HIGH": 2,
                 "MEDIUM": 1, "LOW": 1},
    "latency_ms": 3200,
    "cost": 0.012
})
VALIDATE: Run 3 reviews and confirm all structured log entries appear in CloudWatch with correct fields.
PITFALL: Logging only on success hides failure patterns. Always log both successes and failures with the action_type field.
11Build Quality Gate in CI Pipeline
Run the code review agent as a required check. Block PR merge if any CRITICAL issues are found. Auto-approve if only LOW findings exist.
bash
# In the GitHub Actions workflow:
- name: Quality Gate
  run: |
    CRITICAL=$(python -c "
    import json
    f = json.load(open('findings.json'))
    print(sum(1 for x in f
        if x['severity'] == 'CRITICAL'))")
    if [ "$CRITICAL" -gt 0 ]; then
      echo "::error::CRITICAL issues found"
      exit 1
    fi

# Branch protection rule:
gh api repos/{owner}/{repo}/branches/main/protection \
  --method PUT --input - << 'EOF'
{
  "required_status_checks": {
    "strict": true,
    "contexts": ["AI Code Review / review"]
  },
  "enforce_admins": true,
  "required_pull_request_reviews": {
    "required_approving_review_count": 1
  }
}
EOF
VALIDATE: Submit a PR with a deliberate SQL injection and confirm the quality gate blocks the merge.
PITFALL: Making the review a required check without a timeout causes PRs to hang if the agent fails. Set a 10-minute job timeout.
12Deploy Production Monitoring Dashboard
Build CloudWatch dashboard: PRs reviewed/day, drift events/week, remediation success rate, mean review time, cost per review.
bash
aws cloudwatch put-dashboard \
  --dashboard-name "AgentForge-DevOps" \
  --dashboard-body '{
  "widgets": [
    {"type": "metric", "properties": {
      "title": "PRs Reviewed / Day",
      "metrics": [
        ["AgentForge/DevOps", "PRsReviewed",
         "Period", 86400]
      ]}},
    {"type": "metric", "properties": {
      "title": "Mean Review Latency (ms)",
      "metrics": [
        ["AgentForge/DevOps", "ReviewLatency",
         "Statistic", "Average"]
      ]}},
    {"type": "metric", "properties": {
      "title": "Remediation Success Rate",
      "metrics": [
        ["AgentForge/DevOps", "RemediationSuccess"]
      ]}},
    {"type": "metric", "properties": {
      "title": "Cost per Review (USD)",
      "metrics": [
        ["AgentForge/DevOps", "CostPerReview"]
      ]}}
  ]}'
VALIDATE: Open the CloudWatch dashboard and confirm all 4 widgets render with data from the last 24 hours.
PITFALL: Not setting the Period correctly causes metrics to aggregate incorrectly. Use 86400 for daily counts, 300 for latency.
Azure Implementation Path

Replace GitHub Actions with Azure Pipelines for CI/CD triggers on PRs. Code analysis via Azure DevOps AI-assisted reviews. IaC drift detection with Terraform Cloud or Azure Resource Graph queries. Security findings from Microsoft Defender for Cloud (replacing Security Hub). Remediation PRs via Azure Repos API. Approval gates via Azure Pipelines Environments with manual approval checks. Incident alerting via Azure Monitor + Logic Apps integration.

Azure Pipelines Azure DevOps Defender for Cloud Azure Resource Graph Azure Monitor Azure OpenAI
yaml
# azure-pipelines.yml
trigger:
  branches:
    include: [main]
pr:
  branches:
    include: [main]

stages:
  - stage: AIReview
    jobs:
      - job: CodeReview
        steps:
          - script: |
              python review_agent.py \
                --provider azure \
                --model gpt-4
            env:
              AZURE_OPENAI_KEY: $(AZURE_KEY)
GCP Implementation Path

Replace GitHub Actions with Cloud Build triggers on PRs. Code analysis via Gemini Code Assist. IaC drift detection using Cloud Asset Inventory + Terraform. Security findings from Security Command Center (SCC). Approval gates via Cloud Deploy approval policies. Incident alerting via Cloud Monitoring + Cloud Alerting. LLM analysis via Vertex AI (Gemini Pro).

Cloud Build Gemini Code Assist Security Command Center Cloud Asset Inventory Cloud Deploy Vertex AI (Gemini)
yaml
# cloudbuild.yaml
steps:
  - name: 'python:3.11'
    entrypoint: 'python'
    args: ['review_agent.py', '--provider', 'gcp']
    secretEnv: ['VERTEX_API_KEY']

  - name: 'hashicorp/terraform'
    args: ['plan', '-detailed-exitcode']

availableSecrets:
  secretManager:
    - versionName: projects/P/secrets/vertex-key/versions/1
      env: 'VERTEX_API_KEY'
PRODUCTION CHECKLIST — NODE 03
Estimated Lab Time: 5–7 hours (Intermediate to Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 3.1
Automated code review agent on every pull request
Build a GitHub Actions workflow that sends the PR diff to Bedrock (Claude). Parse the response for severity-tagged issues and post inline review comments via the GitHub API.
- uses: actions/checkout@v4
- run: python review_agent.py ${{ github.event.pull_request.number }}
Intermediate Exercise 3.2
Terraform drift detection + LLM-powered remediation
Run terraform plan on a schedule. When drift is detected, pass the plan JSON to a Bedrock agent that generates the corrective apply commands and creates a Jira ticket for approval.
Advanced Exercise 3.3
Full autonomous incident-to-patch pipeline
PagerDuty alert fires → agent reads CloudWatch logs → identifies root cause → generates a code fix → opens a PR → pings on-call engineer → auto-merges after approval timeout.
Stretch Exercise 3.4
Security posture agent with CSPM integration
Connect AWS Security Hub findings to a Bedrock agent. For each HIGH severity finding: generate a remediation Terraform snippet, estimate blast radius, and post to a security Slack channel.
04📊
DATA & ANALYTICS AGENTS
Intelligent analytics and RAG systems with natural language query across BI platforms — turning terabytes of structured and unstructured data into instant conversational insights.
QuickSight Q Bedrock RAG + OpenSearch Microsoft Fabric AI Power BI Copilot BigQuery + Gemini NLQ Looker AI Vertex AI Search
Architecture Pattern
RAG + NLQ PIPELINE
NL query → embed → vector search [retrieve]
Top-K chunks → re-ranker [filter]
Prompt + context → Bedrock / Gemini [generate]
SQL intent → Athena / BigQuery execute [query]
Result → chart + narrative response [output]
Key Concepts
→ Chunking strategies (512 tokens, overlapping)
→ Hybrid search: vector + BM25 keyword
→ NL-to-SQL with schema injection
→ Result confidence scoring
→ Citation and source attribution
Implementation Steps
PREREQUISITES
  • S3 bucket with 50+ documents (PDF, CSV, HTML)
  • OpenSearch Serverless collection (type: VECTORSEARCH)
  • Bedrock model access: Claude 3 Sonnet + Titan Embeddings V2
  • Glue Data Catalog with at least one database + 3 tables
  • Athena workgroup configured with S3 output location
  • Python 3.11+, boto3, opensearch-py, pandas
1Design Your Document Chunking Strategy
Analyze document types. Choose chunking: 512 tokens with 20% overlap for PDFs, row-based for CSVs. Use LangChain text splitters for consistent results.
python
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter)
import fitz  # PyMuPDF

def chunk_pdf(pdf_path: str,
              chunk_size=512, overlap=102):
    doc = fitz.open(pdf_path)
    full_text = ""
    for page in doc:
        full_text += page.get_text() + "\n"

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        length_function=lambda t: len(
            t.split()),  # token-approximate
        separators=["\n\n", "\n", ". ", " "])

    chunks = splitter.create_documents(
        [full_text],
        metadatas=[{"source": pdf_path,
                    "total_pages": len(doc)}])
    return chunks

# Process all documents
import glob
all_chunks = []
for pdf in glob.glob("s3_docs/*.pdf"):
    all_chunks.extend(chunk_pdf(pdf))
print(f"Total chunks: {len(all_chunks)}")
VALIDATE: Process 10 PDFs and confirm chunk count is reasonable (roughly doc_pages * 2–3 chunks per page). Verify no chunk exceeds 512 tokens.
PITFALL: Fixed-size chunking without overlap loses context at boundaries. Always use 15–20% overlap to preserve sentence continuity.
2Create OpenSearch Serverless Vector Collection
Create the collection, define the index mapping with knn_vector field (1024 dimensions for Titan v2), and configure network and data access policies.
json
# Create collection
aws opensearchserverless create-collection \
  --name agentforge-vectors \
  --type VECTORSEARCH \
  --description "RAG vector store"

# Create index mapping
PUT agentforge-kb-index
{
  "settings": {
    "index": {
      "knn": true,
      "knn.algo_param.ef_search": 512
    }
  },
  "mappings": {
    "properties": {
      "embedding": {
        "type": "knn_vector",
        "dimension": 1024,
        "method": {
          "engine": "faiss",
          "name": "hnsw",
          "space_type": "l2",
          "parameters": {
            "ef_construction": 512,
            "m": 16
          }
        }
      },
      "text": {"type": "text"},
      "source": {"type": "keyword"},
      "chunk_id": {"type": "keyword"},
      "metadata": {"type": "object"}
    }
  }
}
VALIDATE: Run aws opensearchserverless list-collections and confirm status is ACTIVE. Verify the index exists via the OpenSearch dashboard.
PITFALL: Using cosine similarity with unnormalized Titan embeddings produces poor results. Use L2 (Euclidean) distance for Titan Embeddings V2.
3Build the Embedding Pipeline
Process documents through Titan Embeddings V2. Batch embed chunks and index into OpenSearch. Track progress and handle rate limits with exponential backoff.
python
import boto3, json, time
from opensearchpy import OpenSearch

bedrock = boto3.client("bedrock-runtime")

def embed_text(text: str) -> list:
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": text}))
    return json.loads(
        resp["body"].read())["embedding"]

def index_chunks(chunks, os_client, index_name):
    for i, chunk in enumerate(chunks):
        vector = embed_text(chunk.page_content)
        doc = {
            "embedding": vector,
            "text": chunk.page_content,
            "source": chunk.metadata["source"],
            "chunk_id": f"chunk-{i}",
            "metadata": chunk.metadata
        }
        os_client.index(
            index=index_name,
            body=doc,
            id=f"chunk-{i}")
        if i % 50 == 0:
            print(f"Indexed {i}/{len(chunks)}")
        time.sleep(0.1)  # Rate limit guard
VALIDATE: Index 100 chunks and verify the count with GET agentforge-kb-index/_count. Confirm the count matches expected chunks.
PITFALL: Not rate-limiting embedding calls triggers Bedrock throttling at ~50 TPS. Add 100ms delay between calls or use batch mode.
4Implement Vector Search with Hybrid Retrieval
Combine kNN vector search with BM25 keyword search. Use reciprocal rank fusion (RRF) to merge results from both search methods for better recall.
python
def hybrid_search(query: str, top_k: int = 20):
    query_vec = embed_text(query)

    # Vector search (kNN)
    vector_results = os_client.search(
        index="agentforge-kb-index",
        body={"size": top_k, "query": {"knn": {
            "embedding": {
                "vector": query_vec, "k": top_k
        }}}})

    # Keyword search (BM25)
    keyword_results = os_client.search(
        index="agentforge-kb-index",
        body={"size": top_k, "query": {
            "match": {"text": query}}})

    # Reciprocal Rank Fusion
    scores = {}
    for rank, hit in enumerate(
            vector_results["hits"]["hits"]):
        doc_id = hit["_id"]
        scores[doc_id] = scores.get(doc_id, 0) \
            + 1.0 / (60 + rank)
    for rank, hit in enumerate(
            keyword_results["hits"]["hits"]):
        doc_id = hit["_id"]
        scores[doc_id] = scores.get(doc_id, 0) \
            + 1.0 / (60 + rank)

    ranked = sorted(scores.items(),
        key=lambda x: x[1], reverse=True)
    return ranked[:top_k]
VALIDATE: Search for a known document topic and confirm the hybrid results include matches from both vector and keyword search.
PITFALL: Using vector search alone misses exact keyword matches (e.g., product names, error codes). Always combine with BM25 for production RAG.
5Add Cross-Encoder Re-Ranking
After initial retrieval, re-rank top-20 results using a cross-encoder model (ms-marco-MiniLM). Return top-5 to the generation step for higher precision.
python
from sentence_transformers import CrossEncoder

reranker = CrossEncoder(
    "cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_results(query: str,
    candidates: list, top_k: int = 5):
    pairs = [(query, doc["text"])
             for doc in candidates]
    scores = reranker.predict(pairs)

    scored = list(zip(candidates, scores))
    scored.sort(key=lambda x: x[1], reverse=True)

    return [
        {**doc, "rerank_score": float(score)}
        for doc, score in scored[:top_k]
    ]

# Usage in pipeline:
raw_results = hybrid_search(user_query, top_k=20)
docs = fetch_documents(raw_results)
top_docs = rerank_results(user_query, docs, top_k=5)
VALIDATE: Run 10 queries and measure precision@5 with and without re-ranking. Confirm at least 15% improvement in relevance.
PITFALL: Loading the cross-encoder model on every Lambda invocation adds 3–5s cold start. Pre-load the model in a container or use a dedicated re-ranking service.
6Build the RAG Generation Pipeline
Assemble the prompt: system instructions + retrieved chunks with source citations + user query. Call Bedrock Claude. Always include source attribution in the response.
python
def generate_answer(query: str,
    context_docs: list) -> dict:
    context = "\n\n".join([
        f"[Source: {d['source']}, "
        f"Score: {d['rerank_score']:.2f}]\n"
        f"{d['text']}"
        for d in context_docs])

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "system": "Answer based ONLY on the "
                "provided context. Cite sources "
                "as [Source: filename]. If the "
                "context does not contain the "
                "answer, say so explicitly.",
            "messages": [{"role": "user",
                "content": f"Context:\n{context}"
                f"\n\nQuestion: {query}"}]
        }))
    body = json.loads(resp["body"].read())
    return {
        "answer": body["content"][0]["text"],
        "sources": [d["source"]
            for d in context_docs],
        "tokens_used": body["usage"]
    }
VALIDATE: Ask a question whose answer is in the corpus and confirm the response cites the correct source document.
PITFALL: Omitting the "ONLY based on context" instruction causes hallucinations in 30%+ of responses. Always constrain the LLM to retrieved context.
7Implement NL-to-SQL with Schema Injection
Pull table schemas from Glue Data Catalog. Inject into the LLM prompt. Generate SQL, validate syntax, execute via Athena, and return formatted results.
python
glue = boto3.client("glue")
athena = boto3.client("athena")

def get_schema(database: str) -> str:
    tables = glue.get_tables(
        DatabaseName=database)["TableList"]
    schema_str = ""
    for t in tables:
        cols = ", ".join(
            [f"{c['Name']} {c['Type']}"
             for c in t["StorageDescriptor"]["Columns"]])
        schema_str += f"Table: {t['Name']} ({cols})\n"
    return schema_str

def nl_to_sql(question: str, database: str) -> str:
    schema = get_schema(database)
    prompt = f"""Convert to SQL for Amazon Athena.
Schema:
{schema}

Rules: Use double quotes for identifiers.
Only SELECT queries. No DDL/DML.

Question: {question}
Return ONLY the SQL, no explanation."""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1024,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    sql = json.loads(
        resp["body"].read()
    )["content"][0]["text"].strip()
    return sql
VALIDATE: Ask "What were total sales by region last month?" and confirm the generated SQL is syntactically valid and returns correct results from Athena.
PITFALL: Not injecting the actual table schema causes the LLM to hallucinate table and column names. Always pull live schema from Glue before generating SQL.
8Add Confidence Gating and Fallback
Score retrieval confidence. If top chunk rerank_score < 0.72, return "I don’t have enough information" instead of guessing. Log all low-confidence queries for review.
python
def gated_answer(query: str) -> dict:
    raw = hybrid_search(query, top_k=20)
    docs = fetch_documents(raw)
    ranked = rerank_results(query, docs, top_k=5)

    top_score = ranked[0]["rerank_score"] \
        if ranked else 0

    if top_score < 0.72:
        log_low_confidence(query, top_score)
        return {
            "answer": "I don't have enough "
                "information in the knowledge base "
                "to answer this question accurately. "
                "Please contact support for help.",
            "confidence": top_score,
            "gated": True,
            "sources": []
        }

    return generate_answer(query, ranked)
VALIDATE: Ask a question about a topic NOT in your corpus and confirm the agent returns the fallback message, not a hallucinated answer.
PITFALL: Setting the threshold too high (>0.85) causes the agent to refuse legitimate questions. Tune the threshold on a held-out test set of 50+ queries.
9Build Citation and Source Attribution
Track which chunks were used in each answer. Return source document name, page number, and relevance score with every response.
python
def build_citations(context_docs: list,
    answer: str) -> list:
    citations = []
    for doc in context_docs:
        # Check if this source was actually cited
        source_name = doc["source"].split("/")[-1]
        if source_name.lower() in answer.lower():
            citations.append({
                "source": doc["source"],
                "chunk_id": doc["chunk_id"],
                "relevance_score":
                    doc["rerank_score"],
                "page": doc.get("metadata", {})
                    .get("page_number", "N/A"),
                "excerpt": doc["text"][:200]
            })
    return citations

# Include in response:
result = generate_answer(query, ranked)
result["citations"] = build_citations(
    ranked, result["answer"])
VALIDATE: Ask a question and confirm the response includes at least one citation with source, page, and relevance score.
PITFALL: Returning all retrieved chunks as citations regardless of whether they were used inflates citation lists. Only cite chunks actually referenced in the answer.
10Implement RAGAS Evaluation Pipeline
Evaluate faithfulness, answer relevancy, context precision, and context recall. Run on every deployment as a quality gate.
python
from ragas import evaluate
from ragas.metrics import (
    faithfulness, answer_relevancy,
    context_precision, context_recall)
from datasets import Dataset

def run_ragas_eval(test_cases: list) -> dict:
    data = {
        "question": [tc["question"]
            for tc in test_cases],
        "answer": [tc["generated_answer"]
            for tc in test_cases],
        "contexts": [tc["retrieved_contexts"]
            for tc in test_cases],
        "ground_truth": [tc["expected_answer"]
            for tc in test_cases]
    }
    dataset = Dataset.from_dict(data)
    results = evaluate(
        dataset,
        metrics=[faithfulness, answer_relevancy,
                 context_precision, context_recall])
    return {
        "faithfulness": results["faithfulness"],
        "answer_relevancy":
            results["answer_relevancy"],
        "context_precision":
            results["context_precision"],
        "context_recall":
            results["context_recall"]
    }
VALIDATE: Run RAGAS on 20 test cases and confirm faithfulness > 0.85 and context_precision > 0.80.
PITFALL: Running RAGAS without ground_truth answers makes context_recall unmeasurable. Always curate ground truth for your evaluation set.
11Add Cost Tracking and Token Optimization
Track input/output tokens per query. Implement prompt compression for long contexts. Set daily cost budgets with alerts.
python
import tiktoken

def estimate_cost(in_tokens, out_tokens,
    model="claude-3-sonnet"):
    prices = {
        "claude-3-sonnet": {
            "input": 0.000003,
            "output": 0.000015}}
    p = prices[model]
    return round(
        in_tokens * p["input"] +
        out_tokens * p["output"], 6)

def compress_context(chunks: list,
    max_tokens: int = 4000) -> list:
    """Keep top chunks within token budget"""
    enc = tiktoken.get_encoding("cl100k_base")
    selected, total = [], 0
    for chunk in chunks:
        tokens = len(enc.encode(chunk["text"]))
        if total + tokens > max_tokens:
            break
        selected.append(chunk)
        total += tokens
    return selected
VALIDATE: Run 50 queries and confirm total cost is tracked. Verify prompt compression reduces average input tokens by >20%.
PITFALL: Not setting a daily cost budget allows runaway spend on high-traffic days. Always set a CloudWatch alarm at $50/day threshold.
12Deploy Monitoring and Alerting
CloudWatch dashboard: queries/min, p50/p95 latency, faithfulness score trend, cost/query, cache hit rate. Alert on quality degradation.
bash
aws cloudwatch put-metric-alarm \
  --alarm-name "RAG-FaithfulnessDrop" \
  --metric-name "Faithfulness" \
  --namespace "AgentForge/RAG" \
  --statistic Average --period 3600 \
  --threshold 0.80 \
  --comparison-operator LessThanThreshold \
  --evaluation-periods 3 \
  --alarm-actions \
    "arn:aws:sns:us-east-1:ACCOUNT:RAGAlerts" \
  --treat-missing-data notBreaching

# Also track per-query metrics:
aws cloudwatch put-metric-data \
  --namespace "AgentForge/RAG" \
  --metric-data '[
    {"MetricName":"QueryLatency",
     "Value":1250,"Unit":"Milliseconds"},
    {"MetricName":"CostPerQuery",
     "Value":0.008,"Unit":"None"},
    {"MetricName":"RetrievalScore",
     "Value":0.87,"Unit":"None"}
  ]'
VALIDATE: Open the dashboard and confirm all widgets render. Trigger a low-faithfulness alarm and verify SNS notification.
PITFALL: Alerting only on latency misses quality degradation. Always track faithfulness and context_precision as primary quality metrics.
Azure Implementation Path

Replace OpenSearch with Azure AI Search (vector + semantic hybrid search). Embeddings via Azure OpenAI text-embedding-ada-002. NL-to-SQL targets Azure Synapse Analytics or Azure SQL with schema from Azure Purview (data catalog). BI integration via Power BI Copilot for natural language queries on dashboards. Evaluation via Azure AI Studio built-in evaluation. Re-ranking via Azure AI Search semantic ranker (built-in, no external model needed).

Azure AI Search Azure OpenAI Azure Synapse Azure Purview Power BI Copilot Microsoft Fabric
python
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery

vector_query = VectorizedQuery(
    vector=embed_query(user_question),
    k_nearest_neighbors=20,
    fields="content_vector")

results = search_client.search(
    search_text=user_question,  # BM25 hybrid
    vector_queries=[vector_query],
    query_type="semantic",       # built-in re-ranker
    semantic_configuration_name="default",
    top=5)
GCP Implementation Path

Replace OpenSearch with Vertex AI Search (managed RAG with built-in chunking, embedding, and retrieval). Embeddings via Vertex AI textembedding-gecko. NL-to-SQL targets BigQuery with schema from Dataplex (data catalog). BI via Looker with Gemini NLQ for natural language queries. Built-in evaluation via Vertex AI Evaluation. BigQuery ML for in-database embedding generation at scale.

Vertex AI Search Vertex AI Embeddings BigQuery Dataplex Looker + Gemini NLQ BigQuery ML
python
from google.cloud import discoveryengine_v1

client = discoveryengine_v1.SearchServiceClient()
request = discoveryengine_v1.SearchRequest(
    serving_config=f"projects/{PROJECT}/locations/global"
        f"/collections/default_collection"
        f"/engines/agentforge-search/servingConfigs/default",
    query=user_question,
    page_size=5,
    content_search_spec={
        "snippet_spec": {"return_snippet": True},
        "summary_spec": {
            "summary_result_count": 5,
            "include_citations": True,
            "model_spec": {"version": "gemini-1.5-flash"}
        }
    })
PRODUCTION CHECKLIST — NODE 04
Estimated Lab Time: 4–6 hours (Intermediate)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 4.1
Build a document Q&A agent with Bedrock Knowledge Bases
Upload 10 PDF documents to S3. Create a Bedrock Knowledge Base backed by OpenSearch Serverless. Build a Lambda that answers natural language questions with cited source passages.
Intermediate Exercise 4.2
NL-to-SQL agent over an Athena data lake
Build an agent that receives a business question, generates SQL using the Glue schema catalog as context, executes against Athena, and returns a formatted table + narrative summary.
SELECT * FROM "agentforge_db"."sales_events"
WHERE region = 'us-east-1' AND date > '2024-01-01'
Advanced Exercise 4.3
Hybrid RAG with re-ranking and confidence gating
Combine Titan Embeddings vector search with BM25 keyword search. Implement a cross-encoder re-ranker. Gate responses: only answer if top chunk score > 0.72, else escalate to human.
Stretch Exercise 4.4
Self-evaluating RAG with RAGAS metrics pipeline
Instrument every RAG call with RAGAS metrics (faithfulness, answer relevance, context precision). Build a CloudWatch dashboard tracking metric drift. Alert when faithfulness drops below 0.85.
05🛡️
SECURITY & GOVERNANCE AGENTS
Automated threat triage, compliance validation, and IAM policy intelligence — agents that can read security signals faster than any human SOC team and act without hesitation.
GuardDuty + LLM Security Hub AI Defender for Cloud Sentinel AI Copilot Entra ID Risk Analysis GCP SCC Chronicle AI
Architecture Pattern
THREAT TRIAGE AGENT
GuardDuty finding → EventBridge [signal]
Lambda → enrich with CloudTrail logs [context]
Bedrock agent → severity + TTPs classify [triage]
HIGH → auto-isolate resource via SSM [contain]
Report → Security Hub + Slack + Jira [report]
Key Concepts
→ MITRE ATT&CK TTP classification
→ Blast radius estimation
→ Least-privilege IAM policy generation
→ Automated containment playbooks
→ Compliance gap analysis (SOC2 / ISO 27001)
Implementation Steps
PREREQUISITES
  • GuardDuty enabled in all active regions
  • Security Hub enabled with AWS Foundational Security Best Practices standard
  • CloudTrail logging to S3 with organization trail
  • EventBridge configured in the security account
  • IAM role with SecurityAudit, GuardDuty read, SSM Automation permissions
  • Bedrock model access: Claude 3 Sonnet
  • Slack webhook URL for security alerts; Jira project for security findings
1Enable GuardDuty Multi-Region and Subscribe to EventBridge
Enable GuardDuty across all regions with a single script. Create EventBridge rules to capture all finding types and route to a centralized Lambda in the security account.
bash
#!/bin/bash
REGIONS=$(aws ec2 describe-regions \
  --query 'Regions[].RegionName' \
  --output text)

for REGION in $REGIONS; do
  echo "Enabling GuardDuty in $REGION"
  aws guardduty create-detector \
    --enable \
    --finding-publishing-frequency FIFTEEN_MINUTES \
    --region $REGION 2>/dev/null || true
done

# EventBridge rule for ALL GuardDuty findings
aws events put-rule \
  --name "GuardDuty-AllFindings" \
  --event-pattern '{
    "source": ["aws.guardduty"],
    "detail-type": [
      "GuardDuty Finding"
    ]
  }' \
  --state ENABLED

aws events put-targets \
  --rule "GuardDuty-AllFindings" \
  --targets "Id"="SecurityTriageLambda","Arn"="arn:aws:lambda:us-east-1:ACCOUNT:function:SecurityTriage"
VALIDATE: Run aws guardduty list-detectors --region us-east-1 in each region and confirm a detector exists. Verify the EventBridge rule with aws events describe-rule.
PITFALL: Enabling GuardDuty without setting finding-publishing-frequency defaults to 6 hours, delaying threat detection. Always set to FIFTEEN_MINUTES for production.
2Build the CloudTrail Enrichment Function
When a GuardDuty finding arrives, query CloudTrail for the last 50 events from the affected principal/resource. Build an enriched context document that includes both the finding and surrounding activity.
python
import boto3, json
from datetime import datetime, timedelta

cloudtrail = boto3.client("cloudtrail")

def enrich_finding(finding: dict) -> dict:
    resource = finding["resource"]
    actor = finding.get("service", {}).get(
        "action", {}).get("awsApiCallAction", {})
    principal = actor.get("remoteIpDetails", {}).get(
        "ipAddressV4", "unknown")

    # Query CloudTrail for context
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(hours=1)

    events = cloudtrail.lookup_events(
        LookupAttributes=[{
            "AttributeKey": "ResourceType",
            "AttributeValue":
                resource.get("resourceType", "")
        }],
        StartTime=start_time,
        EndTime=end_time,
        MaxResults=50
    )["Events"]

    return {
        "finding": finding,
        "cloudtrail_context": [
            {"event_name": e["EventName"],
             "event_time": str(e["EventTime"]),
             "username": e.get("Username", "N/A"),
             "source_ip": json.loads(
                 e["CloudTrailEvent"]
             ).get("sourceIPAddress", "N/A")}
            for e in events
        ],
        "enrichment_timestamp":
            datetime.utcnow().isoformat()
    }
VALIDATE: Trigger a test finding and confirm the enrichment function returns CloudTrail events from the same time window with correct principal info.
PITFALL: Querying CloudTrail without time bounds returns events from the last 90 days, causing Lambda timeout. Always scope to a 1-hour window around the finding.
3Create the MITRE ATT&CK Classification Agent
Send enriched findings to Bedrock Claude with the MITRE ATT&CK framework embedded in the system prompt. Agent classifies the finding into tactics and techniques with confidence scores.
python
def classify_mitre(enriched_finding: dict) -> dict:
    prompt = f"""You are a security analyst.
Classify this GuardDuty finding using
MITRE ATT&CK framework.

Finding: {json.dumps(
    enriched_finding["finding"], indent=2)}

CloudTrail context: {json.dumps(
    enriched_finding["cloudtrail_context"][:10],
    indent=2)}

Return JSON:
{{
  "tactic": "TA0001-TA0011 name",
  "technique": "T-number and name",
  "sub_technique": "if applicable",
  "confidence": 0.0-1.0,
  "severity_override": "LOW/MEDIUM/HIGH/CRITICAL",
  "kill_chain_phase": "recon/weaponize/deliver/exploit/install/c2/action",
  "ioc_indicators": ["list of IOCs"],
  "recommended_actions": ["list"]
}}"""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    return json.loads(
        resp["body"].read()
    )["content"][0]["text"]
VALIDATE: Send a known Brute Force finding type and confirm the agent maps it to TA0006 (Credential Access) / T1110 (Brute Force) with confidence > 0.8.
PITFALL: Sending the full finding JSON without truncation wastes tokens on irrelevant fields. Strip internal AWS metadata before sending to the LLM.
4Generate Plain-English Incident Narratives
Agent produces a structured narrative: what happened, who/what was affected, blast radius estimate, recommended immediate actions, and long-term remediation plan.
python
def generate_narrative(enriched: dict,
    mitre: dict) -> str:
    prompt = f"""Write a security incident narrative.

MITRE Classification: {json.dumps(mitre)}
Finding Details: {json.dumps(
    enriched["finding"], indent=2)[:3000]}

Structure your response as:
## What Happened
[1-2 sentence summary]

## Affected Resources
[List resources with ARNs]

## Blast Radius
[Estimate: how many other resources could
be affected if this is a real attack]

## Immediate Actions (next 30 minutes)
[Numbered list]

## Long-Term Remediation (next 7 days)
[Numbered list]

## Risk Score: [1-10]"""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    return json.loads(
        resp["body"].read()
    )["content"][0]["text"]
VALIDATE: Generate a narrative for a HIGH-severity finding and confirm it contains all 5 sections with specific resource ARNs and actionable remediation steps.
PITFALL: Generic narratives like "investigate the issue" provide no value. The prompt must require specific ARNs, time windows, and concrete next actions.
5Implement Automated Containment for HIGH Severity
For findings with severity >= 7.0: auto-isolate the EC2 instance (swap to empty SG), revoke IAM sessions, and capture memory snapshot to S3 for forensics.
python
ec2 = boto3.client("ec2")
iam = boto3.client("iam")
ssm = boto3.client("ssm")

def auto_contain(finding: dict):
    severity = finding["severity"]
    if severity < 7.0:
        return {"action": "monitor_only"}

    resource = finding["resource"]
    instance_id = resource.get(
        "instanceDetails", {}).get("instanceId")

    if not instance_id:
        return {"action": "no_instance"}

    # 1. Create isolation security group
    vpc_id = resource["instanceDetails"] \
        ["networkInterfaces"][0]["vpcId"]
    iso_sg = ec2.create_security_group(
        GroupName=f"ISOLATION-{instance_id}",
        Description="Forensic isolation - no traffic",
        VpcId=vpc_id)
    # No ingress/egress rules = fully isolated

    # 2. Swap security groups
    ec2.modify_instance_attribute(
        InstanceId=instance_id,
        Groups=[iso_sg["GroupId"]])

    # 3. Create memory snapshot
    ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        Parameters={"commands": [
            "dd if=/dev/mem of=/tmp/memdump.raw "
            "bs=1M count=1024",
            "aws s3 cp /tmp/memdump.raw "
            f"s3://forensics-bucket/{instance_id}/"
        ]})

    return {"action": "contained",
            "instance": instance_id,
            "isolation_sg": iso_sg["GroupId"]}
VALIDATE: Trigger a HIGH finding on a test instance and confirm: (1) SG swapped to isolation group, (2) memory snapshot uploaded to S3, (3) no inbound/outbound traffic possible.
PITFALL: Isolating an instance in a production ASG causes the ASG to replace it, destroying forensic evidence. Tag the instance with "forensic-hold" and suspend ASG processes first.
6Build the IAM Policy Least-Privilege Analyzer
Pull all IAM policies with Access Analyzer. Send overly-permissive policies (containing wildcards) to Bedrock. Agent generates a tightened replacement policy.
python
iam_client = boto3.client("iam")

def analyze_policies():
    paginator = iam_client.get_paginator(
        "list_policies")
    overpermissive = []

    for page in paginator.paginate(Scope="Local"):
        for policy in page["Policies"]:
            version = iam_client.get_policy_version(
                PolicyArn=policy["Arn"],
                VersionId=policy["DefaultVersionId"]
            )["PolicyVersion"]["Document"]

            # Check for wildcards
            for stmt in version.get(
                    "Statement", []):
                actions = stmt.get("Action", [])
                if isinstance(actions, str):
                    actions = [actions]
                if any("*" in a for a in actions):
                    overpermissive.append({
                        "arn": policy["Arn"],
                        "name": policy["PolicyName"],
                        "document": version
                    })
    return overpermissive

def suggest_tightened_policy(policy: dict) -> str:
    prompt = f"""This IAM policy is overly permissive:
{json.dumps(policy["document"], indent=2)}

Generate a least-privilege replacement that:
1. Replaces wildcard actions with specific actions
2. Adds resource-level constraints
3. Adds condition keys where appropriate
4. Preserves the intended functionality

Return valid IAM policy JSON only."""
    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    return json.loads(
        resp["body"].read()
    )["content"][0]["text"]
VALIDATE: Run the analyzer and confirm it identifies at least one policy with wildcards. Verify the suggested replacement is valid JSON that passes aws iam create-policy --dry-run.
PITFALL: Auto-applying tightened policies without testing breaks applications. Always deploy to a test account first and monitor for Access Denied errors for 7 days.
7Wire Multi-Channel Alert Routing
Route alerts based on severity: CRITICAL to PagerDuty + Slack + Jira, HIGH to Slack + Jira, MEDIUM to Jira only, LOW to dashboard only.
python
import requests

def route_alert(finding: dict, narrative: str,
    mitre: dict):
    severity = finding["severity"]

    if severity >= 9.0:  # CRITICAL
        send_pagerduty(finding, narrative)
        send_slack(finding, narrative, "#security-critical")
        create_jira_ticket(finding, narrative, "Critical")
    elif severity >= 7.0:  # HIGH
        send_slack(finding, narrative, "#security-alerts")
        create_jira_ticket(finding, narrative, "High")
    elif severity >= 4.0:  # MEDIUM
        create_jira_ticket(finding, narrative, "Medium")
    else:  # LOW
        log_to_dashboard(finding, narrative)

def send_slack(finding, narrative, channel):
    requests.post(os.environ["SLACK_WEBHOOK"], json={
        "channel": channel,
        "blocks": [{
            "type": "section",
            "text": {"type": "mrkdwn",
                "text": f"*Security Alert*\n"
                    f"Severity: {finding['severity']}\n"
                    f"Type: {finding['type']}\n"
                    f"{narrative[:500]}"}
        }]
    })
VALIDATE: Trigger findings at each severity level and confirm: CRITICAL reaches PagerDuty+Slack+Jira, HIGH reaches Slack+Jira, MEDIUM reaches Jira only.
PITFALL: Sending all findings to Slack causes alert fatigue. Reserve Slack for HIGH+ and use Jira for tracking MEDIUM findings that need investigation.
8Implement SOC 2 Compliance Sweep Agent
Daily scheduled Lambda runs AWS Config conformance pack evaluation against SOC 2 Type II controls. Agent compares results and generates a delta report.
python
config_client = boto3.client("config")

def run_compliance_sweep():
    # Get conformance pack results
    results = config_client \
        .get_conformance_pack_compliance_details(
            ConformancePackName=
                "agentforge-soc2-pack",
            Filters={"ComplianceType":
                "NON_COMPLIANT"})

    non_compliant = results[
        "ConformancePackRuleComplianceList"]

    # Generate delta report via Bedrock
    prompt = f"""SOC 2 Type II compliance sweep.
{len(non_compliant)} non-compliant controls found:
{json.dumps(non_compliant[:20], indent=2)}

Generate a compliance delta report with:
1. Summary of non-compliant controls
2. Risk level for each control (1-10)
3. Remediation steps for each
4. Estimated effort to fix (hours)
5. Priority ranking

Return structured JSON."""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4096,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    return json.loads(
        resp["body"].read()
    )["content"][0]["text"]
VALIDATE: Run the sweep and confirm the report lists all non-compliant controls with specific remediation steps and effort estimates.
PITFALL: Running Config rules without a conformance pack causes scattered results. Always group SOC 2 controls into a single conformance pack for unified reporting.
9Build Macie PII Detection Pipeline
Enable Macie on S3 buckets containing customer data. Route PII findings to the agent for classification and remediation recommendations.
python
macie = boto3.client("macie2")

# Enable Macie and create classification job
macie.enable_macie()

job = macie.create_classification_job(
    jobType="SCHEDULED",
    name="AgentForge-PII-Scan",
    s3JobDefinition={
        "bucketDefinitions": [{
            "accountId": os.environ["ACCOUNT_ID"],
            "buckets": [
                "customer-data-bucket",
                "analytics-raw-data"
            ]
        }]
    },
    scheduleFrequencyUpdate={
        "dailySchedule": {}
    },
    managedDataIdentifierSelector="ALL"
)

def handle_macie_finding(finding):
    prompt = f"""Macie PII finding:
Type: {finding["type"]}
Severity: {finding["severity"]["score"]}
Resource: {finding["resourcesAffected"]}
PII types: {finding["classificationDetails"]}

Recommend: 1) immediate action,
2) data handling fix, 3) prevention."""
    return invoke_bedrock(prompt)
VALIDATE: Run a Macie scan on a bucket with test PII data and confirm findings are generated for email addresses, phone numbers, and SSNs.
PITFALL: Macie charges per GB scanned. Exclude known non-sensitive prefixes (logs/, temp/) to reduce costs by 40-60%.
10Add Threat Intelligence Correlation
Enrich findings with IP reputation data from external threat intel feeds. Agent correlates across multiple findings to identify campaign patterns.
python
def correlate_threats(findings: list) -> dict:
    # Group by source IP
    ip_groups = {}
    for f in findings:
        ip = f.get("service", {}).get("action", {}) \
            .get("networkConnectionAction", {}) \
            .get("remoteIpDetails", {}) \
            .get("ipAddressV4", "unknown")
        if ip not in ip_groups:
            ip_groups[ip] = []
        ip_groups[ip].append(f)

    # Identify potential campaigns
    campaigns = []
    for ip, group in ip_groups.items():
        if len(group) >= 3:
            campaigns.append({
                "source_ip": ip,
                "finding_count": len(group),
                "finding_types": list(set(
                    f["type"] for f in group)),
                "time_span_hours":
                    calculate_time_span(group),
                "is_campaign": True
            })

    prompt = f"""Analyze these potential attack campaigns:
{json.dumps(campaigns, indent=2)}

For each campaign:
1. Classify attack type
2. Assess sophistication (1-10)
3. Predict next likely action
4. Recommend countermeasures"""
    return invoke_bedrock(prompt)
VALIDATE: Inject 5 findings from the same IP and confirm the agent identifies them as a campaign with correct attack classification.
PITFALL: Correlating only on IP misses distributed attacks. Also correlate on user-agent, time patterns, and targeted resource types.
11Implement Security Metrics Dashboard
CloudWatch dashboard: findings/day by severity, MTTR by severity, containment success rate, false positive rate, compliance score trend.
bash
aws cloudwatch put-dashboard \
  --dashboard-name "AgentForge-Security" \
  --dashboard-body '{
  "widgets": [
    {"type":"metric","properties":{
      "title":"Findings by Severity (Daily)",
      "metrics":[
        ["AgentForge/Security","Findings",
         "Severity","CRITICAL"],
        ["AgentForge/Security","Findings",
         "Severity","HIGH"],
        ["AgentForge/Security","Findings",
         "Severity","MEDIUM"]
      ],"period":86400}},
    {"type":"metric","properties":{
      "title":"Mean Time to Remediate (hours)",
      "metrics":[
        ["AgentForge/Security","MTTR",
         "Severity","CRITICAL"],
        ["AgentForge/Security","MTTR",
         "Severity","HIGH"]
      ],"stat":"Average"}},
    {"type":"metric","properties":{
      "title":"Containment Success Rate (%)",
      "metrics":[
        ["AgentForge/Security",
         "ContainmentSuccess"]
      ]}},
    {"type":"metric","properties":{
      "title":"Compliance Score Trend",
      "metrics":[
        ["AgentForge/Security",
         "SOC2ComplianceScore"]
      ],"period":86400}}
  ]}'
VALIDATE: Open the dashboard and confirm all 4 widgets render with historical data. Verify MTTR metric decreases over time as automation improves.
PITFALL: Not tracking false positive rate causes the team to distrust the agent. Always measure and report false positive rate alongside detection metrics.
12Build Automated Security Posture Reporting
Weekly agent-generated security posture report covering: new findings, resolved findings, compliance drift, top 5 risks, and recommended actions. Email to CISO stakeholders.
python
def generate_weekly_report():
    # Collect data from multiple sources
    findings = get_weekly_findings()
    compliance = get_compliance_status()
    containment = get_containment_stats()

    prompt = f"""Generate a weekly security
posture report for executive leadership.

New findings this week: {len(findings["new"])}
Resolved findings: {len(findings["resolved"])}
Open findings: {len(findings["open"])}
Compliance score: {compliance["score"]}%
Compliance drift: {compliance["drift"]}

Top findings: {json.dumps(
    findings["new"][:10], indent=2)}

Write a professional report with:
1. Executive Summary (3 sentences)
2. Key Metrics table
3. Top 5 Risks with risk scores
4. Week-over-week trend analysis
5. Recommended Actions (prioritized)
6. Compliance Status by control family"""

    report = invoke_bedrock(prompt)

    # Send via SES
    ses = boto3.client("ses")
    ses.send_email(
        Source="security@company.com",
        Destination={"ToAddresses":
            ["ciso@company.com"]},
        Message={
            "Subject": {"Data":
                "Weekly Security Posture Report"},
            "Body": {"Html": {"Data": report}}
        })
VALIDATE: Generate a test report and confirm it contains all 6 sections with specific metrics, not generic placeholders.
PITFALL: Sending raw finding data to executives overwhelms them. The report must contain an executive summary with trend lines, not a data dump.
Azure Implementation Path

Replace GuardDuty with Microsoft Defender for Cloud for threat detection across Azure resources. Security aggregation via Microsoft Sentinel (SIEM/SOAR) replacing Security Hub. Identity risk analysis via Microsoft Entra ID Protection. CloudTrail equivalent: Azure Activity Log + Azure Monitor. Auto-containment via Azure Logic Apps playbooks triggered by Sentinel. Compliance via Microsoft Defender for Cloud regulatory compliance dashboard (CIS, SOC 2, ISO 27001 built-in). IAM analysis via Entra Permissions Management.

Defender for Cloud Microsoft Sentinel Entra ID Protection Azure Monitor Logic Apps (SOAR) Entra Permissions Management
bash
az sentinel alert-rule create \
  --resource-group agentforge-security \
  --workspace-name agentforge-sentinel \
  --rule-name "HighSeverityThreat" \
  --severity High \
  --query "SecurityAlert | where AlertSeverity == 'High'"

az security assessment create \
  --name "SOC2-Compliance" \
  --status-code "Unhealthy"
GCP Implementation Path

Replace GuardDuty with Security Command Center (SCC) Premium for threat detection. SIEM via Chronicle Security Operations replacing Security Hub. Identity analysis via IAM Recommender + Policy Intelligence. Audit logging via Cloud Audit Logs. Auto-containment via Cloud Functions triggered by SCC notifications. Compliance via SCC Compliance Reports (CIS, PCI DSS, NIST). Secrets management via Secret Manager.

Security Command Center Chronicle SIEM IAM Recommender Cloud Audit Logs Policy Intelligence SCC Compliance
bash
gcloud scc notifications create agentforge-alerts \
  --pubsub-topic=projects/PROJECT/topics/scc-findings \
  --filter='severity="HIGH" OR severity="CRITICAL"'

gcloud policy-intelligence lint-policy \
  --policy-file=policy.json \
  --resource="//cloudresourcemanager.googleapis.com/projects/PROJECT"

gcloud iam recommender recommendations list \
  --project=PROJECT --location=global
PRODUCTION CHECKLIST — NODE 05
Estimated Lab Time: 6–8 hours (Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 5.1
GuardDuty finding enrichment and narrative generator
Subscribe to GuardDuty findings via EventBridge. For each finding, call Bedrock to generate a plain-English explanation with: what happened, affected resource, recommended action, and MITRE TTP mapping.
Intermediate Exercise 5.2
IAM policy least-privilege analyser
Parse IAM policies using Boto3. Send overly-permissive policies to a Bedrock agent that suggests a tighter replacement policy, explains the risk reduction, and outputs valid JSON for immediate apply.
aws iam get-policy-version --policy-arn arn:aws:iam::...
--version-id v1 | python policy_agent.py
Advanced Exercise 5.3
Automated containment: isolate compromised EC2 on HIGH finding
When GuardDuty severity ≥ 7.0, auto-remove the instance from its security group, capture a memory snapshot to S3, revoke active sessions, and page the SOC — all within 90 seconds of the alert.
Stretch Exercise 5.4
Continuous compliance agent for SOC 2 controls
Build an agent that runs daily, checks all 35 SOC 2 Type II controls across your AWS account using Config Rules, generates a compliance delta report, and opens Jira tickets for newly failed controls.
06🏥
INDUSTRY-SPECIFIC AGENTS
Domain-aware agents for healthcare, retail, media, and enterprise sales and supply chain — trained on industry vocabularies and compliant with sector-specific regulations.
HealthLake AI Amazon Personalize Nuance DAX Dynamics 365 Copilot Vertex AI Healthcare NLP
Architecture Pattern
HEALTHCARE AGENT FLOW
Clinical note input → Comprehend Medical [NER]
FHIR R4 entity extraction → HealthLake [structure]
PHI detection → Macie + redaction [comply]
Clinical reasoning → Bedrock + SNOMED [reason]
Summary → EHR system via SMART on FHIR [deliver]
Key Concepts
→ HIPAA / GDPR data handling
→ PHI detection and redaction pipelines
→ Domain-specific ontologies (SNOMED, ICD-10)
→ Personalisation with behavioral signals
→ Audit trails for regulated industries
Implementation Steps
PREREQUISITES
  • Amazon Comprehend Medical API access enabled
  • HealthLake datastore created (FHIR R4)
  • S3 bucket with 50 sample clinical notes (de-identified)
  • Amazon Personalize dataset group created
  • Macie enabled for PHI detection
  • DynamoDB table for HIPAA audit logs (PK: accessId, SK: timestamp)
  • Bedrock model access: Claude 3 Sonnet
  • SNOMED CT and ICD-10 reference files in S3
1Build the Clinical Note Ingestion Pipeline
Ingest clinical notes from S3. Use Amazon Comprehend Medical to extract medical entities including conditions, medications, procedures, and anatomical terms with confidence scores.
python
import boto3, json

comprehend_medical = boto3.client(
    "comprehendmedical")
s3 = boto3.client("s3")

def extract_medical_entities(note_text: str):
    resp = comprehend_medical.detect_entities_v2(
        Text=note_text)

    entities = {
        "conditions": [],
        "medications": [],
        "procedures": [],
        "anatomy": [],
        "test_results": []
    }
    for entity in resp["Entities"]:
        category = entity["Category"]
        entry = {
            "text": entity["Text"],
            "score": entity["Score"],
            "type": entity["Type"],
            "traits": [t["Name"]
                for t in entity.get("Traits", [])],
            "attributes": [
                {"type": a["Type"],
                 "text": a["Text"],
                 "score": a["Score"]}
                for a in entity.get("Attributes", [])
            ]
        }
        if category == "MEDICAL_CONDITION":
            entities["conditions"].append(entry)
        elif category == "MEDICATION":
            entities["medications"].append(entry)
        elif category == "PROCEDURE":
            entities["procedures"].append(entry)
        elif category == "ANATOMY":
            entities["anatomy"].append(entry)
        elif category == "TEST_TREATMENT_PROCEDURE":
            entities["test_results"].append(entry)

    return entities
VALIDATE: Process 10 sample notes and confirm entity extraction returns conditions, medications, and procedures with confidence scores > 0.8.
PITFALL: Comprehend Medical has a 20KB per-request text limit. Split long notes into sections and process each separately, then merge results.
2Implement PHI Detection and Redaction
Run each note through Comprehend Medical PHI detection. Redact all 18 HIPAA identifier categories. Store redacted and original versions separately with different access controls.
python
def detect_and_redact_phi(note_text: str):
    resp = comprehend_medical.detect_phi(
        Text=note_text)

    phi_entities = resp["Entities"]
    redacted = note_text

    # Sort by offset descending to preserve positions
    sorted_phi = sorted(phi_entities,
        key=lambda e: e["BeginOffset"],
        reverse=True)

    phi_log = []
    for entity in sorted_phi:
        start = entity["BeginOffset"]
        end = entity["EndOffset"]
        phi_type = entity["Type"]
        original = note_text[start:end]

        # Replace with type tag
        replacement = f"[{phi_type}_REDACTED]"
        redacted = (redacted[:start]
            + replacement + redacted[end:])

        phi_log.append({
            "type": phi_type,
            "score": entity["Score"],
            "offset": start,
            "length": end - start
        })

    return {
        "redacted_text": redacted,
        "phi_count": len(phi_entities),
        "phi_types": list(set(
            e["Type"] for e in phi_entities)),
        "phi_log": phi_log
    }
VALIDATE: Process a note containing a patient name, date of birth, and SSN. Confirm all three are replaced with [NAME_REDACTED], [DATE_REDACTED], [SSN_REDACTED].
PITFALL: Regex-based redaction misses 15-20% of PHI variants. Always use Comprehend Medical PHI detection as the primary method, with regex as a secondary fallback.
3Map Entities to FHIR R4 Resources
Transform extracted medical entities into FHIR R4 resources (Condition, MedicationStatement, Procedure). Store in HealthLake for standardized querying.
python
healthlake = boto3.client("healthlake")

def to_fhir_condition(entity: dict,
    patient_id: str) -> dict:
    return {
        "resourceType": "Condition",
        "subject": {
            "reference": f"Patient/{patient_id}"
        },
        "code": {
            "coding": [{
                "system":
                    "http://snomed.info/sct",
                "code": lookup_snomed(
                    entity["text"]),
                "display": entity["text"]
            }],
            "text": entity["text"]
        },
        "clinicalStatus": {
            "coding": [{
                "system": "http://terminology"
                    ".hl7.org/CodeSystem/"
                    "condition-clinical",
                "code": "active"
            }]
        },
        "verificationStatus": {
            "coding": [{
                "system": "http://terminology"
                    ".hl7.org/CodeSystem/"
                    "condition-ver-status",
                "code": "confirmed"
                    if entity["score"] > 0.9
                    else "provisional"
            }]
        }
    }

def store_in_healthlake(resource: dict,
    datastore_id: str):
    resp = healthlake.create_resource(
        DatastoreId=datastore_id,
        ResourceType=resource["resourceType"],
        ResourceBody=json.dumps(resource))
    return resp["ResourceId"]
VALIDATE: Create a FHIR Condition resource from an extracted entity and confirm it validates against the FHIR R4 schema. Verify storage in HealthLake.
PITFALL: Mapping entity text directly to SNOMED codes without validation produces invalid codes. Always validate against the SNOMED CT reference before storing.
4Build the Clinical Reasoning Agent
Send redacted context + FHIR data to Bedrock with SNOMED/ICD-10 ontology grounding. Agent generates clinical summaries with coded references that clinicians can verify.
python
def clinical_summary(patient_id: str,
    redacted_note: str,
    fhir_conditions: list) -> str:
    conditions_text = json.dumps(
        fhir_conditions[:10], indent=2)

    prompt = f"""You are a clinical documentation
assistant. Generate a structured clinical summary.

Redacted clinical note:
{redacted_note[:4000]}

FHIR Conditions on file:
{conditions_text}

Requirements:
1. Summarize in 3-5 sentences
2. Reference SNOMED codes for all conditions
3. List active medications with dosages
4. Note any drug interactions
5. Flag conditions needing follow-up
6. Use professional clinical language
7. NEVER attempt to diagnose - only summarize

Format as structured clinical note."""

    resp = bedrock.invoke_model(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 2048,
            "messages": [{"role": "user",
                          "content": prompt}]}))
    return json.loads(
        resp["body"].read()
    )["content"][0]["text"]
VALIDATE: Generate a summary for a note with 3 conditions and 2 medications. Confirm SNOMED codes are valid and drug interactions are flagged.
PITFALL: Allowing the agent to diagnose creates liability. The prompt must explicitly state "summarize only, never diagnose" and the output must carry a disclaimer.
5Implement HIPAA Audit Logging
Log every data access: who accessed what, when, and why. Store in DynamoDB with CloudTrail integration. Generate audit reports on demand for compliance review.
python
import uuid
from datetime import datetime

audit_table = dynamodb.Table("HIPAAAuditLog")

def log_access(user_id: str, resource_type: str,
    resource_id: str, action: str,
    purpose: str, phi_accessed: bool):
    audit_table.put_item(Item={
        "accessId": str(uuid.uuid4()),
        "timestamp": datetime.utcnow().isoformat(),
        "userId": user_id,
        "resourceType": resource_type,
        "resourceId": resource_id,
        "action": action,
        "purpose": purpose,
        "phiAccessed": phi_accessed,
        "sourceIp": get_client_ip(),
        "sessionId": get_session_id(),
        "ttl": int((datetime.utcnow()
            + timedelta(days=2555)).timestamp())
    })

# Decorator for automatic audit logging
def hipaa_audited(resource_type):
    def decorator(func):
        def wrapper(*args, **kwargs):
            log_access(
                get_current_user(),
                resource_type,
                kwargs.get("resource_id", "N/A"),
                func.__name__,
                kwargs.get("purpose", "clinical"),
                True)
            return func(*args, **kwargs)
        return wrapper
    return decorator

@hipaa_audited("Condition")
def get_patient_conditions(resource_id,
    purpose="treatment"):
    return healthlake.read_resource(...)
VALIDATE: Access 5 patient records and confirm 5 audit log entries appear in DynamoDB with correct user, resource, and timestamp. Verify TTL is set to 7 years.
PITFALL: Setting TTL below 6 years violates HIPAA retention requirements. Always set TTL to at least 2555 days (7 years) for healthcare audit logs.
6Build the Retail Personalization Agent
Train Amazon Personalize on clickstream data. Wire recommendation scores into a Bedrock agent that generates personalized product narratives for each user.
python
personalize = boto3.client("personalize")
personalize_runtime = boto3.client(
    "personalize-runtime")

# Get recommendations
def get_recommendations(user_id: str,
    num_results: int = 5) -> list:
    resp = personalize_runtime \
        .get_recommendations(
            campaignArn=os.environ[
                "PERSONALIZE_CAMPAIGN_ARN"],
            userId=user_id,
            numResults=num_results)
    return resp["itemList"]

# Generate narrative with Bedrock
def personalized_narrative(user_id: str,
    user_context: dict) -> str:
    recs = get_recommendations(user_id)
    items = [get_product_details(r["itemId"])
             for r in recs]

    prompt = f"""Generate a personalized product
recommendation for this user.

User profile: {json.dumps(user_context)}
Recommended products:
{json.dumps(items, indent=2)}

Write a warm, conversational 3-paragraph
recommendation that:
1. References the user's recent activity
2. Explains why each product fits
3. Suggests a bundle deal if applicable"""

    return invoke_bedrock(prompt)
VALIDATE: Generate recommendations for 3 different user profiles and confirm each narrative references user-specific activity and different product sets.
PITFALL: Exposing raw Personalize scores to the user breaks trust. Always translate scores into natural language explanations ("because you recently viewed...").
7Create the Supply Chain Exception Monitor
Build a Kinesis stream for supply chain events. Agent monitors for disruptions, classifies severity, and suggests alternatives from a Knowledge Base.
python
kinesis = boto3.client("kinesis")

def process_supply_chain_event(record):
    event = json.loads(record["Data"])
    event_type = event["type"]

    severity_map = {
        "shipment_delayed": "MEDIUM",
        "supplier_outage": "HIGH",
        "port_closure": "CRITICAL",
        "quality_recall": "CRITICAL",
        "price_spike": "LOW"
    }

    severity = severity_map.get(
        event_type, "MEDIUM")

    if severity in ("HIGH", "CRITICAL"):
        prompt = f"""Supply chain disruption:
Type: {event_type}
Supplier: {event["supplier"]}
Product: {event["product"]}
Region: {event["region"]}
ETA Impact: {event["eta_impact_days"]} days

From the knowledge base, suggest:
1. Alternative suppliers for this product
2. Estimated lead time from each alternative
3. Price impact estimate
4. Risk of switching suppliers"""

        analysis = invoke_bedrock(prompt)
        send_alert(severity, event, analysis)

    return {"severity": severity,
            "processed": True}
VALIDATE: Inject a "supplier_outage" event and confirm the agent identifies alternative suppliers from the Knowledge Base with lead time estimates.
PITFALL: Processing every Kinesis record synchronously causes throughput bottleneck. Use batch processing with a window of 100 records or 60 seconds.
8Implement Domain Ontology Grounding
Load SNOMED CT and ICD-10 mappings from S3. Agent validates all medical codes in generated summaries. Reject outputs containing invalid codes.
python
import csv

def load_snomed_index(s3_path: str) -> dict:
    """Load SNOMED CT concept file into lookup"""
    obj = s3.get_object(
        Bucket="ontology-bucket",
        Key=s3_path)
    reader = csv.DictReader(
        obj["Body"].read().decode().splitlines(),
        delimiter="\t")
    return {row["conceptId"]: row["term"]
            for row in reader
            if row["active"] == "1"}

SNOMED_INDEX = load_snomed_index(
    "snomed/sct2_Description.txt")

def validate_medical_codes(summary: str) -> dict:
    import re
    codes = re.findall(
        r'SNOMED:\s*(\d+)', summary)
    icd_codes = re.findall(
        r'ICD-10:\s*([A-Z]\d{2}\.?\d*)',
        summary)

    invalid_snomed = [c for c in codes
        if c not in SNOMED_INDEX]
    valid_snomed = [c for c in codes
        if c in SNOMED_INDEX]

    return {
        "valid_snomed": len(valid_snomed),
        "invalid_snomed": invalid_snomed,
        "icd_codes_found": len(icd_codes),
        "all_valid": len(invalid_snomed) == 0
    }
VALIDATE: Generate a summary with 5 SNOMED codes and confirm all 5 are validated against the reference. Inject an invalid code and confirm it is flagged.
PITFALL: Caching the full SNOMED CT file in Lambda memory requires 2GB+. Use a DynamoDB lookup table with only active concepts for Lambda deployments.
9Build the Patient Summary Generator
Agent generates one-page patient summaries from FHIR data: active conditions, current medications, recent procedures, allergies, and care plan.
python
def generate_patient_summary(patient_id: str):
    # Fetch all FHIR resources for patient
    conditions = healthlake_query(
        f"Condition?subject=Patient/{patient_id}"
        f"&clinical-status=active")
    meds = healthlake_query(
        f"MedicationStatement?"
        f"subject=Patient/{patient_id}"
        f"&status=active")
    allergies = healthlake_query(
        f"AllergyIntolerance?"
        f"subject=Patient/{patient_id}")
    procedures = healthlake_query(
        f"Procedure?subject=Patient/{patient_id}"
        f"&date=ge2024-01-01")

    prompt = f"""Generate a one-page patient summary.

Active Conditions: {json.dumps(conditions[:10])}
Current Medications: {json.dumps(meds[:10])}
Allergies: {json.dumps(allergies)}
Recent Procedures: {json.dumps(procedures[:5])}

Format as:
PATIENT SUMMARY
===============
Active Problems: [bulleted list with SNOMED]
Medications: [name, dose, frequency]
Allergies: [substance, reaction, severity]
Recent Procedures: [date, type, outcome]
Care Plan: [recommended next steps]

Include drug-allergy interaction warnings."""

    summary = invoke_bedrock(prompt)
    log_access(get_current_user(), "Patient",
        patient_id, "generate_summary",
        "treatment", True)
    return summary
VALIDATE: Generate a summary for a patient with 3 conditions, 2 meds, and 1 allergy. Confirm drug-allergy interactions are flagged if present.
PITFALL: Generating summaries without checking drug-allergy interactions misses critical safety signals. Always include interaction checks in the prompt.
10Add Regulatory Compliance Validation
Automated checks against HIPAA Security Rule §164.312: access controls, audit controls, integrity controls, and transmission security.
python
HIPAA_CONTROLS = {
    "164.312(a)(1)": {
        "name": "Access Control",
        "checks": [
            ("unique_user_ids",
             "Every user has unique ID"),
            ("emergency_access",
             "Emergency access procedure exists"),
            ("auto_logoff",
             "Session timeout <= 15 minutes"),
            ("encryption_at_rest",
             "PHI encrypted at rest with AES-256")
        ]
    },
    "164.312(b)": {
        "name": "Audit Controls",
        "checks": [
            ("audit_logging",
             "All PHI access logged"),
            ("audit_retention",
             "Logs retained >= 6 years"),
            ("audit_review",
             "Weekly audit log review")
        ]
    },
    "164.312(c)(1)": {
        "name": "Integrity",
        "checks": [
            ("data_integrity",
             "PHI integrity verified on access"),
            ("tamper_detection",
             "Unauthorized changes detected")
        ]
    }
}

def run_hipaa_validation() -> dict:
    results = {}
    for control_id, control in \
            HIPAA_CONTROLS.items():
        checks = []
        for check_id, desc in control["checks"]:
            passed = run_check(check_id)
            checks.append({
                "check": check_id,
                "description": desc,
                "passed": passed})
        results[control_id] = {
            "name": control["name"],
            "checks": checks,
            "compliant": all(
                c["passed"] for c in checks)
        }
    return results
VALIDATE: Run the validation and confirm it checks all HIPAA §164.312 sub-controls. Verify non-compliant controls are flagged with specific remediation.
PITFALL: Checking compliance only at deployment time misses configuration drift. Run HIPAA validation daily and alert on any newly failed controls.
11Implement Cross-Industry Template System
Parameterize the agent for healthcare/retail/supply chain. Single codebase with industry-specific configs, ontologies, and compliance rules.
python
INDUSTRY_CONFIGS = {
    "healthcare": {
        "ontology": "snomed_ct",
        "compliance": "hipaa",
        "phi_detection": True,
        "audit_retention_days": 2555,
        "entity_extractor": "comprehend_medical",
        "output_format": "fhir_r4"
    },
    "retail": {
        "ontology": None,
        "compliance": "pci_dss",
        "phi_detection": False,
        "audit_retention_days": 365,
        "entity_extractor": "comprehend",
        "output_format": "json"
    },
    "supply_chain": {
        "ontology": "gs1",
        "compliance": "sox",
        "phi_detection": False,
        "audit_retention_days": 2555,
        "entity_extractor": "comprehend",
        "output_format": "json"
    }
}

class IndustryAgent:
    def __init__(self, industry: str):
        self.config = INDUSTRY_CONFIGS[industry]
        self.industry = industry

    def process(self, input_data: dict):
        if self.config["phi_detection"]:
            input_data = self.redact_phi(input_data)
        entities = self.extract_entities(input_data)
        self.audit_log(input_data, entities)
        return self.generate_output(entities)
VALIDATE: Instantiate the agent for each of the 3 industries and confirm each uses the correct ontology, compliance framework, and audit retention.
PITFALL: Hard-coding industry logic in if/else branches makes adding new industries expensive. Always use a config-driven pattern for industry switching.
12Deploy Production Monitoring
Dashboard: notes processed/day, PHI detection accuracy, FHIR mapping success rate, audit log completeness, compliance score by control family.
bash
aws cloudwatch put-dashboard \
  --dashboard-name "AgentForge-Industry" \
  --dashboard-body '{
  "widgets": [
    {"type":"metric","properties":{
      "title":"Notes Processed / Day",
      "metrics":[
        ["AgentForge/Industry",
         "NotesProcessed"]
      ],"period":86400}},
    {"type":"metric","properties":{
      "title":"PHI Detection Rate",
      "metrics":[
        ["AgentForge/Industry",
         "PHIDetected"],
        ["AgentForge/Industry",
         "PHIMissed"]
      ]}},
    {"type":"metric","properties":{
      "title":"FHIR Mapping Success %",
      "metrics":[
        ["AgentForge/Industry",
         "FHIRMappingSuccess"]
      ]}},
    {"type":"metric","properties":{
      "title":"HIPAA Compliance Score",
      "metrics":[
        ["AgentForge/Industry",
         "ComplianceScore"]
      ],"period":86400}}
  ]}'
VALIDATE: Open the dashboard and confirm all widgets render. Verify PHI detection rate is > 98% and FHIR mapping success is > 95%.
PITFALL: Not tracking PHI detection misses (false negatives) creates compliance risk. Always measure both detection rate AND miss rate with manual spot-checks.
Azure Implementation Path

Replace Comprehend Medical with Azure AI Health Text Analytics for medical NER and entity linking. FHIR storage via Azure Health Data Services (FHIR R4 server). Clinical documentation via Nuance DAX Copilot for ambient listening and note generation. PHI detection via Azure AI Language PII detection (healthcare category). Retail personalization via Azure AI Personalizer. Supply chain via Dynamics 365 Supply Chain Management + Copilot. HIPAA compliance managed through Azure Compliance Manager.

Health Text Analytics Azure Health Data Services Nuance DAX Copilot Azure AI Personalizer Dynamics 365 SCM Compliance Manager
python
from azure.ai.textanalytics import TextAnalyticsClient

client = TextAnalyticsClient(
    endpoint="https://agentforge.cognitiveservices.azure.com",
    credential=credential)

result = client.begin_analyze_healthcare_entities(
    [clinical_note_text])
for entity in result[0].entities:
    print(f"{entity.text}: {entity.category} "
          f"({entity.confidence_score:.2f})")
    for link in entity.data_sources:
        print(f"  -> {link.entity_id} ({link.name})")
GCP Implementation Path

Replace Comprehend Medical with Vertex AI Healthcare NLP API (Healthcare Natural Language API) for medical entity extraction and relationship detection. FHIR storage via Cloud Healthcare API (FHIR R4 store). Clinical reasoning via Vertex AI (Gemini Pro) with Med-PaLM grounding. PHI detection via Cloud DLP with healthcare infoTypes. Retail personalization via Vertex AI Recommendations. Supply chain via Supply Chain Twin on GCP.

Healthcare NLP API Cloud Healthcare API Vertex AI (Med-PaLM) Cloud DLP Vertex AI Recommendations
python
from google.cloud import healthcare_v1

client = healthcare_v1.FhirServiceClient()
fhir_store = (f"projects/{PROJECT}/locations/{LOCATION}"
    f"/datasets/{DATASET}/fhirStores/{FHIR_STORE}")

# Create FHIR Condition resource
response = client.create_resource(
    parent=fhir_store,
    type="Condition",
    body=json.dumps(fhir_condition))

# NLP analysis via Healthcare NLP
nlp_client = healthcare_v1.CloudHealthcareNlpServiceClient()
result = nlp_client.analyze_entities(
    nlp_service=f"projects/{PROJECT}/locations/{LOCATION}"
        f"/services/nlp",
    document_content=clinical_note_text)
PRODUCTION CHECKLIST — NODE 06
Estimated Lab Time: 6–8 hours (Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 6.1
Clinical note summariser with PHI redaction
Use Amazon Comprehend Medical to extract medical entities from 20 sample notes. Before sending to Bedrock, redact all PHI categories. Generate a one-paragraph clinical summary per note.
Intermediate Exercise 6.2
Retail personalisation agent with Amazon Personalize
Train a Personalize model on synthetic clickstream data. Wire it into a Bedrock agent that generates a personalised product narrative for each user, blending recommendation scores with LLM storytelling.
Advanced Exercise 6.3
Supply chain exception agent with real-time alerting
Simulate a supply chain event stream. Build an agent that monitors for disruptions, classifies severity, suggests alternative suppliers using a Bedrock Knowledge Base, and sends alerts with ETA impact analysis.
Stretch Exercise 6.4
HIPAA-compliant audit agent with HealthLake FHIR queries
Build a Bedrock agent that queries HealthLake FHIR R4 resources using natural language. Implement a full HIPAA audit log for every access. Validate against the HIPAA Security Rule §164.312 checklist.
07🔧
INFRASTRUCTURE & ORCHESTRATION AGENTS
Autonomous cloud ops, auto-healing clusters, FinOps optimization, and cross-cloud governance — agents that keep infrastructure running, lean, and compliant without human paging.
EventBridge Ops Agents Auto-Healing EKS Cost Explorer + LLM Azure Automanage AI AutoOps for GKE FinOps Copilot
Architecture Pattern
AUTO-HEALING FLOW
CloudWatch alarm → EventBridge rule [detect]
Lambda → fetch metrics + pod logs [gather]
Bedrock agent → root cause classify [diagnose]
Remediation: restart / scale / rollback [heal]
Post-incident report → Confluence [document]
Key Concepts
→ Prometheus alert → agent runbook execution
→ EKS pod failure classification + kubectl fix
→ Cost anomaly detection + rightsizing
→ Reserved Instance purchasing agent
→ Cross-cloud cost normalization
Implementation Steps
PREREQUISITES
  • CloudWatch configured with detailed monitoring on target resources
  • EventBridge rules enabled in the ops account
  • EKS cluster with Prometheus + AlertManager deployed
  • IAM role with SSM Automation, EC2, EKS, Cost Explorer permissions
  • AWS FIS experiment templates created for chaos testing
  • Bedrock model access: Claude 3 Sonnet
  • Slack webhook for ops alerts; Confluence API token for post-incident docs
1Set Up CloudWatch Alarms for Infrastructure Metrics
Create composite alarms that trigger on CPU, memory, error rate, and disk I/O thresholds. Use anomaly detection bands rather than static thresholds for dynamic workloads.
bash
# Create anomaly detection alarm
aws cloudwatch put-metric-alarm \
  --alarm-name "AgentForge-CPU-Anomaly" \
  --metrics '[
    {"Id":"m1","MetricStat":{
      "Metric":{"Namespace":"AWS/EC2",
        "MetricName":"CPUUtilization",
        "Dimensions":[{"Name":"AutoScalingGroupName",
          "Value":"agentforge-asg"}]},
      "Period":300,"Stat":"Average"}},
    {"Id":"ad1","Expression":
      "ANOMALY_DETECTION_BAND(m1, 2)"}
  ]' \
  --comparison-operator \
    LessThanLowerOrGreaterThanUpperThreshold \
  --threshold-metric-id "ad1" \
  --evaluation-periods 3 \
  --alarm-actions \
    "arn:aws:sns:us-east-1:ACCOUNT:OpsAlerts" \
  --treat-missing-data notBreaching

# Composite alarm: CPU AND error rate
aws cloudwatch put-composite-alarm \
  --alarm-name "AgentForge-Critical-Composite" \
  --alarm-rule 'ALARM("AgentForge-CPU-Anomaly") AND ALARM("AgentForge-ErrorRate-High")'
VALIDATE: Trigger a CPU spike on a test instance and confirm the anomaly detection alarm fires within 15 minutes.
PITFALL: Static threshold alarms (e.g., CPU > 80%) generate false positives on autoscaling events. Use ANOMALY_DETECTION_BAND for dynamic workloads.
2Create EventBridge Rule to Route Alarm Payloads
Route CloudWatch alarm state changes to a Lambda handler that enriches the alarm with instance metadata and recent metrics before sending to the Bedrock agent.
python
# EventBridge rule
aws events put-rule \
  --name "CW-Alarm-To-Agent" \
  --event-pattern '{
    "source": ["aws.cloudwatch"],
    "detail-type": ["CloudWatch Alarm State Change"],
    "detail": {
      "state": {"value": ["ALARM"]}
    }
  }' --state ENABLED

# Lambda enrichment handler
def enrich_alarm(event, context):
    alarm = event["detail"]
    alarm_name = alarm["alarmName"]
    metric = alarm["configuration"]["metrics"][0]

    # Fetch recent metric data
    cw = boto3.client("cloudwatch")
    data = cw.get_metric_data(
        MetricDataQueries=[{
            "Id": "m1",
            "MetricStat": metric["metricStat"],
            "ReturnData": True
        }],
        StartTime=datetime.utcnow()
            - timedelta(hours=1),
        EndTime=datetime.utcnow())

    return {
        "alarm_name": alarm_name,
        "current_state": alarm["state"]["value"],
        "reason": alarm["state"]["reason"],
        "metric_values": data["MetricDataResults"],
        "timestamp": alarm["state"]["timestamp"]
    }
VALIDATE: Trigger an alarm and confirm the Lambda receives the event, enriches it with 1 hour of metric history, and passes it to the agent.
PITFALL: Not enriching alarms with metric history forces the agent to make decisions without trend data. Always include at least 1 hour of metric values.
3Build the Runbook Selection Agent
Lambda passes enriched alarm context to a Bedrock agent that selects the appropriate SSM Automation runbook based on the alarm type, affected resource, and historical pattern.
python
RUNBOOKS = {
    "high_cpu": "AWS-RestartEC2Instance",
    "high_memory": "Custom-IncreaseASGCapacity",
    "disk_full": "Custom-CleanupDiskSpace",
    "error_rate": "Custom-RollbackDeployment",
    "oom_killed": "Custom-PatchK8sMemoryLimits",
    "connection_timeout": "Custom-RestartService"
}

def select_and_execute_runbook(enriched_alarm):
    prompt = f"""You are an SRE agent. Analyze this
alarm and select the best remediation runbook.

Alarm: {json.dumps(enriched_alarm, indent=2)}

Available runbooks:
{json.dumps(RUNBOOKS, indent=2)}

Return JSON:
{{"runbook": "runbook_name",
  "parameters": {{}},
  "confidence": 0.0-1.0,
  "reasoning": "why this runbook",
  "requires_approval": true/false}}"""

    decision = invoke_bedrock(prompt)
    parsed = json.loads(decision)

    if parsed["confidence"] > 0.85 \
        and not parsed["requires_approval"]:
        execute_runbook(
            parsed["runbook"],
            parsed["parameters"])
    else:
        request_approval(parsed)

    return parsed
VALIDATE: Trigger a high_cpu alarm and confirm the agent selects AWS-RestartEC2Instance with confidence > 0.85 and executes the runbook.
PITFALL: Auto-executing runbooks without a confidence threshold causes wrong remediation. Always require confidence > 0.85 for auto-execution and route lower confidence to human approval.
4Build the FinOps Cost Anomaly Agent
Pull Cost Explorer API data daily. Agent identifies services with cost spikes >20% month-over-month, recommends rightsizing actions, and estimates monthly savings.
python
ce = boto3.client("ce")

def analyze_costs():
    now = datetime.utcnow()
    current_month = now.strftime("%Y-%m-01")
    prev_month = (now - timedelta(days=30)) \
        .strftime("%Y-%m-01")

    resp = ce.get_cost_and_usage(
        TimePeriod={
            "Start": prev_month,
            "End": current_month},
        Granularity="MONTHLY",
        Metrics=["BlendedCost"],
        GroupBy=[{"Type": "DIMENSION",
                  "Key": "SERVICE"}])

    # Compare with previous month
    current = resp["ResultsByTime"][-1]["Groups"]
    previous = resp["ResultsByTime"][0]["Groups"]

    spikes = []
    for svc in current:
        name = svc["Keys"][0]
        curr_cost = float(
            svc["Metrics"]["BlendedCost"]["Amount"])
        prev_svc = next(
            (p for p in previous
             if p["Keys"][0] == name), None)
        prev_cost = float(
            prev_svc["Metrics"]["BlendedCost"]
            ["Amount"]) if prev_svc else 0

        if prev_cost > 0:
            change_pct = ((curr_cost - prev_cost)
                / prev_cost) * 100
            if change_pct > 20:
                spikes.append({
                    "service": name,
                    "current": curr_cost,
                    "previous": prev_cost,
                    "change_pct": round(change_pct, 1)
                })

    if spikes:
        return get_rightsizing_recs(spikes)
    return {"status": "no_anomalies"}
VALIDATE: Confirm the agent detects a >20% MoM spike on a test service and generates rightsizing recommendations with estimated monthly savings in USD.
PITFALL: Comparing only BlendedCost misses RI/SP coverage changes. Also check UnblendedCost and AmortizedCost for accurate analysis.
5Generate Rightsizing Recommendations
Agent analyzes EC2 instance utilization, RDS instance metrics, and Lambda concurrency to recommend instance type changes, reserved instances, and savings plans.
python
def get_rightsizing_recs(spikes):
    # Fetch EC2 rightsizing from Cost Explorer
    rs = ce.get_rightsizing_recommendation(
        Service="AmazonEC2",
        Configuration={
            "RecommendationTarget": "SAME_INSTANCE_FAMILY",
            "BenefitsConsidered": True})

    prompt = f"""You are a FinOps analyst.

Cost spikes detected:
{json.dumps(spikes, indent=2)}

AWS Rightsizing recommendations:
{json.dumps(rs["RightsizingRecommendations"][:10],
    indent=2)}

For each spike:
1. Root cause analysis
2. Specific rightsizing action
3. Estimated monthly savings (USD)
4. Implementation risk (LOW/MEDIUM/HIGH)
5. Recommended timeline

Also suggest:
- Reserved Instance opportunities
- Savings Plan coverage gaps
- Spot Instance candidates"""

    return invoke_bedrock(prompt)
VALIDATE: Run analysis on an account with at least 5 EC2 instances and confirm savings recommendations total > $0 with specific instance type changes.
PITFALL: Recommending Spot for stateful workloads causes data loss. Always classify workloads as stateful/stateless before recommending Spot instances.
6Build EKS Auto-Healing for OOMKilled Pods
Monitor EKS for OOMKilled events via Prometheus AlertManager webhook. Route to a Bedrock agent that reads the deployment manifest, calculates safe memory limits, and patches the deployment.
python
import subprocess, json, yaml

def handle_oom_alert(alert):
    namespace = alert["labels"]["namespace"]
    pod = alert["labels"]["pod"]
    container = alert["labels"]["container"]

    # Get current deployment manifest
    deploy_name = pod.rsplit("-", 2)[0]
    manifest = json.loads(subprocess.run(
        ["kubectl", "get", "deployment",
         deploy_name, "-n", namespace,
         "-o", "json"],
        capture_output=True, text=True).stdout)

    # Get actual memory usage from Prometheus
    current_limit = get_container_limit(
        manifest, container)

    prompt = f"""EKS pod OOMKilled.
Deployment: {deploy_name}
Container: {container}
Current memory limit: {current_limit}
Pod restarts (24h): {alert["annotations"]["restarts"]}

Calculate a safe memory limit:
1. Current limit + 50% headroom
2. Never exceed node capacity
3. Consider other pods on the node

Return JSON:
{{"new_limit": "512Mi",
  "new_request": "256Mi",
  "reasoning": "..."}}"""

    rec = json.loads(invoke_bedrock(prompt))

    # Patch the deployment
    patch = {
        "spec": {"template": {"spec": {
            "containers": [{
                "name": container,
                "resources": {
                    "limits": {
                        "memory": rec["new_limit"]},
                    "requests": {
                        "memory": rec["new_request"]}
                }}]}}}
    }
    subprocess.run([
        "kubectl", "patch", "deployment",
        deploy_name, "-n", namespace,
        "--type", "strategic",
        "-p", json.dumps(patch)])
    return rec
VALIDATE: Trigger an OOMKilled event on a test pod and confirm the agent patches the deployment with a higher memory limit and the pod restarts successfully.
PITFALL: Setting memory limits without also setting requests causes the Kubernetes scheduler to overcommit nodes. Always set both limits and requests.
7Add Chaos Engineering with AWS FIS
Use AWS Fault Injection Service to inject faults on a schedule. Build a companion Bedrock agent that monitors the system during each experiment and triggers compensating actions.
python
fis = boto3.client("fis")

def run_chaos_experiment(template_id: str):
    # Start FIS experiment
    experiment = fis.start_experiment(
        experimentTemplateId=template_id,
        tags={"agentforge": "chaos-test"})
    exp_id = experiment["experiment"]["id"]

    # Monitor during experiment
    import time
    for i in range(12):  # 60 seconds
        time.sleep(5)
        status = fis.get_experiment(
            id=exp_id)["experiment"]["state"]

        # Check system health
        health = check_system_health()
        if not health["healthy"]:
            prompt = f"""Chaos experiment running.
Experiment: {template_id}
System health: {json.dumps(health)}
Duration: {i * 5} seconds

The system is degraded. Recommend:
1. Should we stop the experiment?
2. What compensating action is needed?
3. Is this an expected degradation?

Return JSON with action recommendation."""

            decision = json.loads(
                invoke_bedrock(prompt))
            if decision.get("stop_experiment"):
                fis.stop_experiment(id=exp_id)
                execute_compensation(
                    decision["compensating_action"])
                break

    return get_experiment_report(exp_id)
VALIDATE: Run a CPU stress experiment and confirm the agent detects degradation, evaluates whether to stop, and triggers compensating actions when thresholds are breached.
PITFALL: Running chaos experiments without a kill switch risks prolonged outages. Always implement automatic experiment termination after 60 seconds of sustained degradation.
8Generate Post-Incident Reports
After any auto-remediation or chaos experiment, the agent generates a structured post-incident report and pushes it to Confluence.
python
def generate_post_incident(incident: dict):
    prompt = f"""Generate a post-incident report.

Incident: {json.dumps(incident, indent=2)}

Format:
# Post-Incident Report
## Summary
[1-2 sentences]
## Timeline
[Chronological events with timestamps]
## Root Cause
[Technical root cause analysis]
## Impact
[Users/services affected, duration]
## Resolution
[What fixed it, who was involved]
## Action Items
[Numbered list with owners and deadlines]
## Lessons Learned
[What went well, what didn't]"""

    report = invoke_bedrock(prompt)

    # Push to Confluence
    requests.post(
        f"{os.environ['CONFLUENCE_URL']}"
        f"/rest/api/content",
        headers={
            "Authorization":
                f"Bearer {os.environ['CONFLUENCE_TOKEN']}",
            "Content-Type": "application/json"},
        json={
            "type": "page",
            "title": f"Incident Report - "
                f"{incident['id']} - "
                f"{datetime.utcnow().strftime('%Y-%m-%d')}",
            "space": {"key": "INCIDENTS"},
            "body": {"storage": {
                "value": report,
                "representation": "wiki"
            }}
        })
    return report
VALIDATE: Trigger an incident, auto-remediate, and confirm a post-incident report appears in Confluence with all 7 sections populated.
PITFALL: Generating reports immediately after incident may miss follow-up actions. Schedule report generation 30 minutes after resolution to capture the full timeline.
9Build Cross-Cloud Cost Normalization
Normalize cost data across AWS, Azure, and GCP into a unified schema. Agent compares equivalent services across clouds and recommends the cheapest option.
python
def normalize_costs():
    # AWS Cost Explorer
    aws_costs = ce.get_cost_and_usage(...)

    # Azure Cost Management (via REST API)
    azure_costs = requests.get(
        f"https://management.azure.com/"
        f"subscriptions/{SUB_ID}/providers/"
        f"Microsoft.CostManagement/query",
        headers={"Authorization":
            f"Bearer {azure_token}"},
        json={"type": "ActualCost",
              "timeframe": "MonthToDate",
              "dataset": {"granularity": "Monthly",
                  "aggregation": {
                      "totalCost": {
                          "name": "Cost",
                          "function": "Sum"}
                  }}}).json()

    # Unified schema
    unified = {
        "period": "2025-03",
        "clouds": {
            "aws": {"total": aws_total,
                "by_service": aws_breakdown},
            "azure": {"total": azure_total,
                "by_service": azure_breakdown},
            "gcp": {"total": gcp_total,
                "by_service": gcp_breakdown}
        },
        "equivalent_services": [
            {"category": "compute",
             "aws": "EC2", "azure": "VMs",
             "gcp": "GCE",
             "cheapest": "calculate..."}
        ]
    }
    return unified
VALIDATE: Fetch costs from at least 2 clouds and confirm the normalized output uses the same units (USD/month) and maps equivalent services correctly.
PITFALL: Comparing list prices across clouds ignores negotiated discounts and commitments. Always use actual billed costs, not list prices, for cross-cloud comparison.
10Implement Reserved Instance Purchasing Agent
Agent analyzes 90 days of usage patterns, identifies stable workloads, and recommends RI purchases with break-even analysis.
python
def analyze_ri_opportunities():
    # Get RI recommendations from Cost Explorer
    ri_recs = ce.get_reservation_purchase_recommendation(
        Service="Amazon Elastic Compute Cloud - Compute",
        LookbackPeriodInDays="NINETY_DAYS",
        TermInYears="ONE_YEAR",
        PaymentOption="NO_UPFRONT")

    prompt = f"""Analyze these RI purchase opportunities:
{json.dumps(ri_recs["Recommendations"][:10], indent=2)}

For each recommendation:
1. Break-even point (months)
2. Monthly savings vs On-Demand
3. Annual savings
4. Risk assessment (what if usage drops?)
5. Recommendation: BUY/SKIP/WAIT

Also consider:
- Savings Plans as alternative to RIs
- Convertible vs Standard RIs
- Payment option trade-offs

Return structured JSON with BUY recommendations
sorted by annual savings descending."""

    return invoke_bedrock(prompt)
VALIDATE: Run the analysis and confirm at least one RI recommendation includes break-even month, annual savings, and risk assessment.
PITFALL: Recommending 3-year All Upfront RIs without usage stability analysis locks in spend on potentially declining workloads. Default to 1-year No Upfront for first-time RI buyers.
11Build Automated Scaling Policy Tuner
Agent analyzes historical scaling events, identifies over/under-provisioning patterns, and recommends adjusted scaling policies.
python
autoscaling = boto3.client("autoscaling")

def tune_scaling_policies(asg_name: str):
    # Get scaling history
    activities = autoscaling \
        .describe_scaling_activities(
            AutoScalingGroupName=asg_name,
            MaxRecords=100)

    # Get current policies
    policies = autoscaling \
        .describe_policies(
            AutoScalingGroupName=asg_name)

    # Get CloudWatch metrics for the ASG
    metrics = get_asg_metrics(asg_name, days=14)

    prompt = f"""Analyze this Auto Scaling Group.

ASG: {asg_name}
Current policies:
{json.dumps(policies["ScalingPolicies"], indent=2)}

Scaling events (last 100):
{json.dumps([{
    "time": str(a["StartTime"]),
    "cause": a["Cause"][:100],
    "status": a["StatusCode"]}
    for a in activities["Activities"][:20]],
    indent=2)}

14-day metrics summary:
{json.dumps(metrics)}

Recommend policy adjustments:
1. Optimal min/max/desired capacity
2. Scale-out threshold and cooldown
3. Scale-in threshold and cooldown
4. Predictive scaling opportunity
5. Estimated cost impact of changes"""

    return invoke_bedrock(prompt)
VALIDATE: Run the tuner on an ASG with at least 7 days of history. Confirm recommendations include specific threshold changes with estimated cost impact.
PITFALL: Reducing scale-in cooldown too aggressively causes instance flapping. Never recommend cooldown below 300 seconds without 30+ days of stable data.
12Deploy Operations Dashboard and Alerting
Unified ops dashboard: alarms/day, auto-remediation success rate, MTTR, cost savings from rightsizing, chaos experiment results, scaling efficiency.
bash
aws cloudwatch put-dashboard \
  --dashboard-name "AgentForge-InfraOps" \
  --dashboard-body '{
  "widgets": [
    {"type":"metric","properties":{
      "title":"Auto-Remediation Success Rate",
      "metrics":[
        ["AgentForge/Ops","RemediationSuccess"],
        ["AgentForge/Ops","RemediationFailed"]
      ]}},
    {"type":"metric","properties":{
      "title":"Mean Time to Remediate (min)",
      "metrics":[
        ["AgentForge/Ops","MTTR"]
      ],"stat":"Average"}},
    {"type":"metric","properties":{
      "title":"Monthly Cost Savings (USD)",
      "metrics":[
        ["AgentForge/Ops","CostSavings"]
      ],"period":2592000}},
    {"type":"metric","properties":{
      "title":"Chaos Experiment Pass Rate",
      "metrics":[
        ["AgentForge/Ops","ChaosPassRate"]
      ]}}
  ]}'

aws cloudwatch put-metric-alarm \
  --alarm-name "Ops-MTTR-SLA-Breach" \
  --metric-name "MTTR" \
  --namespace "AgentForge/Ops" \
  --statistic Average --period 3600 \
  --threshold 30 \
  --comparison-operator GreaterThanThreshold \
  --evaluation-periods 2 \
  --alarm-actions \
    "arn:aws:sns:us-east-1:ACCOUNT:OpsAlerts"
VALIDATE: Open the dashboard and confirm all widgets render. Trigger an MTTR SLA breach alarm and verify SNS notification arrives.
PITFALL: Tracking only success rate hides increasing MTTR. Always track both success rate AND mean time to remediate as primary SLIs.
Azure Implementation Path

Replace CloudWatch with Azure Monitor + Azure Automanage for infrastructure monitoring and auto-remediation. Alarms via Azure Monitor Alerts with dynamic thresholds. Event routing via Event GridAzure Functions. Runbook execution via Azure Automation Runbooks (PowerShell/Python). FinOps via Azure Cost Management + Azure Advisor for rightsizing. EKS equivalent: AKS with Azure Monitor for containers. Chaos engineering via Azure Chaos Studio. Post-incident reports via Azure DevOps Wiki.

Azure Monitor Azure Automanage Azure Automation Cost Management Azure Advisor Chaos Studio AKS
bash
az monitor metrics alert create \
  --name "CPU-Anomaly" \
  --resource-group agentforge-rg \
  --scopes "/subscriptions/SUB_ID/resourceGroups/agentforge-rg" \
  --condition "avg Percentage CPU > dynamic HighSensitivity" \
  --action "/subscriptions/SUB_ID/resourceGroups/agentforge-rg/providers/microsoft.insights/actionGroups/OpsTeam"

az advisor recommendation list \
  --category Cost \
  --output table
GCP Implementation Path

Replace CloudWatch with Cloud Monitoring + Cloud Operations Suite. Alarms via Cloud Alerting with MQL queries. Event routing via EventarcCloud Functions. Runbook execution via Cloud Workflows + OS Config for VM patching. FinOps via Cloud Billing API + Recommender for rightsizing. EKS equivalent: GKE with GKE AutoOps (auto-repair, auto-upgrade). Chaos engineering via Chaos Toolkit on GKE. Cost anomaly detection via Cloud Billing Budget Alerts.

Cloud Monitoring Cloud Alerting GKE AutoOps Cloud Billing Recommender Cloud Workflows Eventarc
bash
gcloud monitoring policies create \
  --policy-from-file=cpu-anomaly-policy.json

gcloud recommender recommendations list \
  --project=PROJECT \
  --recommender=google.compute.instance.MachineTypeRecommender \
  --location=us-central1 \
  --format="table(name, priority, content.overview)"

gcloud container clusters update agentforge-gke \
  --enable-autorepair --enable-autoupgrade \
  --location=us-central1
PRODUCTION CHECKLIST — NODE 07
Estimated Lab Time: 5–7 hours (Intermediate to Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 7.1
CloudWatch alarm → LLM-generated runbook executor
Create a Lambda that receives CloudWatch alarm payloads. Pass the alarm context to a Bedrock agent that selects and executes the right Systems Manager Automation document to remediate.
Intermediate Exercise 7.2
FinOps agent: cost anomaly detection + rightsizing
Pull Cost Explorer data daily. Build a Bedrock agent that identifies services with cost spikes >20% MoM, recommends rightsizing actions, and estimates monthly savings for each recommendation.
aws ce get-cost-and-usage \
  --time-period Start=2025-01-01,End=2025-02-01 \
  --granularity MONTHLY --metrics BlendedCost
Advanced Exercise 7.3
EKS self-healing: OOMKilled pod detection and resolution
Monitor EKS for OOMKilled events via a Prometheus + AlertManager webhook. Route to a Bedrock agent that reads the deployment manifest, calculates safe memory limits, patches the deployment, and confirms pod recovery.
Stretch Exercise 7.4
Chaos engineering agent: inject and auto-recover faults
Use AWS FIS to inject faults on a schedule. Build a companion Bedrock agent that monitors the system during each experiment, detects when recovery thresholds are missed, and triggers compensating actions autonomously.
08🌐
CROSS-CLOUD & AGNOSTIC PATTERNS
Framework-level orchestration patterns that operate across any cloud provider — portable agent architectures built on open standards that deploy equally well on AWS, Azure, or GCP.
LangChain LangGraph AutoGen CrewAI Self-Healing DevSecOps Bedrock (backend) Azure OpenAI (backend) Vertex AI (backend)
Architecture Pattern
MULTI-AGENT GRAPH
Supervisor node → route by intent [LangGraph]
Specialist agents: Research / Code / Write [CrewAI]
Shared state graph → conditional edges [state]
Any LLM backend via provider abstraction [agnostic]
Deploy: Docker → ECS / AKS / Cloud Run [portable]
Key Concepts
→ LangGraph state machines with typed nodes
→ CrewAI role-based agent teams
→ AutoGen conversation patterns
→ Provider abstraction via LiteLLM
→ Containerised deployment on any cloud
Implementation Steps
PREREQUISITES
  • Python 3.11+, pip install langchain langgraph crewai autogen-agentchat litellm
  • Docker Desktop installed and running
  • Terraform 1.6+ with AWS, Azure, and GCP provider credentials
  • Bedrock, Azure OpenAI, and Vertex AI API keys configured
  • GitHub repo for the multi-agent project
  • Container registry access (ECR, ACR, or GCR)
1Build a LangGraph StateGraph with Supervisor Routing
Create a typed state graph with a Supervisor node that analyzes user intent and routes to specialist sub-agents. Use conditional edges for dynamic routing based on the Supervisor decision.
python
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class AgentState(TypedDict):
    messages: list
    next_agent: str
    final_answer: str

def supervisor(state: AgentState) -> AgentState:
    last_msg = state["messages"][-1]["content"]
    # Route based on intent classification
    prompt = f"""Classify this request into one of:
- research: needs information retrieval
- code: needs code generation
- write: needs content writing
Route: {last_msg}
Return ONLY the category name."""

    route = llm.invoke(prompt).content.strip()
    return {**state, "next_agent": route}

def route_fn(state: AgentState) -> Literal[
        "research", "code", "write", "end"]:
    return state["next_agent"]

graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("research", research_agent)
graph.add_node("code", code_agent)
graph.add_node("write", write_agent)

graph.set_entry_point("supervisor")
graph.add_conditional_edges(
    "supervisor", route_fn,
    {"research": "research",
     "code": "code",
     "write": "write",
     "end": END})

# All agents return to supervisor for review
for agent in ["research", "code", "write"]:
    graph.add_edge(agent, "supervisor")

app = graph.compile()
VALIDATE: Send 5 mixed prompts (2 research, 2 code, 1 write) and confirm the Supervisor routes each to the correct specialist agent.
PITFALL: Not adding a loop guard causes infinite supervisor-agent cycles. Add a max_iterations counter (default: 5) and route to END when exceeded.
2Define Specialist Sub-Agents with Tools
Build each specialist agent with domain-specific tools. ResearchAgent uses RAG retrieval, CodeAgent has file I/O tools, WriterAgent has formatting tools.
python
from langchain.tools import tool
from langchain_community.chat_models import (
    ChatLiteLLM)

llm = ChatLiteLLM(model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0")

@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for information."""
    results = vector_store.similarity_search(
        query, k=5)
    return "\n\n".join(
        [r.page_content for r in results])

@tool
def generate_code(spec: str) -> str:
    """Generate code based on a specification."""
    resp = llm.invoke(
        f"Generate production Python code for: "
        f"{spec}\nInclude type hints and docstrings.")
    return resp.content

@tool
def write_document(outline: str) -> str:
    """Write a structured document from an outline."""
    resp = llm.invoke(
        f"Write a professional document: {outline}")
    return resp.content

def research_agent(state: AgentState):
    result = search_knowledge_base.invoke(
        state["messages"][-1]["content"])
    state["messages"].append(
        {"role": "assistant",
         "content": f"Research result: {result}"})
    return state
VALIDATE: Invoke each agent independently with a test prompt. Confirm ResearchAgent returns KB results, CodeAgent returns valid Python, WriterAgent returns formatted prose.
PITFALL: Giving all agents access to all tools causes tool confusion. Each specialist should only see its own domain-specific tools.
3Implement LiteLLM Provider Abstraction
Use LiteLLM as the provider abstraction layer. Configure fallback from Bedrock to Azure OpenAI to Vertex AI. This allows swapping backends without changing agent code.
python
import litellm

# Configure provider routing
litellm.set_verbose = False

PROVIDER_CONFIG = {
    "primary": {
        "model": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
        "timeout": 30,
        "max_retries": 2
    },
    "fallback_1": {
        "model": "azure/gpt-4",
        "api_key": os.environ["AZURE_API_KEY"],
        "api_base": os.environ["AZURE_ENDPOINT"],
        "timeout": 30
    },
    "fallback_2": {
        "model": "vertex_ai/gemini-pro",
        "timeout": 30
    }
}

def invoke_with_fallback(messages: list) -> str:
    for name, config in PROVIDER_CONFIG.items():
        try:
            resp = litellm.completion(
                model=config["model"],
                messages=messages,
                timeout=config.get("timeout", 30),
                max_retries=config.get(
                    "max_retries", 1))
            return resp.choices[0].message.content
        except Exception as e:
            print(f"{name} failed: {e}")
            continue
    raise RuntimeError("All providers failed")
VALIDATE: Deliberately block the primary provider (invalid key) and confirm the request falls through to fallback_1 within 30 seconds.
PITFALL: Not setting per-provider timeouts causes cascading failures. Set a 30-second timeout on each provider to ensure fast failover.
4Create a CrewAI Team with Role-Based Agents
Define a Product Manager, Market Researcher, Copywriter, and Technical Writer crew. Assign specific roles, goals, and backstories that constrain each agent’s behavior.
python
from crewai import Agent, Task, Crew, Process

pm = Agent(
    role="Product Manager",
    goal="Define product requirements and "
         "prioritize features based on market data",
    backstory="10 years PM experience at "
              "enterprise SaaS companies",
    llm=llm, verbose=True, max_iter=5)

researcher = Agent(
    role="Market Researcher",
    goal="Gather competitive intelligence and "
         "market sizing data",
    backstory="Former Gartner analyst specializing "
              "in AI/ML markets",
    llm=llm, verbose=True, max_iter=5)

copywriter = Agent(
    role="Copywriter",
    goal="Write compelling product narratives "
         "and marketing copy",
    backstory="Award-winning B2B SaaS copywriter",
    llm=llm, verbose=True, max_iter=3)

tech_writer = Agent(
    role="Technical Writer",
    goal="Create clear technical documentation "
         "and architecture guides",
    backstory="Senior tech writer with cloud "
              "architecture background",
    llm=llm, verbose=True, max_iter=3)

task = Task(
    description="Create a product launch brief "
        "for a new AI monitoring SaaS product",
    expected_output="Complete launch brief with "
        "market analysis, positioning, copy, "
        "and technical overview",
    agents=[pm, researcher, copywriter, tech_writer])

crew = Crew(
    agents=[pm, researcher, copywriter, tech_writer],
    tasks=[task],
    process=Process.sequential,
    verbose=True)

result = crew.kickoff()
VALIDATE: Run the crew and confirm: (1) all 4 agents contribute, (2) the final output contains sections from each agent, (3) total iterations < 20.
PITFALL: Not setting max_iter on agents causes runaway conversations. Always cap individual agent iterations (3-5) and total crew iterations (20).
5Build an AutoGen Group Chat
Create a SecurityAuditor + CodeFixer + DeploymentManager + HumanProxy group chat. Agents debate and collaborate on security fixes with a human approval gate.
python
from autogen import (
    AssistantAgent, UserProxyAgent, GroupChat,
    GroupChatManager)

config_list = [{
    "model": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
    "api_type": "litellm"
}]

security_auditor = AssistantAgent(
    name="SecurityAuditor",
    system_message="You are a security expert. "
        "Scan code for vulnerabilities, "
        "classify severity, and recommend fixes.",
    llm_config={"config_list": config_list})

code_fixer = AssistantAgent(
    name="CodeFixer",
    system_message="You fix security issues. "
        "Generate patched code with explanations. "
        "Always preserve existing functionality.",
    llm_config={"config_list": config_list})

deployer = AssistantAgent(
    name="DeploymentManager",
    system_message="You manage deployments. "
        "Rebuild Docker images, run tests, "
        "and deploy fixes.",
    llm_config={"config_list": config_list})

human = UserProxyAgent(
    name="HumanProxy",
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=0,
    code_execution_config=False)

groupchat = GroupChat(
    agents=[security_auditor, code_fixer,
            deployer, human],
    messages=[], max_round=12)

manager = GroupChatManager(
    groupchat=groupchat,
    llm_config={"config_list": config_list})

human.initiate_chat(manager,
    message="Scan app.py for vulnerabilities "
            "and fix any CRITICAL issues found.")
VALIDATE: Inject a known SQL injection vulnerability and confirm: SecurityAuditor finds it, CodeFixer generates a patch, DeploymentManager proposes a rebuild, HumanProxy gets approval prompt.
PITFALL: AutoGen group chats without max_round spiral into infinite loops. Always set max_round (8-12) and implement a termination condition.
6Containerize with Multi-Stage Docker Build
Write a production Dockerfile with multi-stage build. First stage installs dependencies, second stage copies only runtime artifacts. Keep final image under 500MB.
dockerfile
# Dockerfile
FROM python:3.11-slim AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir \
    --prefix=/install -r requirements.txt

FROM python:3.11-slim AS runtime
WORKDIR /app

# Copy only installed packages
COPY --from=builder /install /usr/local

# Copy application code
COPY src/ ./src/
COPY config/ ./config/

# Non-root user for security
RUN useradd -r -s /bin/false agentuser
USER agentuser

# Health check
HEALTHCHECK --interval=30s --timeout=5s \
    CMD python -c "import requests; \
    requests.get('http://localhost:8080/health')"

ENV PYTHONUNBUFFERED=1
EXPOSE 8080

CMD ["python", "-m", "src.main"]
VALIDATE: Build the image and confirm: (1) final image < 500MB, (2) runs as non-root user, (3) health check passes, (4) no dev dependencies included.
PITFALL: Using a single-stage build includes pip, compilers, and dev headers in the final image, inflating it to 2GB+. Always use multi-stage builds.
7Deploy to 3 Clouds with Terraform Modules
Write reusable Terraform modules for ECS (AWS), AKS (Azure), and Cloud Run (GCP). Deploy the same container image to all three clouds.
hcl
# modules/ecs/main.tf
resource "aws_ecs_service" "agent" {
  name            = "agentforge-agent"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.agent.arn
  desired_count   = 2
  launch_type     = "FARGATE"

  network_configuration {
    subnets         = var.private_subnets
    security_groups = [aws_security_group.agent.id]
  }
}

# modules/cloud-run/main.tf
resource "google_cloud_run_service" "agent" {
  name     = "agentforge-agent"
  location = var.region

  template {
    spec {
      containers {
        image = var.container_image
        resources {
          limits = {
            memory = "512Mi"
            cpu    = "1"
          }
        }
        env {
          name  = "LLM_PROVIDER"
          value = "vertex_ai"
        }
      }
    }
  }
}

# Deploy all three:
# terraform apply -target=module.ecs
# terraform apply -target=module.aks
# terraform apply -target=module.cloud_run
VALIDATE: Deploy to all 3 clouds and confirm the health check endpoint returns 200 on each. Verify the container runs the same image SHA across all deployments.
PITFALL: Hard-coding cloud-specific configs in the container image breaks portability. Always pass cloud-specific settings (LLM_PROVIDER, API endpoints) via environment variables.
8Route Test Prompts Across Providers
Build a test harness that sends identical prompts to all 3 LLM backends. Compare latency, cost, and response quality metrics to identify the optimal provider for each use case.
python
import time, json

PROVIDERS = {
    "bedrock": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
    "azure": "azure/gpt-4",
    "vertex": "vertex_ai/gemini-pro"
}

TEST_PROMPTS = [
    "Summarize the key features of Kubernetes",
    "Write a Python function to parse JSON logs",
    "Explain the CAP theorem in 3 sentences",
]

def benchmark_providers():
    results = []
    for prompt in TEST_PROMPTS:
        for name, model in PROVIDERS.items():
            start = time.time()
            try:
                resp = litellm.completion(
                    model=model,
                    messages=[{"role": "user",
                               "content": prompt}],
                    max_tokens=500)
                latency = (time.time() - start) * 1000
                results.append({
                    "provider": name,
                    "prompt": prompt[:50],
                    "latency_ms": round(latency),
                    "tokens_in": resp.usage.prompt_tokens,
                    "tokens_out": resp.usage.completion_tokens,
                    "cost": litellm.completion_cost(
                        completion_response=resp),
                    "success": True
                })
            except Exception as e:
                results.append({
                    "provider": name,
                    "prompt": prompt[:50],
                    "error": str(e),
                    "success": False
                })
    return results
VALIDATE: Run the benchmark with 10 prompts across 3 providers. Generate a comparison table showing p50/p95 latency, cost per 1K tokens, and success rate.
PITFALL: Benchmarking on a single prompt type biases results. Include at least 3 categories (summarization, code generation, Q&A) for realistic comparison.
9Implement Shared State and Memory Across Agents
Build a shared memory layer using Redis that all agents can read/write. This enables agents to share context, avoid duplicate work, and build on each other’s outputs.
python
import redis, json
from datetime import datetime

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=6379, decode_responses=True)

class SharedMemory:
    def __init__(self, session_id: str):
        self.session_id = session_id
        self.prefix = f"agent:{session_id}"

    def write(self, agent_name: str,
              key: str, value: any):
        r.hset(f"{self.prefix}:{agent_name}",
            key, json.dumps(value))
        r.expire(f"{self.prefix}:{agent_name}",
            3600)  # 1 hour TTL

    def read(self, agent_name: str,
             key: str) -> any:
        val = r.hget(
            f"{self.prefix}:{agent_name}", key)
        return json.loads(val) if val else None

    def read_all_agents(self) -> dict:
        """Read state from all agents"""
        keys = r.keys(f"{self.prefix}:*")
        state = {}
        for k in keys:
            agent = k.split(":")[-1]
            state[agent] = {
                f: json.loads(v)
                for f, v in r.hgetall(k).items()}
        return state

# Usage in agent:
mem = SharedMemory("session-123")
mem.write("researcher", "findings",
    {"topic": "AI monitoring", "key_points": [...]})
# Other agents can read:
findings = mem.read("researcher", "findings")
VALIDATE: Have Agent A write to shared memory and Agent B read it. Confirm data integrity and TTL expiration after 1 hour.
PITFALL: Using in-memory Python dicts for shared state breaks in distributed deployments. Always use an external store (Redis, DynamoDB) for multi-agent state.
10Add Observability Across All Agents
Instrument every agent invocation with OpenTelemetry traces. Track: agent name, tool calls, LLM provider, tokens, latency, and cost per invocation.
python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    SimpleSpanProcessor)
from opentelemetry.exporter.otlp.proto.grpc \
    .trace_exporter import OTLPSpanExporter

provider = TracerProvider()
provider.add_span_processor(
    SimpleSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agentforge")

def traced_agent_call(agent_name, func):
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
                f"agent.{agent_name}") as span:
            span.set_attribute(
                "agent.name", agent_name)
            span.set_attribute(
                "agent.provider",
                get_current_provider())
            start = time.time()
            result = func(*args, **kwargs)
            span.set_attribute(
                "agent.latency_ms",
                (time.time() - start) * 1000)
            span.set_attribute(
                "agent.tokens",
                result.get("tokens_used", 0))
            span.set_attribute(
                "agent.cost_usd",
                result.get("cost", 0))
            return result
    return wrapper
VALIDATE: Run a multi-agent workflow and confirm OpenTelemetry traces show the full call graph with correct parent-child span relationships.
PITFALL: Tracing only the top-level request misses inter-agent communication. Propagate trace context through shared memory so all agent spans link to the same trace.
11Build a Cross-Cloud Deployment Pipeline
CI/CD pipeline that builds once, pushes to 3 registries (ECR, ACR, GCR), and deploys to all 3 clouds with Terraform.
yaml
# .github/workflows/deploy-multicloud.yml
name: Multi-Cloud Deploy
on:
  push:
    branches: [main]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Build Docker image
        run: |
          docker build -t agentforge-agent:$GITHUB_SHA .
      - name: Push to ECR
        run: |
          aws ecr get-login-password | docker login \
            --username AWS --password-stdin $ECR_URI
          docker tag agentforge-agent:$GITHUB_SHA \
            $ECR_URI:$GITHUB_SHA
          docker push $ECR_URI:$GITHUB_SHA
      - name: Push to GCR
        run: |
          gcloud auth configure-docker
          docker tag agentforge-agent:$GITHUB_SHA \
            gcr.io/$GCP_PROJECT/agentforge:$GITHUB_SHA
          docker push gcr.io/$GCP_PROJECT/agentforge:$GITHUB_SHA

  deploy:
    needs: build
    strategy:
      matrix:
        cloud: [aws, azure, gcp]
    runs-on: ubuntu-latest
    steps:
      - uses: hashicorp/setup-terraform@v3
      - run: |
          cd infrastructure/terraform/${{ matrix.cloud }}
          terraform init
          terraform apply -auto-approve \
            -var="image_tag=$GITHUB_SHA"
VALIDATE: Push a commit and confirm the pipeline: (1) builds one image, (2) pushes to 3 registries, (3) deploys to all 3 clouds with the same SHA.
PITFALL: Using -auto-approve in production without a plan review step is dangerous. Add a manual approval gate for production deployments.
12Implement Cost and Quality Comparison Dashboard
Build a unified dashboard comparing all 3 clouds: latency, cost per query, quality scores, uptime, and error rates. Use this data to optimize provider routing.
python
# Emit cross-cloud metrics
def emit_comparison_metrics(results: list):
    for r in results:
        cw.put_metric_data(
            Namespace="AgentForge/CrossCloud",
            MetricData=[
                {"MetricName": "Latency",
                 "Value": r["latency_ms"],
                 "Unit": "Milliseconds",
                 "Dimensions": [
                     {"Name": "Provider",
                      "Value": r["provider"]}]},
                {"MetricName": "CostPerQuery",
                 "Value": r["cost"],
                 "Unit": "None",
                 "Dimensions": [
                     {"Name": "Provider",
                      "Value": r["provider"]}]},
                {"MetricName": "SuccessRate",
                 "Value": 1 if r["success"] else 0,
                 "Unit": "None",
                 "Dimensions": [
                     {"Name": "Provider",
                      "Value": r["provider"]}]}
            ])

# Smart routing based on metrics
def smart_route(task_type: str) -> str:
    """Route to cheapest provider meeting SLA"""
    for provider in ["bedrock", "azure", "vertex"]:
        p95 = get_p95_latency(provider)
        success = get_success_rate(provider)
        if p95 < 5000 and success > 0.99:
            return provider
    return "bedrock"  # default fallback
VALIDATE: Run 100 queries across 3 providers and confirm the dashboard shows per-provider latency, cost, and success rate. Verify smart routing selects the optimal provider.
PITFALL: Routing purely on cost ignores quality differences between providers. Always include quality score (evaluated on a test set) as a routing factor alongside cost and latency.
Azure-Specific Deployment

Deploy the containerized multi-agent system to Azure Kubernetes Service (AKS) or Azure Container Apps (serverless). LLM backend via Azure OpenAI Service with managed GPT-4 deployments. Vector storage via Azure AI Search. Shared state via Azure Cache for Redis. Observability via Azure Monitor Application Insights with distributed tracing. CI/CD via Azure PipelinesACRAKS.

AKS Container Apps Azure OpenAI Azure Cache for Redis Application Insights ACR
hcl
resource "azurerm_kubernetes_cluster" "agent" {
  name                = "agentforge-aks"
  location            = var.location
  resource_group_name = var.resource_group
  dns_prefix          = "agentforge"

  default_node_pool {
    name       = "default"
    node_count = 2
    vm_size    = "Standard_D2s_v3"
  }
  identity { type = "SystemAssigned" }
}
GCP-Specific Deployment

Deploy to Cloud Run (serverless, scales to zero) or GKE Autopilot (managed Kubernetes). LLM backend via Vertex AI with Gemini Pro model endpoints. Vector storage via Vertex AI Vector Search (formerly Matching Engine). Shared state via Memorystore for Redis. Observability via Cloud Trace + Cloud Monitoring. CI/CD via Cloud BuildArtifact RegistryCloud Run.

Cloud Run GKE Autopilot Vertex AI Memorystore Redis Cloud Trace Artifact Registry
hcl
resource "google_cloud_run_v2_service" "agent" {
  name     = "agentforge-agent"
  location = var.region

  template {
    containers {
      image = "${var.artifact_registry}/agentforge:${var.image_tag}"
      resources {
        limits = { memory = "512Mi", cpu = "1" }
      }
      env { name = "LLM_PROVIDER" value = "vertex_ai" }
    }
    scaling { min_instance_count = 0  max_instance_count = 10 }
  }
}
PRODUCTION CHECKLIST — NODE 08
Estimated Lab Time: 5–7 hours (Intermediate to Advanced)
Reference Docs & Node Links
Lab Exercises
Beginner Exercise 8.1
LangGraph supervisor routing three specialist agents
Build a LangGraph graph with a Supervisor node that routes to one of three sub-agents: ResearchAgent (RAG), CodeAgent (code generation), WriterAgent (copywriting). Test with 5 mixed prompts.
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor_fn)
graph.add_conditional_edges("supervisor", route_fn)
Intermediate Exercise 8.2
CrewAI product launch team: 4 agents, one goal
Define a Product Manager, Market Researcher, Copywriter, and Technical Writer crew. Give them a single goal: "Launch brief for a new AI monitoring SaaS." Observe inter-agent communication and final deliverable.
Advanced Exercise 8.3
Cross-cloud agent: same code, three LLM backends
Build one agent using LiteLLM as the provider abstraction. Deploy to three clouds using Docker + Terraform. Route 100 test prompts: 33 to Bedrock, 33 to Azure OpenAI, 34 to Vertex AI. Compare cost and latency.
Stretch Exercise 8.4
Self-healing DevSecOps pipeline with AutoGen
Build an AutoGen group chat: SecurityAuditor, CodeFixer, DeploymentManager, and HumanProxy agents. Inject a CVE into a Docker image. Watch the agents debate, fix, rebuild, and re-scan — with human approval gate.
METHODOLOGY
HOW AGENTFORGE WORKS
01
Concept First
Every lab starts with a one-paragraph mental model. You understand why before you touch a line of code. No cargo-culting patterns you can't explain.
02
Build From Scratch
Start with raw SDKs and cloud APIs. Only reach for frameworks once you can implement the primitive. This ensures you know what the abstraction is hiding.
03
Observe Everything
Structured JSON logging on every invocation, tool call, memory read, and cost event. If it isn't logged it doesn't count. Production systems must be observable.
04
Deploy With Terraform
Every lab has a matching Terraform module. Network, security, and application layers are always separate. Infrastructure is code and code is reviewed.
05
Progressive Difficulty
Four exercises per category: Beginner → Intermediate → Advanced → Stretch. You can stop at any tier. The Stretch exercises are genuinely hard and will teach you the most.
06
Cloud-Agnostic Core
AgentBase works with any Bedrock-compatible model. Swap the client for any cloud provider. The patterns transfer. Vendor lock-in is architectural, not mandatory.
PROJECT STRUCTURE
GITHUB REPOSITORY LAYOUT
agentforge/ ├── core/ │ ├── agent_base.py # shared ReAct loop, tool registry, cost tracking │ └── __init__.py ├── labs/ │ ├── lab_01_first_contact/ │ │ ├── agent.py # raw Bedrock Converse API, 3 tools, ReAct loop │ │ └── requirements.txt │ ├── lab_02_persistent_mind/ │ │ ├── agent.py # STM ring buffer + DynamoDB LTM + vector semantic │ │ └── requirements.txt │ ├── lab_03_knowledge_vault/ │ │ ├── agent.py # RAG pipeline: Titan Embeddings + Chroma + re-rank │ │ └── requirements.txt │ ├── lab_04_the_collective/ │ │ ├── agent.py # multi-agent: Supervisor + Research + Code + Write │ │ └── requirements.txt │ └── lab_05_mission_control/ │ ├── agent.py # full production agent: streaming + eval pipeline │ └── requirements.txt ├── infrastructure/ │ ├── terraform/ │ │ ├── modules/ │ │ │ ├── runtime/ # Lambda + API Gateway + IAM │ │ │ ├── datastore/ # DynamoDB + S3 + KMS + OpenSearch │ │ │ └── gateway/ # VPC + subnets + NAT + VPC endpoints │ │ └── environments/ │ │ ├── dev/ # main.tf, variables.tf, outputs.tf │ │ └── prod/ │ └── docker/ │ └── Dockerfile # containerised agent runtime ├── tests/ │ ├── test_agent_base.py │ └── test_memory.py ├── portal/ │ └── index.html # this file — deployable to GitHub Pages ├── .github/workflows/ │ └── deploy.yml # lint → test → terraform plan → apply ├── .env.example ├── README.md └── LICENSE
15 PRODUCTION BEST PRACTICES
01
Model Selection
Always benchmark candidate models against your specific use case before production. Start with the smallest capable model — e.g., Claude Haiku or GPT-4o-mini for classification tasks, larger models only when quality demands it. Track cost-per-invocation across models using a standard evaluation harness. Document model selection rationale and re-evaluate quarterly as new models release.
02
Prompt Engineering
Structure every prompt with explicit system roles, task boundaries, and output format constraints. Use few-shot examples (3-5) for consistent formatting. Apply chain-of-thought reasoning for multi-step tasks. Version-control your prompts alongside code — prompt drift causes silent regressions. Test prompts against adversarial inputs and edge cases before deployment.
03
RAG Architecture
Use hybrid search combining dense vector retrieval (FAISS, OpenSearch) with sparse keyword search (BM25) for best recall. Chunk documents at 512 tokens with 50-token overlap to preserve context boundaries. Apply a cross-encoder re-ranker (e.g., ms-marco-MiniLM) on retrieved chunks before feeding to the LLM. Monitor retrieval relevance with NDCG@10 and adjust chunk strategy based on document types.
04
Memory Management
Implement three memory tiers: (1) Short-term memory — a sliding window or ring buffer of recent messages for conversation continuity; (2) Long-term memory — DynamoDB or AgentCore Memory Store for persistent facts, user preferences, and cross-session context; (3) Semantic memory — vector-indexed memories for relevance-based retrieval. Apply TTL policies to prevent unbounded growth. Use namespace isolation per user and session.
05
Tool Design
Define every tool with typed JSON Schema parameters and clear docstrings — the LLM uses these to decide when and how to call tools. Validate tool outputs before returning to the agent loop. Implement loop guards (max 10 iterations) to prevent infinite tool-calling cycles. Keep tools atomic and composable — one tool, one responsibility. Log every tool invocation with input/output for debugging.
06
Evaluation
Build a ground-truth dataset of at least 50 question-answer pairs for your domain. Use RAGAS metrics (faithfulness, answer relevance, context precision, context recall) for RAG pipelines. Run evaluations on every prompt template change and model update. Set up drift alerts — if faithfulness drops below 0.8, trigger a review. Automate evaluation in CI/CD with pass/fail thresholds.
07
Safety Guardrails
Deploy input guardrails: detect and redact PII (names, emails, SSNs) before sending to the LLM. Apply content filters on both input and output — block harmful, toxic, or off-topic content. Implement prompt injection detection using pattern matching and classifier models. Use Bedrock Guardrails or custom Lambda filters. Log all blocked requests for security review. Test with red-team prompts monthly.
08
Cost Optimization
Cache frequent queries and their responses (semantic cache with similarity threshold). Set per-request token budgets — truncate context rather than exceeding limits. Use DynamoDB PAY_PER_REQUEST billing for unpredictable workloads, provisioned capacity for steady-state. Batch embedding requests where possible. Track cost per conversation and per tool call. Set CloudWatch billing alarms at 80% of monthly budget.
09
Latency
Enable streaming responses for user-facing agents — perceived latency drops dramatically. Use async I/O for parallel tool calls when tools are independent. Cache embeddings for frequently queried documents. Keep Lambda functions warm with provisioned concurrency for sub-second cold starts. Profile your agent loop: target <2s for simple queries, <8s for multi-tool chains. Pre-compute embeddings at ingestion time, not query time.
10
Observability
Emit structured JSON logs on every event: invocation start, tool call, memory read/write, LLM request/response, and cost. Instrument with AWS X-Ray for distributed tracing across Lambda, DynamoDB, and Bedrock calls. Set CloudWatch alarms on error rate (>1%), p99 latency (>10s), and cost anomalies. Build dashboards showing invocations/day, tool usage distribution, and token consumption trends. If it isn't logged, it doesn't exist in production.
11
Security
Encrypt all data at rest with KMS customer-managed keys (CMK). Use VPC endpoints for Bedrock, DynamoDB, and S3 — no traffic over the public internet. Enable CloudTrail for API audit logging. Apply least-privilege IAM policies — each Lambda gets only the permissions it needs. Rotate API keys and secrets via Secrets Manager with automatic rotation. Scan dependencies for CVEs in CI/CD. Review IAM policies quarterly.
12
Scalable Deployment
Use Terraform modules with separate network, security, and application layers. Deploy across multiple Availability Zones for resilience. Configure auto-scaling based on invocation concurrency, not CPU. Use blue-green or canary deployments for zero-downtime updates. Pin model versions in production — never auto-upgrade. Store Terraform state in S3 with DynamoDB locking. Tag all resources with cost-center, environment, and owner.
13
Responsible AI
Test for bias across demographic groups using fairness metrics (equalized odds, demographic parity). Provide explainability — log the reasoning chain and retrieved context for every response. Maintain a model card documenting intended use, limitations, and known biases. Conduct compliance audits against your organization's AI governance framework. Enable user feedback mechanisms and review flagged responses weekly.
14
Data Management
Enable Point-in-Time Recovery (PITR) on all DynamoDB tables. Version your knowledge base documents in S3 with bucket versioning. Apply TTL on conversation records (e.g., 90 days for chat history, indefinite for user preferences). Implement data lineage tracking — know exactly which document chunks contributed to each response. Use S3 Lifecycle policies to transition old data to Glacier. Document your data retention policy and enforce it in code.
15
Human-in-the-Loop
Define escalation tiers: confidence <0.7 triggers soft escalation (flag for review), confidence <0.4 triggers hard escalation (route to human immediately). Implement approval gates for high-stakes actions — financial transactions, account changes, data deletion. Build a feedback loop: humans rate agent responses, ratings feed back into evaluation datasets. Track escalation rate as a key metric — aim for <5% in steady state.
WHY / WHAT / HOW / WHEN
CRITICAL USE CASES FOR AI AGENTS IN PRODUCTION
WHY
Why AI Agents Are Critically Needed
Enterprise complexity exceeds human bandwidth. Modern organizations handle thousands of customer queries, security events, data pipelines, and operational tasks simultaneously. Rule-based automation breaks at scale because real-world inputs are messy, ambiguous, and context-dependent.

LLMs alone are not enough. A language model can generate text but cannot take action — it cannot query a database, file a ticket, trigger a deployment, or remember a previous conversation. Agents bridge this gap by combining LLM reasoning with tool use, memory, and autonomous decision-making.

The cost of inaction is measurable. Manual triage of security alerts takes 30-45 minutes per incident. Customer support without intelligent routing leads to 3-5x longer resolution times. Data teams spend 60% of their time on repetitive query reformulation. AI agents eliminate these bottlenecks by reasoning about context and acting autonomously within defined guardrails.
WHAT
What AI Agents Actually Do
Conversational Assistants — Handle customer inquiries with context-aware responses, escalating to humans only when confidence is low. Reduce support ticket volume by 40-60%.

Autonomous Task Agents — Execute multi-step workflows: classify documents, extract metadata, route to the correct system, and verify completion. Replace brittle ETL pipelines with adaptive reasoning.

Security & Compliance Agents — Monitor GuardDuty findings, correlate with CloudTrail logs, assess severity, and trigger automated remediation playbooks. Response time drops from hours to seconds.

Data & Analytics Agents — Translate natural language questions into SQL/API queries, retrieve results, generate visualizations, and narrate insights. Democratize data access across the organization.

DevOps & SRE Agents — Scan code for vulnerabilities, generate patches, create PRs, monitor deployments, and auto-remediate infrastructure drift. Shift security left without slowing development.
HOW
How to Build Production-Grade Agents
1. Define the agent's scope. An agent that tries to do everything does nothing well. Start with one high-value workflow — e.g., "answer FAQ questions from our knowledge base."

2. Build the reasoning core. Use a ReAct loop (Reason → Act → Observe) powered by LangGraph or a similar framework. The agent decides which tool to call based on the user's intent and accumulated context.

3. Design typed tools. Each tool is a function with a JSON Schema interface. Tools are atomic (one action each), validated (check outputs before returning), and guarded (max iteration limits prevent infinite loops).

4. Add retrieval (RAG). Embed your knowledge base into a vector store (FAISS, OpenSearch, Pinecone). The agent retrieves relevant context before generating a response — grounding it in facts, not hallucinations.

5. Integrate memory. Use AgentCore Memory for session persistence (checkpointer) and cross-session knowledge (store). Middleware hooks inject context before the LLM and save responses after.

6. Deploy with guardrails. Wrap the agent in AgentCore Runtime for managed scaling, observability, and safety. Add input/output filters, cost budgets, and human escalation thresholds.
WHEN
When to Deploy AI Agents (and When Not To)
Deploy agents when:
✓ The task requires reasoning over unstructured inputs (natural language, documents, images)
✓ The workflow involves multiple conditional steps that change based on context
✓ Human experts spend >30% of their time on repetitive triage, classification, or routing
✓ The domain has a well-defined knowledge base that can ground the agent's responses
✓ You can define clear success metrics (accuracy, resolution time, escalation rate)
✓ There is a human-in-the-loop fallback for high-stakes decisions

Do NOT deploy agents when:
✗ A simple rule or regex can solve the problem reliably (don't over-engineer)
✗ The task requires guaranteed deterministic output (use traditional code instead)
✗ There is no ground-truth dataset to evaluate quality
✗ The cost of a wrong answer exceeds the cost of human processing
✗ Regulatory requirements prohibit automated decision-making in the domain
✗ You lack observability infrastructure to monitor agent behavior in production
HANDS-ON IMPLEMENTATION
STEP-BY-STEP QUICK START
01
Environment Setup
Install Python 3.13+, uv package manager, and AWS CLI v2. Configure AWS credentials with aws configure. Clone the repository and run uv sync to install all dependencies. Create a .env file with your GROQ_API_KEY and optional HF_API_KEY.
02
Run Locally First
Start with python 00_langgraph_agent.py to validate your environment. This runs the LangGraph agent locally with FAISS vector search over the FAQ dataset. Verify you get a coherent response about "roaming activation" before proceeding to cloud deployment.
03
Configure AgentCore
Run agentcore configure -e 01_agentcore_runtime.py to auto-generate bedrock_agentcore.yaml. This YAML defines your entrypoint handler, tool schemas, and runtime settings. Review the generated config — understand what each field controls before launching.
04
Deploy & Invoke
Launch with agentcore launch --env GROQ_API_KEY=your_key. Test with agentcore invoke '{"prompt": "..."}'. Try different queries to exercise all three tools: search_faq, search_detailed_faq, and reformulate_query. Observe the ReAct reasoning chain in the logs.
05
Add Memory
Switch to 02_agentcore_memory.py — configure and deploy as before. Test session continuity by sending multiple messages with the same actor_id and thread_id. Verify the agent remembers context from earlier in the conversation. Test cross-session preference retrieval with a new thread_id.
06
Extend & Customize
Add your own tools (API calls, database queries, calculations). Swap the Lauki FAQ dataset with your own domain knowledge base. Experiment with different embedding models and chunk sizes. Modify the system prompt to match your use case. Build evaluation datasets and measure quality with RAGAS metrics.
RESOURCES & REFERENCE LINKS
AWS
AWS Documentation & Guides
Amazon Bedrock AgentCore — Product overview and features
AgentCore Developer Guide — Official toolkit documentation
AgentCore Code Samples — Official AWS sample repository
Amazon Bedrock User Guide — Foundation models and APIs
Bedrock Knowledge Bases — Managed RAG service
Bedrock Guardrails — Content filtering and safety
AZ
Azure AI & Agent Resources
Azure OpenAI Service — GPT and embedding model hosting
Azure AI Search — Vector and hybrid search service
Azure AI Foundry — End-to-end AI development platform
Semantic Kernel — Microsoft's AI orchestration SDK
Azure Cosmos DB — Multi-model database for agent memory
GCP
Google Cloud AI Resources
Vertex AI Documentation — Google's ML and AI platform
Vertex AI Agent Builder — Build and deploy AI agents
Gemini API Reference — Google's multimodal AI models
BigQuery Documentation — Analytics and data warehouse
Firestore Documentation — Serverless document database
FW
Frameworks & Libraries
LangGraph — Graph-based agent orchestration framework
LangChain — LLM application development framework
CrewAI — Multi-agent role-playing framework
AutoGen — Microsoft's multi-agent conversation framework
FAISS — Facebook's efficient similarity search library
Sentence Transformers — State-of-the-art text embedding models
Groq API Docs — Ultra-fast LLM inference platform
IaC
Infrastructure & DevOps
Terraform Documentation — Infrastructure as Code
Docker Documentation — Containerisation platform
GitHub Actions — CI/CD workflow automation
AWS X-Ray — Distributed tracing and observability
uv Package Manager — Ultra-fast Python dependency management
EDU
Learning & Community
Build With AgentCore Challenge — AWS community challenge
AgentForge GitHub Repository — Source code and examples
RAGAS Documentation — RAG evaluation framework
DeepLearning.AI Short Courses — Free AI agent courses
ReAct Paper (Yao et al.) — The foundational ReAct reasoning pattern