FORGE
- AWS CLI v2 configured with AdministratorAccess (scope down before production)
- Python 3.11+, boto3, aws-cdk-lib installed
- S3 bucket with at least 20 FAQ documents (PDF or TXT)
- DynamoDB table: AgentForgeSessions (PK: sessionId, SK: timestamp)
- Bedrock model access enabled: Claude 3 Sonnet in your region
# taxonomy.json
{
"intents": [
{
"name": "CheckOrderStatus",
"slots": ["orderId", "email"],
"confidence_threshold": 0.75,
"fallback_action": "escalate_human"
},
{
"name": "ActivateRoaming",
"slots": ["destination_country", "plan_type"],
"confidence_threshold": 0.80,
"fallback_action": "send_guide_link"
}
]
}
aws lexv2-models create-bot \
--bot-name "AgentForgeCopilot" \
--description "Enterprise FAQ and workflow copilot" \
--role-arn "arn:aws:iam::ACCOUNT_ID:role/LexBotRole" \
--data-privacy '{"childDirected": false}' \
--idle-session-ttl-in-seconds 900
aws lexv2-models create-bot-locale \
--bot-id $BOT_ID --bot-version "DRAFT" \
--locale-id "en_US" \
--nlu-intent-confidence-threshold 0.70 \
--voice-settings '{"voiceId":"Joanna","engine":"neural"}'
# Verify: aws lexv2-models describe-bot --bot-id $BOT_ID and confirm status is "Available."
{
"slotTypeName": "ProductType",
"valueSelectionSetting": {
"resolutionStrategy": "OriginalValue"
},
"slotTypeValues": [
{"sampleValue": {"value": "enterprise"},
"synonyms": [{"value": "business"}]},
{"sampleValue": {"value": "starter"},
"synonyms": [{"value": "free tier"}]},
{"sampleValue": {"value": "professional"}}
]
}
# Then create:
aws lexv2-models create-slot-type \
--bot-id $BOT_ID --bot-version "DRAFT" \
--locale-id "en_US" \
--cli-input-json file://slot_type_product.json
# Verify: aws lexv2-models list-slot-types --bot-id $BOT_ID --bot-version DRAFT --locale-id en_US and confirm all types appear.
import boto3, json, os
# Module-level AWS clients -- created once per Lambda container and re-used
# across invocations.
bedrock = boto3.client("bedrock-runtime",
region_name=os.environ["AWS_REGION"])
dynamodb = boto3.resource("dynamodb")
# Conversation log table (PK: sessionId, SK: timestamp -- see prerequisites).
sessions = dynamodb.Table(os.environ["SESSIONS_TABLE"])
# Intent name -> handler function dispatch table.
# NOTE(review): handle_order_status / handle_roaming / handle_escalation are
# not defined in this chunk; they must exist before this dict is evaluated or
# module import raises NameError -- confirm they are defined earlier.
INTENT_HANDLERS = {
"CheckOrderStatus": handle_order_status,
"ActivateRoaming": handle_roaming,
"FallbackIntent": handle_escalation,
}
def lambda_handler(event, context):
intent = event["sessionState"]["intent"]["name"]
slots = event["sessionState"]["intent"]["slots"]
sid = event["sessionId"]
conf = event["sessionState"]["intent"] \
.get("nluConfidence", {}).get("score", 0)
sessions.put_item(Item={
"sessionId": sid,
"timestamp": int(context.get_remaining_time_in_millis()),
"intent": intent,
"slots": json.dumps(slots),
"confidence": str(conf)
})
handler = INTENT_HANDLERS.get(intent, handle_escalation)
return handler(slots, sid, event)aws lambda invoke --function-name AgentForgeLex --payload file://test_event.json out.json. Confirm valid Lex V2 dialogAction structure.def rag_query(query_text: str, top_k: int = 5) -> str:
# Body of rag_query(query_text, top_k) -- the def is on the previous (garbled) line.
# 1) Embed the query with Titan Embeddings V2 via Bedrock.
embed_resp = bedrock.invoke_model(
modelId="amazon.titan-embed-text-v2:0",
body=json.dumps({"inputText": query_text})
)
vector = json.loads(
embed_resp["body"].read())["embedding"]
# 2) kNN search over the "embedding" field; fetch only the fields we use.
search_body = {
"size": top_k,
"query": {"knn": {
"embedding": {"vector": vector, "k": top_k}
}},
"_source": ["text", "source", "chunk_id"]
}
# NOTE(review): `requests` is not imported in this chunk's import line
# ("import boto3, json, os") -- confirm it is imported elsewhere in the file.
resp = requests.post(
f"{os.environ['OPENSEARCH_ENDPOINT']}"
f"/knowledge-index/_search",
headers={"Content-Type": "application/json"},
json=search_body)
hits = resp.json()["hits"]["hits"]
# 3) Concatenate matching chunk texts, blank-line separated.
return "\n\n".join(
[h["_source"]["text"] for h in hits])def get_session_history(session_id, window=10):
# Body of get_session_history(session_id, window) -- def is on the line above.
# Query newest-first (ScanIndexForward=False), then reverse so callers get
# the last `window` turns in chronological order.
resp = sessions.query(
KeyConditionExpression="sessionId = :sid",
ExpressionAttributeValues={":sid": session_id},
ScanIndexForward=False, Limit=window)
turns = list(reversed(resp["Items"]))
# NOTE(review): assumes each stored item has "role" and "content" attributes;
# the fulfillment logger above writes intent/slots/confidence records instead.
# Presumably a second writer stores chat turns -- confirm.
return [{"role": t["role"],
"content": t["content"]} for t in turns]
def build_prompt(history, context, user_msg):
    """Append the RAG context plus the user's question to the prior turns
    as a single new user message; the input history is not mutated."""
    messages = list(history)
    latest = (f"Context from knowledge base:\n{context}"
              f"\n\nUser question: {user_msg}")
    messages.append({"role": "user", "content": latest})
    return messages

comprehend = boto3.client("comprehend")
def analyze_sentiment(text: str) -> dict:
    """Detect sentiment with Amazon Comprehend.

    Returns the overall sentiment label and the negative-class probability
    (used downstream as the escalation score).
    """
    result = comprehend.detect_sentiment(
        Text=text, LanguageCode="en")
    negative_prob = result["SentimentScore"]["Negative"]
    return {
        "sentiment": result["Sentiment"],
        "score": negative_prob,
    }
def should_escalate(session_id, current_score):
    """Escalate when the current turn is negative (score > 0.6) AND at least
    one of the two most recently stored turns was also negative."""
    recent_turns = get_session_history(session_id, window=2)
    prior_negative = 0
    for turn in recent_turns:
        if float(turn.get("negative_score", 0)) > 0.6:
            prior_negative += 1
    return current_score > 0.6 and prior_negative >= 1


def _parse_slack(event):
    """Slack Events API payload -> canonical fields."""
    return {"text": event["event"]["text"],
            "userId": event["event"]["user"]}


def _parse_connect(event):
    """Amazon Connect contact event -> canonical fields."""
    contact = event["Details"]["ContactData"]
    return {"text": contact["Attributes"]["query"],
            "userId": contact["ContactId"]}


def _parse_web(event):
    """Web-chat POST body -> canonical fields."""
    return {"text": event["body"]["message"],
            "userId": event["body"]["userId"]}


# Channel name -> parser for that channel's raw event shape.
CHANNEL_SCHEMAS = {
    "slack": _parse_slack,
    "connect": _parse_connect,
    "web": _parse_web,
}
def normalize_event(raw_event, channel):
    """Convert a channel-specific event into the canonical
    {text, userId, channel} shape.

    Raises ValueError when no schema is registered for *channel*.
    """
    if channel not in CHANNEL_SCHEMAS:
        raise ValueError(f"Unknown channel: {channel}")
    canonical = CHANNEL_SCHEMAS[channel](raw_event)
    canonical["channel"] = channel
    return canonical

connect_client = boto3.client("connect")
# Hand the conversation off to a live agent as an Amazon Connect task.
# Returns the ContactId of the created task.
def escalate_to_human(session_id, summary,
contact_flow_id):
resp = connect_client.start_task_contact(
InstanceId=os.environ["CONNECT_INSTANCE_ID"],
ContactFlowId=contact_flow_id,
Name=f"Bot Escalation - {session_id[:8]}",
Description=summary,
Attributes={
"sessionId": session_id,
"escalation_reason": "sentiment_negative",
# truncated to 4000 chars -- presumably Connect's attribute size limit; confirm
"bot_summary": summary[:4000]
},
TaskTemplateId=os.environ["TASK_TEMPLATE_ID"])
return resp["ContactId"]guardrail = bedrock_client.create_guardrail(
# Arguments to create_guardrail (the call opens on the previous, garbled line).
# NOTE(review): `bedrock_client` on that line is not defined in this chunk;
# the control-plane client would be boto3.client("bedrock") -- confirm.
name="AgentForgeCopilotGuardrail",
# Denied topics: matching input or output is blocked outright.
topicPolicyConfig={"topicsConfig": [
{"name": "LegalAdvice",
"definition": "Advice about lawsuits or liability",
"examples": ["Can I sue them?"],
"type": "DENY"},
{"name": "CompetitorMention",
"definition": "References to competing products",
"type": "DENY"}
]},
# PII is masked (ANONYMIZE), not blocked, so conversations can continue.
sensitiveInformationPolicyConfig={
"piiEntitiesConfig": [
{"type": "EMAIL", "action": "ANONYMIZE"},
{"type": "PHONE", "action": "ANONYMIZE"},
{"type": "NAME", "action": "ANONYMIZE"}
]},
# Content filters applied to both user input and model output.
contentPolicyConfig={"filtersConfig": [
{"type": "HATE",
"inputStrength": "HIGH",
"outputStrength": "HIGH"},
{"type": "VIOLENCE",
"inputStrength": "HIGH",
"outputStrength": "HIGH"}
]})import time, json, logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def emit_telemetry(session_id, intent, confidence,
                   sentiment, chunks, in_tok, out_tok,
                   start_ms, escalated, guardrail):
    """Write one structured JSON log line per completed agent turn.

    start_ms is an epoch-milliseconds start marker; latency is measured
    against the current wall clock. The cost estimate applies per-token
    rates of $0.000003 (input) and $0.000015 (output).
    """
    turn_record = {
        "event": "agent_turn_complete",
        "sessionId": session_id,
        "intent": intent,
        "nluConfidence": confidence,
        "sentiment": sentiment,
        "rag_chunks_used": chunks,
        "bedrock_input_tokens": in_tok,
        "bedrock_output_tokens": out_tok,
        "latency_ms": int((time.time() * 1000) - start_ms),
        "escalated": escalated,
        "guardrail_triggered": guardrail,
        "estimated_cost_usd": round(
            (in_tok * 0.000003) + (out_tok * 0.000015), 6),
    }
    logger.info(json.dumps(turn_record))

# Verify: aws logs tail /aws/lambda/AgentForgeLex --since 1m --format json
# and confirm all 11 fields are present.
# aws cloudwatch put-metric-alarm \
--alarm-name "AgentForge-EscalationRateHigh" \
--metric-name "EscalationRate" \
--namespace "AgentForge/Copilot" \
--statistic Average --period 300 \
--threshold 15 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2 \
--alarm-actions \
"arn:aws:sns:us-east-1:ACCOUNT:AgentForgeAlerts" \
--treat-missing-data notBreaching
# Verify: aws cloudwatch set-alarm-state and confirm SNS notification arrives in Slack within 60 seconds.
Azure Implementation Path
Replace Amazon Lex with Azure AI Bot Service + Microsoft Copilot Studio for intent classification and dialogue management. Use Azure AI Language (CLU) for custom NLU models with the same intent taxonomy. Session state moves from DynamoDB to Cosmos DB (partition key: sessionId). RAG retrieval uses Azure AI Search (vector + hybrid) with Azure OpenAI Embeddings. LLM generation swaps to Azure OpenAI GPT-4. Sentiment analysis via Azure AI Language Sentiment. Omnichannel delivery through Teams, Power Virtual Agents web chat, and Dynamics 365 Contact Center for voice + human hand-off. Guardrails via Azure AI Content Safety.
az bot create --resource-group agentforge-rg \
--name AgentForgeCopilot --kind webapp \
--sku S1 --location eastus
az cognitiveservices account create \
--name agentforge-language \
--resource-group agentforge-rg \
--kind TextAnalytics --sku S \
--location eastus
az search service create \
--name agentforge-search \
--resource-group agentforge-rg \
--sku standard --partition-count 1
GCP Implementation Path
Replace Amazon Lex with Dialogflow CX for intent classification with advanced flow-based conversation design. Session state in Firestore (document: sessions/{sessionId}). RAG retrieval via Vertex AI Search (vector + keyword hybrid) with Vertex AI Embeddings. LLM generation with Gemini Pro via Vertex AI. Sentiment analysis through Cloud Natural Language API. Omnichannel: Dialogflow CX Messenger (web), Dialogflow CX Phone Gateway (voice), and Google Chat integration. Content filtering via Vertex AI Safety Filters. Human hand-off via Contact Center AI (CCAI).
gcloud dialogflow cx agents create \
--display-name="AgentForgeCopilot" \
--location=us-central1 \
--default-language-code=en \
--time-zone="America/New_York"
gcloud ai endpoints create \
--display-name=agentforge-embedding \
--region=us-central1
gcloud alpha contact-center-insights \
operations list --location=us-central1
--locale en-US --child-directed false
- Bedrock Agent access enabled; model access for Claude 3 Sonnet confirmed
- IAM role for Bedrock Agent with lambda:InvokeFunction, s3:GetObject, s3:PutObject, dynamodb:PutItem, bedrock:InvokeModel
- OpenSearch Serverless collection created (type: VECTORSEARCH)
- AgentCore CLI installed: pip install amazon-bedrock-agentcore-sdk
# action_group_calendar.yaml
openapi: 3.0.0
info:
title: CalendarActions
description: "Tools for creating and querying
calendar events."
version: 1.0.0
paths:
/create_event:
post:
operationId: create_event
description: "Creates a new calendar event.
Call ONLY after start_time and attendees
are confirmed."
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- title
- start_time
- duration_minutes
- attendees
properties:
title:
type: string
start_time:
type: string
format: date-time
duration_minutes:
type: integer
minimum: 15
maximum: 480
attendees:
type: array
items:
type: string
format: emailnpx @redocly/openapi-cli lint action_group_calendar.yaml and confirm no errors.import json
def lambda_handler(event, context):
    """Bedrock Agent action-group entry point: dispatch the requested tool.

    Unknown functions and handler exceptions are reported back to the agent
    as structured error payloads (errors carry retry_safe=True so the agent
    may retry once, per its instructions) rather than raised.
    """
    action_group = event["actionGroup"]
    function = event["function"]
    parameters = {}
    for param in event.get("parameters", []):
        parameters[param["name"]] = param["value"]
    dispatch = {
        "create_event": handle_create_event,
        "get_availability": handle_get_availability,
        "send_email": handle_send_email,
    }
    if function not in dispatch:
        return build_response(action_group, function,
                              {"error": f"Unknown: {function}"})
    try:
        result = dispatch[function](parameters)
        return build_response(action_group, function, result)
    except Exception as exc:
        return build_response(action_group, function,
                              {"error": str(exc), "retry_safe": True})
# Wrap a tool result in the Bedrock Agent function-response envelope;
# `body` is JSON-serialised into the TEXT payload.
def build_response(ag, fn, body):
return {
"actionGroup": ag, "function": fn,
"functionResponse": {"responseBody": {
"TEXT": {"body": json.dumps(body)}}}
}import boto3, os
bedrock_agent = boto3.client("bedrock-agent")
# Create the task agent on Claude 3 Sonnet; the instruction block is the
# agent's standing policy (confirmation, retry and tool-budget rules).
agent = bedrock_agent.create_agent(
agentName="AgentForgeTaskAgent",
foundationModel=
"anthropic.claude-3-sonnet-20240229-v1:0",
agentResourceRoleArn=
os.environ["AGENT_ROLE_ARN"],
instruction="""You are an enterprise task agent.
Rules:
1. Confirm high-impact actions before executing.
2. If a required parameter is missing, ask.
3. After each tool call, explain what you did.
4. If a tool returns retry_safe=true, retry once.
5. If a tool fails twice, stop and explain.
6. Never reveal internal tool names to the user.
7. Maximum 8 tool calls per request.""",
idleSessionTTLInSeconds=1800)
AGENT_ID = agent["agent"]["agentId"]kb = bedrock_agent.create_knowledge_base(
# Arguments to create_knowledge_base (the call opens on the previous line):
# Titan V2 embeddings over an OpenSearch Serverless vector index.
name="AgentForgeKB",
roleArn=os.environ["KB_ROLE_ARN"],
knowledgeBaseConfiguration={
"type": "VECTOR",
"vectorKnowledgeBaseConfiguration": {
"embeddingModelArn":
"arn:aws:bedrock:us-east-1::foundation-model/"
"amazon.titan-embed-text-v2:0"
}
},
storageConfiguration={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn":
os.environ["OPENSEARCH_COLLECTION_ARN"],
"vectorIndexName": "agentforge-kb-index",
# These field names must match the index mapping created separately.
"fieldMapping": {
"vectorField": "embedding",
"textField": "text",
"metadataField": "metadata"
}
}
})
KB_ID = kb["knowledgeBase"]["knowledgeBaseId"]
# Attach the KB to the DRAFT agent version so retrieval is available to it.
bedrock_agent.associate_agent_knowledge_base(
agentId=AGENT_ID, agentVersion="DRAFT",
knowledgeBaseId=KB_ID,
description="Company policy docs and FAQ",
knowledgeBaseState="ENABLED")guardrail = boto3.client("bedrock").create_guardrail(
# Arguments to create_guardrail (the call opens on the previous line).
name="TaskAgentGuardrail",
# Messages returned in place of blocked input / blocked output.
blockedInputMessaging=
"I can't help with that specific request.",
blockedOutputsMessaging=
"I can't share that information.",
topicPolicyConfig={"topicsConfig": [
{"name": "FinancialAdvice",
"definition": "Advice on investments or trading",
"type": "DENY"}
]},
# Credentials are blocked outright; email is merely anonymised.
sensitiveInformationPolicyConfig={
"piiEntitiesConfig": [
{"type": "AWS_ACCESS_KEY", "action": "BLOCK"},
{"type": "PASSWORD", "action": "BLOCK"},
{"type": "EMAIL", "action": "ANONYMIZE"}
]},
wordPolicyConfig={
"managedWordListsConfig": [
{"type": "PROFANITY"}],
# Custom blocked term (internal project codename).
"wordsConfig": [
{"text": "INTERNAL_CODENAME_HERMES"}]}
)bedrock_agent.prepare_agent(agentId=AGENT_ID)
import time
# Poll for up to 30 x 10 s = 5 min until the agent build reaches PREPARED;
# the for/else raises only if the loop exhausts without breaking.
for _ in range(30):
status = bedrock_agent.get_agent(
agentId=AGENT_ID
)["agent"]["agentStatus"]
if status == "PREPARED":
break
time.sleep(10)
else:
raise RuntimeError("Agent preparation timed out")
# Pin a stable alias for production callers.
alias = bedrock_agent.create_agent_alias(
agentId=AGENT_ID,
agentAliasName="production-v1",
description="Production alias - stable")
ALIAS_ID = alias["agentAlias"]["agentAliasId"]# eventbridge_rule.json
{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {
"bucket": {"name": ["agentforge-inbox"]},
"object": {"key": [{"suffix": ".pdf"}]}
}
}
aws events put-rule \
--name "AgentForge-S3-Trigger" \
--event-pattern file://eventbridge_rule.json \
--state ENABLED
# In the trigger Lambda:
def build_agent_prompt(s3_event):
    """Turn an S3 Object-Created EventBridge event into a task prompt."""
    detail = s3_event["detail"]
    bucket = detail["bucket"]["name"]
    key = detail["object"]["key"]
    return (
        f"New document: s3://{bucket}/{key}. "
        f"Classify type, extract metadata, "
        f"route to DynamoDB, send Slack alert.")


class AgentMiddleware:
    """Retry wrapper around the Bedrock agent runtime.

    Transient failures (matched against RETRYABLE markers) are retried up
    to MAX_RETRIES times by feeding the error text back into a revised
    prompt; anything else is re-raised.
    """

    MAX_RETRIES = 2
    RETRYABLE = ["ThrottlingException",
                 "ServiceUnavailableException",
                 "timeout", "connection refused"]

    def __init__(self, agent_id, alias_id):
        self.agent_id = agent_id
        self.alias_id = alias_id
        self.runtime = boto3.client(
            "bedrock-agent-runtime")
        self.retry_count = 0

    def invoke_with_retry(self, session_id, prompt):
        """Invoke the agent; on a retryable error, re-prompt with failure context."""
        # NOTE(review): _stream_response is not defined in this chunk --
        # presumably it wraps runtime.invoke_agent; confirm elsewhere.
        try:
            return self._stream_response(session_id, prompt)
        except Exception as exc:
            message = str(exc)
            can_retry = (self.retry_count < self.MAX_RETRIES
                         and any(marker in message
                                 for marker in self.RETRYABLE))
            if not can_retry:
                raise
            self.retry_count += 1
            return self.invoke_with_retry(
                session_id,
                f"Previous attempt failed: {message}."
                f" Revise plan and retry.")

# bedrock_agentcore.yaml
entrypoint: agent_handler.py
handler: lambda_handler
runtime: python3.11
memory: 512
timeout: 60
environment:
AGENT_ID: ${AGENT_ID}
AGENT_ALIAS_ID: ${AGENT_ALIAS_ID}
OPENSEARCH_ENDPOINT: ${OPENSEARCH_ENDPOINT}
SESSIONS_TABLE: AgentForgeSessions
tools:
- name: create_event
schema: ./schemas/action_group_calendar.yaml
- name: send_email
schema: ./schemas/action_group_email.yaml
memory:
session_store: dynamodb://AgentForgeSessions
observability:
cloudwatch: enabled
xray: enabled
# Deploy:
agentcore configure -e agent_handler.py
agentcore launch
agentcore invoke '{"prompt": "Schedule a Q3 review"}'agentcore status shows "RUNNING" and invoke response contains a tool call trace.from dataclasses import dataclass
# A golden-path evaluation case for the agent: the prompt, the exact ordered
# tool calls we expect, and keywords the response must / must not contain.
@dataclass
class TestCase:
id: str
prompt: str
expected_tools: list # ordered
expected_contains: list # keywords
expected_excludes: list # forbidden substrings
def evaluate_agent(test_cases):
    """Run golden TestCase prompts against the agent and score the results.

    For each case the observed tool-call sequence must match exactly, every
    expected keyword must appear in the response, and no excluded keyword
    may appear. Returns pass/fail counts, per-case tool accuracy flags, and
    an overall tool-accuracy percentage.
    """
    results = {"passed": 0, "failed": 0, "tool_accuracy": []}
    for tc in test_cases:
        response, trace = invoke_agent_with_trace(tc.prompt)
        actual_tools = [t["name"] for t in trace
                        if t["type"] == "tool_call"]
        tool_ok = actual_tools == tc.expected_tools
        content_ok = all(kw in response
                         for kw in tc.expected_contains)
        content_bad = any(kw in response
                          for kw in tc.expected_excludes)
        passed = tool_ok and content_ok and not content_bad
        results["passed" if passed else "failed"] += 1
        results["tool_accuracy"].append(1 if tool_ok else 0)
    # FIX: guard against ZeroDivisionError when called with no test cases.
    results["accuracy_pct"] = (
        sum(results["tool_accuracy"]) / len(test_cases) * 100
        if test_cases else 0.0)
    return results

# Azure Implementation Path
Replace Bedrock Agents with Azure AI Agent Service (formerly Azure AI Foundry Agents) for ReAct-style autonomous task execution. Action Groups map to Azure Functions with OpenAPI tool definitions. Knowledge Base equivalent: Azure AI Search with Azure OpenAI Embeddings. Orchestration for deterministic branching via Azure Logic Apps or Durable Functions. Guardrails via Azure AI Content Safety + Responsible AI tools. Event-driven triggers through Event Grid → Azure Functions. Session memory in Cosmos DB.
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
client = AIProjectClient(
credential=DefaultAzureCredential(),
endpoint="https://agentforge.openai.azure.com")
agent = client.agents.create_agent(
model="gpt-4",
name="AgentForgeTaskAgent",
instructions="You are an enterprise task agent...",
tools=[{"type": "function", "function": {
"name": "create_event",
"parameters": {...}}}])GCP Implementation Path
Replace Bedrock Agents with Vertex AI Agent Builder for autonomous agent creation with tool-use. Action Groups map to Cloud Functions (2nd gen) with function declarations. Knowledge Base via Vertex AI Search (RAG engine). Orchestration via Cloud Workflows for deterministic steps. Event-driven triggers through Eventarc → Cloud Functions. Guardrails via Vertex AI Safety Settings. Session state in Firestore.
from vertexai.preview import reasoning_engines
agent = reasoning_engines.LangchainAgent(
model="gemini-1.5-pro",
tools=[create_event, get_availability],
agent_executor_kwargs={
"return_intermediate_steps": True})
remote_agent = reasoning_engines.ReasoningEngine.create(
agent, display_name="AgentForgeTaskAgent",
requirements=["google-cloud-aiplatform"])--bucket agentforge-inbox --notification-config file://notify.json
- GitHub repo with Actions enabled, PAT with repo + pull_request scope
- AWS CLI v2, Terraform 1.6+, Python 3.11+, boto3
- Bedrock model access: Claude 3 Sonnet
- IAM role with SecurityAudit + SSMAutomationRole policies
- Jira API token for ticket creation
- Security Hub enabled in target account
# .github/workflows/ai-review.yml
name: AI Code Review
on:
pull_request:
types: [opened, synchronize]
jobs:
review:
runs-on: ubuntu-latest
permissions:
pull-requests: write
contents: read
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Get PR diff
id: diff
run: |
gh pr diff ${{ github.event.number }} \
> /tmp/pr_diff.txt
echo "lines=$(wc -l < /tmp/pr_diff.txt)" \
>> $GITHUB_OUTPUT
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Run AI Review
run: python review_agent.py \
${{ github.event.number }}
env:
BEDROCK_REGION: us-east-1
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}import boto3, json
bedrock = boto3.client("bedrock-runtime",
region_name="us-east-1")
# Ask Claude 3 Sonnet (via Bedrock) to review a PR diff; returns the parsed
# list of findings ({file, line, severity, issue, suggestion}).
# The diff is truncated to 12,000 characters before prompting.
def review_diff(diff_text: str) -> list:
prompt = f"""Review this code diff. Return JSON:
[{{"file": "path", "line": N,
"severity": "CRITICAL|HIGH|MEDIUM|LOW",
"issue": "description",
"suggestion": "fix"}}]
Rules:
- Flag security issues as CRITICAL
- Flag performance issues as HIGH
- Flag style issues as LOW
- Include line numbers from the diff
Diff:
{diff_text[:12000]}"""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": [{"role": "user",
"content": prompt}]
}))
body = json.loads(resp["body"].read())
# NOTE(review): raises json.JSONDecodeError if the model wraps the JSON in
# prose; consider extracting the first [...] span before parsing.
return json.loads(
body["content"][0]["text"])import requests, os
def post_review_comments(pr_number, findings):
    """Publish AI findings as one GitHub PR review.

    Each finding becomes an inline comment; the review event is
    REQUEST_CHANGES when any finding is CRITICAL, otherwise COMMENT.
    Returns the GitHub API HTTP status code.
    """
    token = os.environ["GITHUB_TOKEN"]
    repo = os.environ["GITHUB_REPOSITORY"]
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/vnd.github+json"
    }
    comments = [
        {
            "path": finding["file"],
            "line": finding["line"],
            "body": f"**[{finding['severity']}]** "
                    f"{finding['issue']}\n\n"
                    f"**Suggestion:** {finding['suggestion']}"
        }
        for finding in findings
    ]
    critical = sum(1 for finding in findings
                   if finding["severity"] == "CRITICAL")
    event = "REQUEST_CHANGES" if critical > 0 else "COMMENT"
    payload = {
        "body": f"AI Review: {len(findings)} "
                f"issues ({critical} critical)",
        "event": event,
        "comments": comments,
    }
    resp = requests.post(
        f"https://api.github.com/repos/{repo}"
        f"/pulls/{pr_number}/reviews",
        headers=headers,
        json=payload)
    return resp.status_code

# .github/workflows/drift-detect.yml
name: Terraform Drift Detection
on:
schedule:
- cron: '0 6 * * *' # Daily at 6 AM UTC
jobs:
drift:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: hashicorp/setup-terraform@v3
with:
terraform_version: 1.6.0
- name: Terraform Plan
run: |
cd infrastructure/terraform/environments/prod
terraform init -backend-config=prod.hcl
terraform plan -detailed-exitcode \
-json -out=plan.json \
> plan_output.json 2>&1 || true
- name: Analyze Drift
if: steps.plan.outcome == 'failure'
run: python drift_agent.py plan_output.json|| true and check exit code separately.def handle_drift(plan_json: dict) -> dict:
# Body of handle_drift(plan_json) -- the def is fused into the garbled line above.
# Keep only real changes (anything that is not a no-op).
drift_resources = [
r for r in plan_json.get(
"resource_changes", [])
if r["change"]["actions"] != ["no-op"]]
# Only the first 5 drifted resources are sent to the model.
prompt = f"""Analyze this Terraform drift:
{json.dumps(drift_resources[:5], indent=2)}
For each drifted resource, provide:
1. What changed and likely root cause
2. Corrective HCL code to fix the drift
3. Risk assessment (LOW/MEDIUM/HIGH)
4. Whether auto-apply is safe
Return JSON array."""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": [{"role": "user",
"content": prompt}]}))
# NOTE(review): json.loads on raw model text fails if the model adds prose.
remediation = json.loads(
resp["body"].read()
)["content"][0]["text"]
return json.loads(remediation)# EventBridge rule for Security Hub
aws events put-rule \
--name "SecurityHub-HighFindings" \
--event-pattern '{
"source": ["aws.securityhub"],
"detail-type": ["Security Hub Findings"],
"detail": {
"findings": {
"Severity": {"Label": ["HIGH","CRITICAL"]}
}
}
}' --state ENABLED
# Lambda handler:
def handle_security_finding(event, context):
    """EventBridge target: draft Terraform remediation for a Security Hub
    finding and open a pull request with it."""
    finding = event["detail"]["findings"][0]
    resource = finding["Resources"][0]
    severity = finding["Severity"]["Label"]
    prompt = (
        f"Security Hub finding: "
        f"{finding['Title']}\n"
        f"Resource: {resource['Id']}\n"
        f"Severity: {severity}\n"
        f"Generate Terraform remediation code."
    )
    remediation = invoke_bedrock(prompt)
    create_pr(remediation, finding["Id"])
    return {"statusCode": 200}

# step_function_definition.json
{
"StartAt": "PlanReview",
"States": {
"PlanReview": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCT:function:TfPlan",
"Next": "WaitForApproval"
},
"WaitForApproval": {
"Type": "Task",
"Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
"Parameters": {
"QueueUrl": "${ApprovalQueueUrl}",
"MessageBody": {
"taskToken.$": "$$.Task.Token",
"plan.$": "$.plan_summary"
}
},
"TimeoutSeconds": 86400,
"Next": "Apply"
},
"Apply": {
"Type": "Task",
"Resource": "arn:aws:lambda:REGION:ACCT:function:TfApply",
"End": true
}
}
}def handle_pagerduty_webhook(event, context):
incident = json.loads(event["body"])
service = incident["event"]["data"]["service"]["name"]
# Fetch recent error logs
logs_client = boto3.client("logs")
resp = logs_client.filter_log_events(
logGroupName=f"/aws/lambda/{service}",
startTime=int((time.time() - 3600) * 1000),
filterPattern="ERROR",
limit=50)
error_logs = "\n".join(
[e["message"] for e in resp["events"]])
prompt = (
f"Service: {service}\n"
f"Recent errors:\n{error_logs[:8000]}\n\n"
f"1. Identify root cause\n"
f"2. Generate a code fix\n"
f"3. Explain the fix"
)
fix = invoke_bedrock(prompt)
pr_url = create_pr_with_fix(service, fix)
notify_oncall(pr_url, incident["event"]["data"])
return {"statusCode": 200}def estimate_blast_radius(plan_json: dict) -> dict:
# Body of estimate_blast_radius(plan_json) -- the def is fused into the line above.
# Scores a Terraform plan: each destroy counts 3, each update counts 1.
changes = plan_json.get("resource_changes", [])
affected = [r for r in changes
if r["change"]["actions"] != ["no-op"]]
# Check dependency graph
destroyed = [r for r in affected
if "delete" in r["change"]["actions"]]
modified = [r for r in affected
if "update" in r["change"]["actions"]]
risk_score = (
len(destroyed) * 3 +
len(modified) * 1)
return {
"total_affected": len(affected),
"destroyed": len(destroyed),
"modified": len(modified),
"risk_score": risk_score,
# Manual approval gate kicks in above a score of 10.
"requires_approval": risk_score > 10,
"summary": f"{len(affected)} resources "
f"affected, {len(destroyed)} destroyed"
}def log_agent_action(action_type, metadata):
# Body of log_agent_action(action_type, metadata) -- def is on the line above.
# Emits one structured JSON audit record per autonomous DevOps action.
# NOTE(review): `datetime` is never imported in this chunk -- confirm the
# file imports it, otherwise this raises NameError at call time.
logger.info(json.dumps({
"event": "devops_agent_action",
"action_type": action_type,
"timestamp": datetime.utcnow().isoformat(),
"pr_number": metadata.get("pr_number"),
"findings_count": metadata.get("findings", 0),
"severity_breakdown": metadata.get("severity"),
"drift_resources": metadata.get("drift_count"),
"remediation_generated": metadata.get("remediated"),
"latency_ms": metadata.get("latency_ms"),
"estimated_cost_usd": metadata.get("cost")
}))
# Usage:
log_agent_action("code_review", {
"pr_number": 42,
"findings": 5,
"severity": {"CRITICAL": 1, "HIGH": 2,
"MEDIUM": 1, "LOW": 1},
"latency_ms": 3200,
"cost": 0.012
})# In the GitHub Actions workflow:
- name: Quality Gate
run: |
CRITICAL=$(python -c "
import json
f = json.load(open('findings.json'))
print(sum(1 for x in f
if x['severity'] == 'CRITICAL'))")
if [ "$CRITICAL" -gt 0 ]; then
echo "::error::CRITICAL issues found"
exit 1
fi
# Branch protection rule:
gh api repos/{owner}/{repo}/branches/main/protection \
--method PUT --input - << 'EOF'
{
"required_status_checks": {
"strict": true,
"contexts": ["AI Code Review / review"]
},
"enforce_admins": true,
"required_pull_request_reviews": {
"required_approving_review_count": 1
}
}
EOFaws cloudwatch put-dashboard \
--dashboard-name "AgentForge-DevOps" \
--dashboard-body '{
"widgets": [
{"type": "metric", "properties": {
"title": "PRs Reviewed / Day",
"metrics": [
["AgentForge/DevOps", "PRsReviewed",
"Period", 86400]
]}},
{"type": "metric", "properties": {
"title": "Mean Review Latency (ms)",
"metrics": [
["AgentForge/DevOps", "ReviewLatency",
"Statistic", "Average"]
]}},
{"type": "metric", "properties": {
"title": "Remediation Success Rate",
"metrics": [
["AgentForge/DevOps", "RemediationSuccess"]
]}},
{"type": "metric", "properties": {
"title": "Cost per Review (USD)",
"metrics": [
["AgentForge/DevOps", "CostPerReview"]
]}}
]}'Azure Implementation Path
Replace GitHub Actions with Azure Pipelines for CI/CD triggers on PRs. Code analysis via Azure DevOps AI-assisted reviews. IaC drift detection with Terraform Cloud or Azure Resource Graph queries. Security findings from Microsoft Defender for Cloud (replacing Security Hub). Remediation PRs via Azure Repos API. Approval gates via Azure Pipelines Environments with manual approval checks. Incident alerting via Azure Monitor + Logic Apps integration.
# azure-pipelines.yml
trigger:
branches:
include: [main]
pr:
branches:
include: [main]
stages:
- stage: AIReview
jobs:
- job: CodeReview
steps:
- script: |
python review_agent.py \
--provider azure \
--model gpt-4
env:
AZURE_OPENAI_KEY: $(AZURE_KEY)GCP Implementation Path
Replace GitHub Actions with Cloud Build triggers on PRs. Code analysis via Gemini Code Assist. IaC drift detection using Cloud Asset Inventory + Terraform. Security findings from Security Command Center (SCC). Approval gates via Cloud Deploy approval policies. Incident alerting via Cloud Monitoring + Cloud Alerting. LLM analysis via Vertex AI (Gemini Pro).
# cloudbuild.yaml
steps:
- name: 'python:3.11'
entrypoint: 'python'
args: ['review_agent.py', '--provider', 'gcp']
secretEnv: ['VERTEX_API_KEY']
- name: 'hashicorp/terraform'
args: ['plan', '-detailed-exitcode']
availableSecrets:
secretManager:
- versionName: projects/P/secrets/vertex-key/versions/1
env: 'VERTEX_API_KEY'- run: python review_agent.py ${{ github.event.pull_request.number }}
- S3 bucket with 50+ documents (PDF, CSV, HTML)
- OpenSearch Serverless collection (type: VECTORSEARCH)
- Bedrock model access: Claude 3 Sonnet + Titan Embeddings V2
- Glue Data Catalog with at least one database + 3 tables
- Athena workgroup configured with S3 output location
- Python 3.11+, boto3, opensearch-py, pandas
from langchain.text_splitter import (
RecursiveCharacterTextSplitter)
import fitz # PyMuPDF
def chunk_pdf(pdf_path: str,
              chunk_size=512, overlap=102):
    """Extract all text from a PDF and split it into overlapping chunks.

    chunk_size / overlap are measured in whitespace-separated words
    (token-approximate). Each chunk carries {source, total_pages} metadata.
    Returns the list of Document chunks from the splitter.
    """
    doc = fitz.open(pdf_path)
    try:
        # FIX: the original never closed the Document handle, leaking file
        # descriptors when batch-processing many PDFs.
        full_text = "".join(page.get_text() + "\n" for page in doc)
        total_pages = len(doc)
    finally:
        doc.close()
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        # token-approximate length: whitespace word count
        length_function=lambda t: len(t.split()),
        separators=["\n\n", "\n", ". ", " "])
    return splitter.create_documents(
        [full_text],
        metadatas=[{"source": pdf_path,
                    "total_pages": total_pages}])
# Process all documents
# Chunk every PDF in the local working copy; presumably synced down from the
# S3 bucket beforehand (e.g. `aws s3 sync`) -- confirm.
import glob
all_chunks = []
for pdf in glob.glob("s3_docs/*.pdf"):
all_chunks.extend(chunk_pdf(pdf))
print(f"Total chunks: {len(all_chunks)}")# Create collection
aws opensearchserverless create-collection \
--name agentforge-vectors \
--type VECTORSEARCH \
--description "RAG vector store"
# Create index mapping
PUT agentforge-kb-index
{
"settings": {
"index": {
"knn": true,
"knn.algo_param.ef_search": 512
}
},
"mappings": {
"properties": {
"embedding": {
"type": "knn_vector",
"dimension": 1024,
"method": {
"engine": "faiss",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"ef_construction": 512,
"m": 16
}
}
},
"text": {"type": "text"},
"source": {"type": "keyword"},
"chunk_id": {"type": "keyword"},
"metadata": {"type": "object"}
}
}
}aws opensearchserverless list-collections and confirm status is ACTIVE. Verify the index exists via the OpenSearch dashboard.import boto3, json, time
from opensearchpy import OpenSearch
bedrock = boto3.client("bedrock-runtime")
def embed_text(text: str) -> list:
    """Embed *text* with Titan Embeddings V2 via Bedrock; returns the vector."""
    payload = json.dumps({"inputText": text})
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=payload)
    parsed = json.loads(response["body"].read())
    return parsed["embedding"]
def index_chunks(chunks, os_client, index_name):
for i, chunk in enumerate(chunks):
vector = embed_text(chunk.page_content)
doc = {
"embedding": vector,
"text": chunk.page_content,
"source": chunk.metadata["source"],
"chunk_id": f"chunk-{i}",
"metadata": chunk.metadata
}
os_client.index(
index=index_name,
body=doc,
id=f"chunk-{i}")
if i % 50 == 0:
print(f"Indexed {i}/{len(chunks)}")
time.sleep(0.1) # Rate limit guardGET agentforge-kb-index/_count. Confirm the count matches expected chunks.def hybrid_search(query: str, top_k: int = 20):
# Body of hybrid_search(query, top_k) -- the def line was swallowed by the
# garbled trailing comment above.
query_vec = embed_text(query)
# Vector search (kNN)
vector_results = os_client.search(
index="agentforge-kb-index",
body={"size": top_k, "query": {"knn": {
"embedding": {
"vector": query_vec, "k": top_k
}}}})
# Keyword search (BM25)
keyword_results = os_client.search(
index="agentforge-kb-index",
body={"size": top_k, "query": {
"match": {"text": query}}})
# Reciprocal Rank Fusion
# Each result list contributes 1/(60+rank) per doc; 60 is the usual RRF
# damping constant.
scores = {}
for rank, hit in enumerate(
vector_results["hits"]["hits"]):
doc_id = hit["_id"]
scores[doc_id] = scores.get(doc_id, 0) \
+ 1.0 / (60 + rank)
for rank, hit in enumerate(
keyword_results["hits"]["hits"]):
doc_id = hit["_id"]
scores[doc_id] = scores.get(doc_id, 0) \
+ 1.0 / (60 + rank)
# Returns (doc_id, fused_score) pairs, best first.
ranked = sorted(scores.items(),
key=lambda x: x[1], reverse=True)
return ranked[:top_k]from sentence_transformers import CrossEncoder
reranker = CrossEncoder(
"cross-encoder/ms-marco-MiniLM-L-6-v2")
def rerank_results(query: str,
                   candidates: list, top_k: int = 5):
    """Re-score *candidates* against *query* with the cross-encoder.

    Returns the top_k candidates ordered best-first, each copied with
    an added float "rerank_score" key.
    """
    # Cross-encoders score each (query, passage) pair jointly, which is
    # slower but far more accurate than bi-encoder similarity.
    query_doc_pairs = [(query, candidate["text"])
                       for candidate in candidates]
    relevance = reranker.predict(query_doc_pairs)
    ranked = sorted(zip(candidates, relevance),
                    key=lambda pair: pair[1], reverse=True)
    top = []
    for candidate, score in ranked[:top_k]:
        enriched = dict(candidate)
        enriched["rerank_score"] = float(score)
        top.append(enriched)
    return top
# Usage in pipeline:
raw_results = hybrid_search(user_query, top_k=20)
docs = fetch_documents(raw_results)
top_docs = rerank_results(user_query, docs, top_k=5)def generate_answer(query: str,
context_docs: list) -> dict:
context = "\n\n".join([
f"[Source: {d['source']}, "
f"Score: {d['rerank_score']:.2f}]\n"
f"{d['text']}"
for d in context_docs])
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"system": "Answer based ONLY on the "
"provided context. Cite sources "
"as [Source: filename]. If the "
"context does not contain the "
"answer, say so explicitly.",
"messages": [{"role": "user",
"content": f"Context:\n{context}"
f"\n\nQuestion: {query}"}]
}))
body = json.loads(resp["body"].read())
return {
"answer": body["content"][0]["text"],
"sources": [d["source"]
for d in context_docs],
"tokens_used": body["usage"]
}glue = boto3.client("glue")
athena = boto3.client("athena")
def get_schema(database: str) -> str:
    """Render the Glue catalog for *database*, one line per table.

    Each line has the form "Table: name (col type, col type, ...)\n",
    suitable for pasting into an NL-to-SQL prompt.
    """
    table_list = glue.get_tables(
        DatabaseName=database)["TableList"]
    lines = []
    for table in table_list:
        columns = table["StorageDescriptor"]["Columns"]
        col_spec = ", ".join(
            f"{col['Name']} {col['Type']}" for col in columns)
        lines.append(f"Table: {table['Name']} ({col_spec})\n")
    return "".join(lines)
# Text-to-SQL: embed the Glue schema in the prompt and ask Claude for
# an Athena SELECT statement.
# NOTE(review): the "Only SELECT" rule is enforced only by the prompt;
# the returned SQL is never validated here — confirm a downstream
# guard exists before executing it against Athena.
def nl_to_sql(question: str, database: str) -> str:
schema = get_schema(database)
prompt = f"""Convert to SQL for Amazon Athena.
Schema:
{schema}
Rules: Use double quotes for identifiers.
Only SELECT queries. No DDL/DML.
Question: {question}
Return ONLY the SQL, no explanation."""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1024,
"messages": [{"role": "user",
"content": prompt}]}))
# .strip() trims whitespace only; the model may still wrap the SQL
# in markdown fences — presumably handled by the caller; verify.
sql = json.loads(
resp["body"].read()
)["content"][0]["text"].strip()
return sqldef gated_answer(query: str) -> dict:
# Body of gated_answer (its header is fused onto the previous line):
# retrieve, re-rank, then refuse to answer when the best re-rank score
# falls below the confidence gate instead of hallucinating.
raw = hybrid_search(query, top_k=20)
docs = fetch_documents(raw)
ranked = rerank_results(query, docs, top_k=5)
# Guard against an empty candidate list.
top_score = ranked[0]["rerank_score"] \
if ranked else 0
# 0.72 gate — presumably tuned offline for this cross-encoder;
# confirm against the evaluation set before changing it.
if top_score < 0.72:
log_low_confidence(query, top_score)
return {
"answer": "I don't have enough "
"information in the knowledge base "
"to answer this question accurately. "
"Please contact support for help.",
"confidence": top_score,
"gated": True,
"sources": []
}
return generate_answer(query, ranked)def build_citations(context_docs: list,
answer: str) -> list:
citations = []
for doc in context_docs:
# Check if this source was actually cited
source_name = doc["source"].split("/")[-1]
if source_name.lower() in answer.lower():
citations.append({
"source": doc["source"],
"chunk_id": doc["chunk_id"],
"relevance_score":
doc["rerank_score"],
"page": doc.get("metadata", {})
.get("page_number", "N/A"),
"excerpt": doc["text"][:200]
})
return citations
# Include in response:
result = generate_answer(query, ranked)
result["citations"] = build_citations(
ranked, result["answer"])from ragas import evaluate
from ragas.metrics import (
faithfulness, answer_relevancy,
context_precision, context_recall)
from datasets import Dataset
def run_ragas_eval(test_cases: list) -> dict:
data = {
"question": [tc["question"]
for tc in test_cases],
"answer": [tc["generated_answer"]
for tc in test_cases],
"contexts": [tc["retrieved_contexts"]
for tc in test_cases],
"ground_truth": [tc["expected_answer"]
for tc in test_cases]
}
dataset = Dataset.from_dict(data)
results = evaluate(
dataset,
metrics=[faithfulness, answer_relevancy,
context_precision, context_recall])
return {
"faithfulness": results["faithfulness"],
"answer_relevancy":
results["answer_relevancy"],
"context_precision":
results["context_precision"],
"context_recall":
results["context_recall"]
}import tiktoken
def estimate_cost(in_tokens, out_tokens,
                  model="claude-3-sonnet"):
    """Return the USD cost of a model call, rounded to 6 decimals.

    Raises KeyError for a model not present in the price table.
    """
    # Per-token USD prices; extend as more models are onboarded.
    prices = {
        "claude-3-sonnet": {
            "input": 0.000003,
            "output": 0.000015}}
    rate = prices[model]
    cost = (in_tokens * rate["input"]
            + out_tokens * rate["output"])
    return round(cost, 6)
def compress_context(chunks: list,
max_tokens: int = 4000) -> list:
"""Greedily keep leading chunks within the token budget.

Chunks are consumed in the order given (assumed already ranked
best-first); the first chunk that would overflow max_tokens stops
the scan entirely rather than being skipped.
"""
# cl100k_base tokenizer approximates Claude's token counts closely
# enough for budgeting purposes.
enc = tiktoken.get_encoding("cl100k_base")
selected, total = [], 0
for chunk in chunks:
tokens = len(enc.encode(chunk["text"]))
if total + tokens > max_tokens:
break
selected.append(chunk)
total += tokens
return selectedaws cloudwatch put-metric-alarm \
--alarm-name "RAG-FaithfulnessDrop" \
--metric-name "Faithfulness" \
--namespace "AgentForge/RAG" \
--statistic Average --period 3600 \
--threshold 0.80 \
--comparison-operator LessThanThreshold \
--evaluation-periods 3 \
--alarm-actions \
"arn:aws:sns:us-east-1:ACCOUNT:RAGAlerts" \
--treat-missing-data notBreaching
# Also track per-query metrics:
aws cloudwatch put-metric-data \
--namespace "AgentForge/RAG" \
--metric-data '[
{"MetricName":"QueryLatency",
"Value":1250,"Unit":"Milliseconds"},
{"MetricName":"CostPerQuery",
"Value":0.008,"Unit":"None"},
{"MetricName":"RetrievalScore",
"Value":0.87,"Unit":"None"}
]'Azure Implementation Path
Replace OpenSearch with Azure AI Search (vector + semantic hybrid search). Embeddings via Azure OpenAI text-embedding-ada-002. NL-to-SQL targets Azure Synapse Analytics or Azure SQL with schema from Azure Purview (data catalog). BI integration via Power BI Copilot for natural language queries on dashboards. Evaluation via Azure AI Studio built-in evaluation. Re-ranking via Azure AI Search semantic ranker (built-in, no external model needed).
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery
vector_query = VectorizedQuery(
vector=embed_query(user_question),
k_nearest_neighbors=20,
fields="content_vector")
results = search_client.search(
search_text=user_question, # BM25 hybrid
vector_queries=[vector_query],
query_type="semantic", # built-in re-ranker
semantic_configuration_name="default",
top=5)GCP Implementation Path
Replace OpenSearch with Vertex AI Search (managed RAG with built-in chunking, embedding, and retrieval). Embeddings via Vertex AI textembedding-gecko. NL-to-SQL targets BigQuery with schema from Dataplex (data catalog). BI via Looker with Gemini NLQ for natural language queries. Built-in evaluation via Vertex AI Evaluation. BigQuery ML for in-database embedding generation at scale.
from google.cloud import discoveryengine_v1
client = discoveryengine_v1.SearchServiceClient()
request = discoveryengine_v1.SearchRequest(
serving_config=f"projects/{PROJECT}/locations/global"
f"/collections/default_collection"
f"/engines/agentforge-search/servingConfigs/default",
query=user_question,
page_size=5,
content_search_spec={
"snippet_spec": {"return_snippet": True},
"summary_spec": {
"summary_result_count": 5,
"include_citations": True,
"model_spec": {"version": "gemini-1.5-flash"}
}
})WHERE region = 'us-east-1' AND date > '2024-01-01'
- GuardDuty enabled in all active regions
- Security Hub enabled with AWS Foundational Security Best Practices standard
- CloudTrail logging to S3 with organization trail
- EventBridge configured in the security account
- IAM role with SecurityAudit, GuardDuty read, SSM Automation permissions
- Bedrock model access: Claude 3 Sonnet
- Slack webhook URL for security alerts; Jira project for security findings
#!/bin/bash
REGIONS=$(aws ec2 describe-regions \
--query 'Regions[].RegionName' \
--output text)
for REGION in $REGIONS; do
echo "Enabling GuardDuty in $REGION"
aws guardduty create-detector \
--enable \
--finding-publishing-frequency FIFTEEN_MINUTES \
--region $REGION 2>/dev/null || true
done
# EventBridge rule for ALL GuardDuty findings
aws events put-rule \
--name "GuardDuty-AllFindings" \
--event-pattern '{
"source": ["aws.guardduty"],
"detail-type": [
"GuardDuty Finding"
]
}' \
--state ENABLED
aws events put-targets \
--rule "GuardDuty-AllFindings" \
--targets "Id"="SecurityTriageLambda","Arn"="arn:aws:lambda:us-east-1:ACCOUNT:function:SecurityTriage"aws guardduty list-detectors --region us-east-1 in each region and confirm a detector exists. Verify the EventBridge rule with aws events describe-rule.import boto3, json
from datetime import datetime, timedelta
cloudtrail = boto3.client("cloudtrail")
# Attach one hour of CloudTrail events for the affected resource type
# to a GuardDuty finding, so the LLM classifier has activity context.
def enrich_finding(finding: dict) -> dict:
resource = finding["resource"]
actor = finding.get("service", {}).get(
"action", {}).get("awsApiCallAction", {})
# NOTE(review): `principal` is computed but never used below —
# either thread it into the return dict or delete it.
principal = actor.get("remoteIpDetails", {}).get(
"ipAddressV4", "unknown")
# Query CloudTrail for context
# NOTE(review): datetime.utcnow() returns a naive datetime and is
# deprecated in Python 3.12+; datetime.now(timezone.utc) preferred.
end_time = datetime.utcnow()
start_time = end_time - timedelta(hours=1)
events = cloudtrail.lookup_events(
LookupAttributes=[{
"AttributeKey": "ResourceType",
"AttributeValue":
resource.get("resourceType", "")
}],
StartTime=start_time,
EndTime=end_time,
MaxResults=50
)["Events"]
return {
"finding": finding,
"cloudtrail_context": [
{"event_name": e["EventName"],
"event_time": str(e["EventTime"]),
"username": e.get("Username", "N/A"),
# CloudTrailEvent is a JSON string payload; parse per event.
"source_ip": json.loads(
e["CloudTrailEvent"]
).get("sourceIPAddress", "N/A")}
for e in events
],
"enrichment_timestamp":
datetime.utcnow().isoformat()
}def classify_mitre(enriched_finding: dict) -> dict:
prompt = f"""You are a security analyst.
Classify this GuardDuty finding using
MITRE ATT&CK framework.
Finding: {json.dumps(
enriched_finding["finding"], indent=2)}
CloudTrail context: {json.dumps(
enriched_finding["cloudtrail_context"][:10],
indent=2)}
Return JSON:
{{
"tactic": "TA0001-TA0011 name",
"technique": "T-number and name",
"sub_technique": "if applicable",
"confidence": 0.0-1.0,
"severity_override": "LOW/MEDIUM/HIGH/CRITICAL",
"kill_chain_phase": "recon/weaponize/deliver/exploit/install/c2/action",
"ioc_indicators": ["list of IOCs"],
"recommended_actions": ["list"]
}}"""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"messages": [{"role": "user",
"content": prompt}]}))
return json.loads(
resp["body"].read()
)["content"][0]["text"]def generate_narrative(enriched: dict,
mitre: dict) -> str:
prompt = f"""Write a security incident narrative.
MITRE Classification: {json.dumps(mitre)}
Finding Details: {json.dumps(
enriched["finding"], indent=2)[:3000]}
Structure your response as:
## What Happened
[1-2 sentence summary]
## Affected Resources
[List resources with ARNs]
## Blast Radius
[Estimate: how many other resources could
be affected if this is a real attack]
## Immediate Actions (next 30 minutes)
[Numbered list]
## Long-Term Remediation (next 7 days)
[Numbered list]
## Risk Score: [1-10]"""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"messages": [{"role": "user",
"content": prompt}]}))
return json.loads(
resp["body"].read()
)["content"][0]["text"]ec2 = boto3.client("ec2")
iam = boto3.client("iam")
ssm = boto3.client("ssm")
# Containment playbook for high-severity (>= 7.0) GuardDuty findings on
# EC2: isolate the instance behind an empty security group and attempt
# a memory capture to S3 for forensics.
def auto_contain(finding: dict):
severity = finding["severity"]
if severity < 7.0:
return {"action": "monitor_only"}
resource = finding["resource"]
instance_id = resource.get(
"instanceDetails", {}).get("instanceId")
if not instance_id:
return {"action": "no_instance"}
# 1. Create isolation security group
# Assumes the first network interface carries the relevant VPC —
# TODO confirm for multi-ENI instances.
vpc_id = resource["instanceDetails"] \
["networkInterfaces"][0]["vpcId"]
iso_sg = ec2.create_security_group(
GroupName=f"ISOLATION-{instance_id}",
Description="Forensic isolation - no traffic",
VpcId=vpc_id)
# No ingress/egress rules = fully isolated
# NOTE(review): new security groups are created with a default
# allow-all egress rule; revoke it for true isolation — confirm.
# 2. Swap security groups
ec2.modify_instance_attribute(
InstanceId=instance_id,
Groups=[iso_sg["GroupId"]])
# 3. Create memory snapshot
# NOTE(review): reading /dev/mem is blocked on most modern kernels
# (CONFIG_STRICT_DEVMEM) and captures at most the first 1 GiB here;
# a dedicated memory-acquisition tool is likely needed — verify.
ssm.send_command(
InstanceIds=[instance_id],
DocumentName="AWS-RunShellScript",
Parameters={"commands": [
"dd if=/dev/mem of=/tmp/memdump.raw "
"bs=1M count=1024",
"aws s3 cp /tmp/memdump.raw "
f"s3://forensics-bucket/{instance_id}/"
]})
return {"action": "contained",
"instance": instance_id,
"isolation_sg": iso_sg["GroupId"]}iam_client = boto3.client("iam")
def analyze_policies():
    """Scan customer-managed IAM policies for wildcard actions.

    Returns a list of {arn, name, document} records — one entry per
    statement whose Action list contains a "*" — covering every
    locally-scoped policy's default version.
    """
    flagged = []
    pages = iam_client.get_paginator("list_policies").paginate(
        Scope="Local")  # customer-managed policies only
    for page in pages:
        for policy in page["Policies"]:
            document = iam_client.get_policy_version(
                PolicyArn=policy["Arn"],
                VersionId=policy["DefaultVersionId"],
            )["PolicyVersion"]["Document"]
            for statement in document.get("Statement", []):
                actions = statement.get("Action", [])
                # Action may be a bare string or a list of strings.
                if isinstance(actions, str):
                    actions = [actions]
                if any("*" in action for action in actions):
                    flagged.append({
                        "arn": policy["Arn"],
                        "name": policy["PolicyName"],
                        "document": document,
                    })
    return flagged
def suggest_tightened_policy(policy: dict) -> str:
prompt = f"""This IAM policy is overly permissive:
{json.dumps(policy["document"], indent=2)}
Generate a least-privilege replacement that:
1. Replaces wildcard actions with specific actions
2. Adds resource-level constraints
3. Adds condition keys where appropriate
4. Preserves the intended functionality
Return valid IAM policy JSON only."""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": [{"role": "user",
"content": prompt}]}))
return json.loads(
resp["body"].read()
)["content"][0]["text"]aws iam create-policy --dry-run.import requests
def route_alert(finding: dict, narrative: str,
                mitre: dict):
    """Fan a finding out to alert channels based on GuardDuty severity.

    >= 9.0 pages on-call, posts to the critical Slack channel, and
    opens a Critical Jira ticket; >= 7.0 posts to Slack and opens a
    High ticket; >= 4.0 opens a Medium ticket; anything lower is only
    recorded on the dashboard.
    """
    severity = finding["severity"]
    if severity >= 9.0:
        # Critical: wake someone up and leave a paper trail.
        send_pagerduty(finding, narrative)
        send_slack(finding, narrative, "#security-critical")
        create_jira_ticket(finding, narrative, "Critical")
        return
    if severity >= 7.0:
        send_slack(finding, narrative, "#security-alerts")
        create_jira_ticket(finding, narrative, "High")
        return
    if severity >= 4.0:
        create_jira_ticket(finding, narrative, "Medium")
        return
    # Low severity: record only, no human interrupt.
    log_to_dashboard(finding, narrative)
def send_slack(finding, narrative, channel):
requests.post(os.environ["SLACK_WEBHOOK"], json={
"channel": channel,
"blocks": [{
"type": "section",
"text": {"type": "mrkdwn",
"text": f"*Security Alert*\n"
f"Severity: {finding['severity']}\n"
f"Type: {finding['type']}\n"
f"{narrative[:500]}"}
}]
})config_client = boto3.client("config")
def run_compliance_sweep():
# Get conformance pack results
results = config_client \
.get_conformance_pack_compliance_details(
ConformancePackName=
"agentforge-soc2-pack",
Filters={"ComplianceType":
"NON_COMPLIANT"})
non_compliant = results[
"ConformancePackRuleComplianceList"]
# Generate delta report via Bedrock
prompt = f"""SOC 2 Type II compliance sweep.
{len(non_compliant)} non-compliant controls found:
{json.dumps(non_compliant[:20], indent=2)}
Generate a compliance delta report with:
1. Summary of non-compliant controls
2. Risk level for each control (1-10)
3. Remediation steps for each
4. Estimated effort to fix (hours)
5. Priority ranking
Return structured JSON."""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"messages": [{"role": "user",
"content": prompt}]}))
return json.loads(
resp["body"].read()
)["content"][0]["text"]macie = boto3.client("macie2")
# Enable Macie and create classification job
macie.enable_macie()
job = macie.create_classification_job(
jobType="SCHEDULED",
name="AgentForge-PII-Scan",
s3JobDefinition={
"bucketDefinitions": [{
"accountId": os.environ["ACCOUNT_ID"],
"buckets": [
"customer-data-bucket",
"analytics-raw-data"
]
}]
},
scheduleFrequencyUpdate={
"dailySchedule": {}
},
managedDataIdentifierSelector="ALL"
)
def handle_macie_finding(finding):
prompt = f"""Macie PII finding:
Type: {finding["type"]}
Severity: {finding["severity"]["score"]}
Resource: {finding["resourcesAffected"]}
PII types: {finding["classificationDetails"]}
Recommend: 1) immediate action,
2) data handling fix, 3) prevention."""
return invoke_bedrock(prompt)def correlate_threats(findings: list) -> dict:
# Group by source IP
ip_groups = {}
for f in findings:
ip = f.get("service", {}).get("action", {}) \
.get("networkConnectionAction", {}) \
.get("remoteIpDetails", {}) \
.get("ipAddressV4", "unknown")
if ip not in ip_groups:
ip_groups[ip] = []
ip_groups[ip].append(f)
# Identify potential campaigns
campaigns = []
for ip, group in ip_groups.items():
if len(group) >= 3:
campaigns.append({
"source_ip": ip,
"finding_count": len(group),
"finding_types": list(set(
f["type"] for f in group)),
"time_span_hours":
calculate_time_span(group),
"is_campaign": True
})
prompt = f"""Analyze these potential attack campaigns:
{json.dumps(campaigns, indent=2)}
For each campaign:
1. Classify attack type
2. Assess sophistication (1-10)
3. Predict next likely action
4. Recommend countermeasures"""
return invoke_bedrock(prompt)aws cloudwatch put-dashboard \
--dashboard-name "AgentForge-Security" \
--dashboard-body '{
"widgets": [
{"type":"metric","properties":{
"title":"Findings by Severity (Daily)",
"metrics":[
["AgentForge/Security","Findings",
"Severity","CRITICAL"],
["AgentForge/Security","Findings",
"Severity","HIGH"],
["AgentForge/Security","Findings",
"Severity","MEDIUM"]
],"period":86400}},
{"type":"metric","properties":{
"title":"Mean Time to Remediate (hours)",
"metrics":[
["AgentForge/Security","MTTR",
"Severity","CRITICAL"],
["AgentForge/Security","MTTR",
"Severity","HIGH"]
],"stat":"Average"}},
{"type":"metric","properties":{
"title":"Containment Success Rate (%)",
"metrics":[
["AgentForge/Security",
"ContainmentSuccess"]
]}},
{"type":"metric","properties":{
"title":"Compliance Score Trend",
"metrics":[
["AgentForge/Security",
"SOC2ComplianceScore"]
],"period":86400}}
]}'def generate_weekly_report():
# Collect data from multiple sources
findings = get_weekly_findings()
compliance = get_compliance_status()
containment = get_containment_stats()
prompt = f"""Generate a weekly security
posture report for executive leadership.
New findings this week: {len(findings["new"])}
Resolved findings: {len(findings["resolved"])}
Open findings: {len(findings["open"])}
Compliance score: {compliance["score"]}%
Compliance drift: {compliance["drift"]}
Top findings: {json.dumps(
findings["new"][:10], indent=2)}
Write a professional report with:
1. Executive Summary (3 sentences)
2. Key Metrics table
3. Top 5 Risks with risk scores
4. Week-over-week trend analysis
5. Recommended Actions (prioritized)
6. Compliance Status by control family"""
report = invoke_bedrock(prompt)
# Send via SES
ses = boto3.client("ses")
ses.send_email(
Source="security@company.com",
Destination={"ToAddresses":
["ciso@company.com"]},
Message={
"Subject": {"Data":
"Weekly Security Posture Report"},
"Body": {"Html": {"Data": report}}
})Azure Implementation Path
Replace GuardDuty with Microsoft Defender for Cloud for threat detection across Azure resources. Security aggregation via Microsoft Sentinel (SIEM/SOAR) replacing Security Hub. Identity risk analysis via Microsoft Entra ID Protection. CloudTrail equivalent: Azure Activity Log + Azure Monitor. Auto-containment via Azure Logic Apps playbooks triggered by Sentinel. Compliance via Microsoft Defender for Cloud regulatory compliance dashboard (CIS, SOC 2, ISO 27001 built-in). IAM analysis via Entra Permissions Management.
az sentinel alert-rule create \
--resource-group agentforge-security \
--workspace-name agentforge-sentinel \
--rule-name "HighSeverityThreat" \
--severity High \
--query "SecurityAlert | where AlertSeverity == 'High'"
az security assessment create \
--name "SOC2-Compliance" \
--status-code "Unhealthy"GCP Implementation Path
Replace GuardDuty with Security Command Center (SCC) Premium for threat detection. SIEM via Chronicle Security Operations replacing Security Hub. Identity analysis via IAM Recommender + Policy Intelligence. Audit logging via Cloud Audit Logs. Auto-containment via Cloud Functions triggered by SCC notifications. Compliance via SCC Compliance Reports (CIS, PCI DSS, NIST). Secrets management via Secret Manager.
gcloud scc notifications create agentforge-alerts \
--pubsub-topic=projects/PROJECT/topics/scc-findings \
--filter='severity="HIGH" OR severity="CRITICAL"'
gcloud policy-intelligence lint-policy \
--policy-file=policy.json \
--resource="//cloudresourcemanager.googleapis.com/projects/PROJECT"
gcloud iam recommender recommendations list \
--project=PROJECT --location=global--version-id v1 | python policy_agent.py
- Amazon Comprehend Medical API access enabled
- HealthLake datastore created (FHIR R4)
- S3 bucket with 50 sample clinical notes (de-identified)
- Amazon Personalize dataset group created
- Macie enabled for PHI detection
- DynamoDB table for HIPAA audit logs (PK: accessId, SK: timestamp)
- Bedrock model access: Claude 3 Sonnet
- SNOMED CT and ICD-10 reference files in S3
import boto3, json
comprehend_medical = boto3.client(
"comprehendmedical")
s3 = boto3.client("s3")
# Run Comprehend Medical NER over a clinical note and bucket the
# entities (with scores, traits, and attributes) by category.
def extract_medical_entities(note_text: str):
resp = comprehend_medical.detect_entities_v2(
Text=note_text)
entities = {
"conditions": [],
"medications": [],
"procedures": [],
"anatomy": [],
"test_results": []
}
for entity in resp["Entities"]:
category = entity["Category"]
entry = {
"text": entity["Text"],
"score": entity["Score"],
"type": entity["Type"],
"traits": [t["Name"]
for t in entity.get("Traits", [])],
"attributes": [
{"type": a["Type"],
"text": a["Text"],
"score": a["Score"]}
for a in entity.get("Attributes", [])
]
}
# Entities in categories not matched below are silently dropped.
if category == "MEDICAL_CONDITION":
entities["conditions"].append(entry)
elif category == "MEDICATION":
entities["medications"].append(entry)
# NOTE(review): Comprehend Medical's documented categories include
# TEST_TREATMENT_PROCEDURE but not a bare "PROCEDURE", so this
# branch may never fire and "procedures" could stay empty — confirm
# against the DetectEntitiesV2 API reference.
elif category == "PROCEDURE":
entities["procedures"].append(entry)
elif category == "ANATOMY":
entities["anatomy"].append(entry)
elif category == "TEST_TREATMENT_PROCEDURE":
entities["test_results"].append(entry)
return entitiesdef detect_and_redact_phi(note_text: str):
# Body of detect_and_redact_phi (its header is fused onto the previous
# line): detect PHI spans with Comprehend Medical and splice in
# "[TYPE_REDACTED]" tags, returning the redacted text plus an audit log.
resp = comprehend_medical.detect_phi(
Text=note_text)
phi_entities = resp["Entities"]
redacted = note_text
# Sort by offset descending to preserve positions
# (replacing from the end means earlier offsets stay valid even as
# the string length changes).
sorted_phi = sorted(phi_entities,
key=lambda e: e["BeginOffset"],
reverse=True)
phi_log = []
for entity in sorted_phi:
start = entity["BeginOffset"]
end = entity["EndOffset"]
phi_type = entity["Type"]
# NOTE(review): `original` is never used — drop it, or avoid it
# entirely (logging raw PHI would defeat the redaction).
original = note_text[start:end]
# Replace with type tag
replacement = f"[{phi_type}_REDACTED]"
redacted = (redacted[:start]
+ replacement + redacted[end:])
# Log offsets/types only — never the PHI text itself.
phi_log.append({
"type": phi_type,
"score": entity["Score"],
"offset": start,
"length": end - start
})
return {
"redacted_text": redacted,
"phi_count": len(phi_entities),
"phi_types": list(set(
e["Type"] for e in phi_entities)),
"phi_log": phi_log
}healthlake = boto3.client("healthlake")
def to_fhir_condition(entity: dict,
                      patient_id: str) -> dict:
    """Map a Comprehend Medical condition entity to a FHIR R4 Condition.

    The SNOMED code is resolved via lookup_snomed(); an extraction
    confidence above 0.9 is recorded as "confirmed", anything lower as
    "provisional".
    """
    verification = ("confirmed" if entity["score"] > 0.9
                    else "provisional")
    snomed_coding = {
        "system": "http://snomed.info/sct",
        "code": lookup_snomed(entity["text"]),
        "display": entity["text"],
    }
    return {
        "resourceType": "Condition",
        "subject": {
            "reference": f"Patient/{patient_id}"
        },
        "code": {
            "coding": [snomed_coding],
            "text": entity["text"],
        },
        "clinicalStatus": {
            "coding": [{
                "system": "http://terminology.hl7.org/CodeSystem/condition-clinical",
                "code": "active",
            }]
        },
        "verificationStatus": {
            "coding": [{
                "system": "http://terminology.hl7.org/CodeSystem/condition-ver-status",
                "code": verification,
            }]
        },
    }
def store_in_healthlake(resource: dict,
datastore_id: str):
resp = healthlake.create_resource(
DatastoreId=datastore_id,
ResourceType=resource["resourceType"],
ResourceBody=json.dumps(resource))
return resp["ResourceId"]def clinical_summary(patient_id: str,
redacted_note: str,
fhir_conditions: list) -> str:
conditions_text = json.dumps(
fhir_conditions[:10], indent=2)
prompt = f"""You are a clinical documentation
assistant. Generate a structured clinical summary.
Redacted clinical note:
{redacted_note[:4000]}
FHIR Conditions on file:
{conditions_text}
Requirements:
1. Summarize in 3-5 sentences
2. Reference SNOMED codes for all conditions
3. List active medications with dosages
4. Note any drug interactions
5. Flag conditions needing follow-up
6. Use professional clinical language
7. NEVER attempt to diagnose - only summarize
Format as structured clinical note."""
resp = bedrock.invoke_model(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 2048,
"messages": [{"role": "user",
"content": prompt}]}))
return json.loads(
resp["body"].read()
)["content"][0]["text"]import uuid
from datetime import datetime
audit_table = dynamodb.Table("HIPAAAuditLog")
def log_access(user_id: str, resource_type: str,
resource_id: str, action: str,
purpose: str, phi_accessed: bool):
audit_table.put_item(Item={
"accessId": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat(),
"userId": user_id,
"resourceType": resource_type,
"resourceId": resource_id,
"action": action,
"purpose": purpose,
"phiAccessed": phi_accessed,
"sourceIp": get_client_ip(),
"sessionId": get_session_id(),
"ttl": int((datetime.utcnow()
+ timedelta(days=2555)).timestamp())
})
# Decorator for automatic audit logging
def hipaa_audited(resource_type):
    """Decorator factory: write a HIPAA audit record before each call.

    The wrapped function is logged via log_access() with the current
    user, *resource_type*, and the call's ``resource_id`` / ``purpose``
    keyword arguments (defaulting to "N/A" / "clinical") before the
    underlying call proceeds. The wrapped function's return value is
    passed through unchanged.
    """
    import functools  # local import keeps this block self-contained

    def decorator(func):
        # Fix: without functools.wraps the decorated function reported
        # __name__ == "wrapper", corrupting both introspection and the
        # func.__name__ value written into the audit log for any
        # doubly-decorated function.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            log_access(
                get_current_user(),
                resource_type,
                kwargs.get("resource_id", "N/A"),
                func.__name__,
                kwargs.get("purpose", "clinical"),
                True)
            return func(*args, **kwargs)
        return wrapper
    return decorator
@hipaa_audited("Condition")
def get_patient_conditions(resource_id,
purpose="treatment"):
return healthlake.read_resource(...)personalize = boto3.client("personalize")
personalize_runtime = boto3.client(
"personalize-runtime")
# Get recommendations
def get_recommendations(user_id: str,
num_results: int = 5) -> list:
resp = personalize_runtime \
.get_recommendations(
campaignArn=os.environ[
"PERSONALIZE_CAMPAIGN_ARN"],
userId=user_id,
numResults=num_results)
return resp["itemList"]
# Generate narrative with Bedrock
def personalized_narrative(user_id: str,
user_context: dict) -> str:
recs = get_recommendations(user_id)
items = [get_product_details(r["itemId"])
for r in recs]
prompt = f"""Generate a personalized product
recommendation for this user.
User profile: {json.dumps(user_context)}
Recommended products:
{json.dumps(items, indent=2)}
Write a warm, conversational 3-paragraph
recommendation that:
1. References the user's recent activity
2. Explains why each product fits
3. Suggests a bundle deal if applicable"""
return invoke_bedrock(prompt)kinesis = boto3.client("kinesis")
def process_supply_chain_event(record):
event = json.loads(record["Data"])
event_type = event["type"]
severity_map = {
"shipment_delayed": "MEDIUM",
"supplier_outage": "HIGH",
"port_closure": "CRITICAL",
"quality_recall": "CRITICAL",
"price_spike": "LOW"
}
severity = severity_map.get(
event_type, "MEDIUM")
if severity in ("HIGH", "CRITICAL"):
prompt = f"""Supply chain disruption:
Type: {event_type}
Supplier: {event["supplier"]}
Product: {event["product"]}
Region: {event["region"]}
ETA Impact: {event["eta_impact_days"]} days
From the knowledge base, suggest:
1. Alternative suppliers for this product
2. Estimated lead time from each alternative
3. Price impact estimate
4. Risk of switching suppliers"""
analysis = invoke_bedrock(prompt)
send_alert(severity, event, analysis)
return {"severity": severity,
"processed": True}import csv
def load_snomed_index(s3_path: str) -> dict:
    """Build a {conceptId: term} lookup from a SNOMED CT file on S3.

    Only rows whose "active" column equals "1" are retained.
    """
    response = s3.get_object(
        Bucket="ontology-bucket",
        Key=s3_path)
    raw_lines = response["Body"].read().decode().splitlines()
    # SNOMED CT release files are tab-separated with a header row.
    rows = csv.DictReader(raw_lines, delimiter="\t")
    index = {}
    for row in rows:
        if row["active"] == "1":
            index[row["conceptId"]] = row["term"]
    return index
SNOMED_INDEX = load_snomed_index(
"snomed/sct2_Description.txt")
# Scan a generated clinical summary for "SNOMED: <digits>" and
# "ICD-10: <code>" references and verify the SNOMED codes against the
# loaded concept index (a hallucination check on the LLM output).
def validate_medical_codes(summary: str) -> dict:
import re
codes = re.findall(
r'SNOMED:\s*(\d+)', summary)
icd_codes = re.findall(
r'ICD-10:\s*([A-Z]\d{2}\.?\d*)',
summary)
invalid_snomed = [c for c in codes
if c not in SNOMED_INDEX]
valid_snomed = [c for c in codes
if c in SNOMED_INDEX]
# ICD-10 codes are only counted, not validated — there is no ICD
# index equivalent to SNOMED_INDEX in this module.
return {
"valid_snomed": len(valid_snomed),
"invalid_snomed": invalid_snomed,
"icd_codes_found": len(icd_codes),
"all_valid": len(invalid_snomed) == 0
}def generate_patient_summary(patient_id: str):
# Fetch all FHIR resources for patient
conditions = healthlake_query(
f"Condition?subject=Patient/{patient_id}"
f"&clinical-status=active")
meds = healthlake_query(
f"MedicationStatement?"
f"subject=Patient/{patient_id}"
f"&status=active")
allergies = healthlake_query(
f"AllergyIntolerance?"
f"subject=Patient/{patient_id}")
procedures = healthlake_query(
f"Procedure?subject=Patient/{patient_id}"
f"&date=ge2024-01-01")
prompt = f"""Generate a one-page patient summary.
Active Conditions: {json.dumps(conditions[:10])}
Current Medications: {json.dumps(meds[:10])}
Allergies: {json.dumps(allergies)}
Recent Procedures: {json.dumps(procedures[:5])}
Format as:
PATIENT SUMMARY
===============
Active Problems: [bulleted list with SNOMED]
Medications: [name, dose, frequency]
Allergies: [substance, reaction, severity]
Recent Procedures: [date, type, outcome]
Care Plan: [recommended next steps]
Include drug-allergy interaction warnings."""
summary = invoke_bedrock(prompt)
log_access(get_current_user(), "Patient",
patient_id, "generate_summary",
"treatment", True)
return summaryHIPAA_CONTROLS = {
"164.312(a)(1)": {
"name": "Access Control",
"checks": [
("unique_user_ids",
"Every user has unique ID"),
("emergency_access",
"Emergency access procedure exists"),
("auto_logoff",
"Session timeout <= 15 minutes"),
("encryption_at_rest",
"PHI encrypted at rest with AES-256")
]
},
"164.312(b)": {
"name": "Audit Controls",
"checks": [
("audit_logging",
"All PHI access logged"),
("audit_retention",
"Logs retained >= 6 years"),
("audit_review",
"Weekly audit log review")
]
},
"164.312(c)(1)": {
"name": "Integrity",
"checks": [
("data_integrity",
"PHI integrity verified on access"),
("tamper_detection",
"Unauthorized changes detected")
]
}
}
def run_hipaa_validation() -> dict:
results = {}
for control_id, control in \
HIPAA_CONTROLS.items():
checks = []
for check_id, desc in control["checks"]:
passed = run_check(check_id)
checks.append({
"check": check_id,
"description": desc,
"passed": passed})
results[control_id] = {
"name": control["name"],
"checks": checks,
"compliant": all(
c["passed"] for c in checks)
}
return resultsINDUSTRY_CONFIGS = {
"healthcare": {
"ontology": "snomed_ct",
"compliance": "hipaa",
"phi_detection": True,
"audit_retention_days": 2555,
"entity_extractor": "comprehend_medical",
"output_format": "fhir_r4"
},
"retail": {
"ontology": None,
"compliance": "pci_dss",
"phi_detection": False,
"audit_retention_days": 365,
"entity_extractor": "comprehend",
"output_format": "json"
},
"supply_chain": {
"ontology": "gs1",
"compliance": "sox",
"phi_detection": False,
"audit_retention_days": 2555,
"entity_extractor": "comprehend",
"output_format": "json"
}
}
class IndustryAgent:
def __init__(self, industry: str):
self.config = INDUSTRY_CONFIGS[industry]
self.industry = industry
def process(self, input_data: dict):
if self.config["phi_detection"]:
input_data = self.redact_phi(input_data)
entities = self.extract_entities(input_data)
self.audit_log(input_data, entities)
return self.generate_output(entities)aws cloudwatch put-dashboard \
--dashboard-name "AgentForge-Industry" \
--dashboard-body '{
"widgets": [
{"type":"metric","properties":{
"title":"Notes Processed / Day",
"metrics":[
["AgentForge/Industry",
"NotesProcessed"]
],"period":86400}},
{"type":"metric","properties":{
"title":"PHI Detection Rate",
"metrics":[
["AgentForge/Industry",
"PHIDetected"],
["AgentForge/Industry",
"PHIMissed"]
]}},
{"type":"metric","properties":{
"title":"FHIR Mapping Success %",
"metrics":[
["AgentForge/Industry",
"FHIRMappingSuccess"]
]}},
{"type":"metric","properties":{
"title":"HIPAA Compliance Score",
"metrics":[
["AgentForge/Industry",
"ComplianceScore"]
],"period":86400}}
]}'Azure Implementation Path
Replace Comprehend Medical with Azure AI Health Text Analytics for medical NER and entity linking. FHIR storage via Azure Health Data Services (FHIR R4 server). Clinical documentation via Nuance DAX Copilot for ambient listening and note generation. PHI detection via Azure AI Language PII detection (healthcare category). Retail personalization via Azure AI Personalizer. Supply chain via Dynamics 365 Supply Chain Management + Copilot. HIPAA compliance managed through Azure Compliance Manager.
# Azure path: extract medical entities (plus their data-source links, e.g.
# UMLS) from a clinical note via Text Analytics for health.
# NOTE(review): `credential` and `clinical_note_text` are defined elsewhere.
from azure.ai.textanalytics import TextAnalyticsClient
client = TextAnalyticsClient(
endpoint="https://agentforge.cognitiveservices.azure.com",
credential=credential)
# NOTE(review): begin_analyze_healthcare_entities returns a poller; the SDK
# normally requires poller.result() before indexing into documents — confirm.
result = client.begin_analyze_healthcare_entities(
[clinical_note_text])
for entity in result[0].entities:
print(f"{entity.text}: {entity.category} "
f"({entity.confidence_score:.2f})")
for link in entity.data_sources:
print(f" -> {link.entity_id} ({link.name})")GCP Implementation Path
Replace Comprehend Medical with Vertex AI Healthcare NLP API (Healthcare Natural Language API) for medical entity extraction and relationship detection. FHIR storage via Cloud Healthcare API (FHIR R4 store). Clinical reasoning via Vertex AI (Gemini Pro) with Med-PaLM grounding. PHI detection via Cloud DLP with healthcare infoTypes. Retail personalization via Vertex AI Recommendations. Supply chain via Supply Chain Twin on GCP.
# GCP path: persist a FHIR Condition resource and run Healthcare NLP entity
# analysis over the same clinical note.
# NOTE(review): PROJECT/LOCATION/DATASET/FHIR_STORE/fhir_condition and the
# json import come from elsewhere in the file.
from google.cloud import healthcare_v1
client = healthcare_v1.FhirServiceClient()
fhir_store = (f"projects/{PROJECT}/locations/{LOCATION}"
f"/datasets/{DATASET}/fhirStores/{FHIR_STORE}")
# Create FHIR Condition resource
response = client.create_resource(
parent=fhir_store,
type="Condition",
body=json.dumps(fhir_condition))
# NLP analysis via Healthcare NLP
# NOTE(review): these client/service class names don't exactly match the
# published google-cloud-healthcare SDK surface — verify against the docs.
nlp_client = healthcare_v1.CloudHealthcareNlpServiceClient()
result = nlp_client.analyze_entities(
nlp_service=f"projects/{PROJECT}/locations/{LOCATION}"
f"/services/nlp",
document_content=clinical_note_text)- CloudWatch configured with detailed monitoring on target resources
- EventBridge rules enabled in the ops account
- EKS cluster with Prometheus + AlertManager deployed
- IAM role with SSM Automation, EC2, EKS, Cost Explorer permissions
- AWS FIS experiment templates created for chaos testing
- Bedrock model access: Claude 3 Sonnet
- Slack webhook for ops alerts; Confluence API token for post-incident docs
# Create anomaly detection alarm
aws cloudwatch put-metric-alarm \
--alarm-name "AgentForge-CPU-Anomaly" \
--metrics '[
{"Id":"m1","MetricStat":{
"Metric":{"Namespace":"AWS/EC2",
"MetricName":"CPUUtilization",
"Dimensions":[{"Name":"AutoScalingGroupName",
"Value":"agentforge-asg"}]},
"Period":300,"Stat":"Average"}},
{"Id":"ad1","Expression":
"ANOMALY_DETECTION_BAND(m1, 2)"}
]' \
--comparison-operator \
LessThanLowerOrGreaterThanUpperThreshold \
--threshold-metric-id "ad1" \
--evaluation-periods 3 \
--alarm-actions \
"arn:aws:sns:us-east-1:ACCOUNT:OpsAlerts" \
--treat-missing-data notBreaching
# Composite alarm: CPU AND error rate
aws cloudwatch put-composite-alarm \
--alarm-name "AgentForge-Critical-Composite" \
--alarm-rule 'ALARM("AgentForge-CPU-Anomaly") AND ALARM("AgentForge-ErrorRate-High")'# EventBridge rule
aws events put-rule \
--name "CW-Alarm-To-Agent" \
--event-pattern '{
"source": ["aws.cloudwatch"],
"detail-type": ["CloudWatch Alarm State Change"],
"detail": {
"state": {"value": ["ALARM"]}
}
}' --state ENABLED
# Lambda enrichment handler
# EventBridge-targeted Lambda: enrich a CloudWatch alarm state-change event
# with the last hour of the underlying metric so a downstream agent can
# reason about it.
def enrich_alarm(event, context):
alarm = event["detail"]
alarm_name = alarm["alarmName"]
# NOTE(review): the alarm event payload uses camelCase keys ("metricStat",
# "metric", "period", "stat"), but get_metric_data expects PascalCase
# ("Metric", "Period", "Stat") inside "MetricStat" — passing the event
# fragment through unconverted likely fails validation; confirm.
metric = alarm["configuration"]["metrics"][0]
# Fetch recent metric data
cw = boto3.client("cloudwatch")
data = cw.get_metric_data(
MetricDataQueries=[{
"Id": "m1",
"MetricStat": metric["metricStat"],
"ReturnData": True
}],
# NOTE(review): datetime.utcnow() is naive and deprecated in 3.12;
# datetime.now(timezone.utc) is the modern equivalent.
StartTime=datetime.utcnow()
- timedelta(hours=1),
EndTime=datetime.utcnow())
# Compact summary consumed by the runbook-selection prompt downstream.
return {
"alarm_name": alarm_name,
"current_state": alarm["state"]["value"],
"reason": alarm["state"]["reason"],
"metric_values": data["MetricDataResults"],
"timestamp": alarm["state"]["timestamp"]
}RUNBOOKS = {
"high_cpu": "AWS-RestartEC2Instance",
"high_memory": "Custom-IncreaseASGCapacity",
"disk_full": "Custom-CleanupDiskSpace",
"error_rate": "Custom-RollbackDeployment",
"oom_killed": "Custom-PatchK8sMemoryLimits",
"connection_timeout": "Custom-RestartService"
}
# Ask the LLM to choose a remediation runbook for the enriched alarm; only
# auto-execute on high confidence, otherwise route to human approval.
def select_and_execute_runbook(enriched_alarm):
prompt = f"""You are an SRE agent. Analyze this
alarm and select the best remediation runbook.
Alarm: {json.dumps(enriched_alarm, indent=2)}
Available runbooks:
{json.dumps(RUNBOOKS, indent=2)}
Return JSON:
{{"runbook": "runbook_name",
"parameters": {{}},
"confidence": 0.0-1.0,
"reasoning": "why this runbook",
"requires_approval": true/false}}"""
decision = invoke_bedrock(prompt)
# NOTE(review): json.loads raises on any non-JSON model output — consider
# a guarded parse that falls back to request_approval.
parsed = json.loads(decision)
# Guardrail: needs >0.85 confidence AND the model itself must not have
# asked for approval; everything else goes to a human.
if parsed["confidence"] > 0.85 \
and not parsed["requires_approval"]:
execute_runbook(
parsed["runbook"],
parsed["parameters"])
else:
request_approval(parsed)
return parsedce = boto3.client("ce")
# Month-over-month Cost Explorer comparison; any service whose cost rose
# more than 20% is treated as a spike and handed to rightsizing analysis.
def analyze_costs():
now = datetime.utcnow()
current_month = now.strftime("%Y-%m-01")
# NOTE(review): "now - 30 days" does not always land in the previous
# calendar month (e.g. May 31 - 30d = May 1), which makes Start == End
# and breaks the CE call — use real calendar-month arithmetic; confirm.
prev_month = (now - timedelta(days=30)) \
.strftime("%Y-%m-01")
resp = ce.get_cost_and_usage(
TimePeriod={
"Start": prev_month,
"End": current_month},
Granularity="MONTHLY",
Metrics=["BlendedCost"],
GroupBy=[{"Type": "DIMENSION",
"Key": "SERVICE"}])
# Compare with previous month
current = resp["ResultsByTime"][-1]["Groups"]
previous = resp["ResultsByTime"][0]["Groups"]
spikes = []
for svc in current:
name = svc["Keys"][0]
curr_cost = float(
svc["Metrics"]["BlendedCost"]["Amount"])
# Find the matching service in the previous month (None if new).
prev_svc = next(
(p for p in previous
if p["Keys"][0] == name), None)
prev_cost = float(
prev_svc["Metrics"]["BlendedCost"]
["Amount"]) if prev_svc else 0
# Brand-new services (prev_cost == 0) are never flagged as spikes.
if prev_cost > 0:
change_pct = ((curr_cost - prev_cost)
/ prev_cost) * 100
if change_pct > 20:
spikes.append({
"service": name,
"current": curr_cost,
"previous": prev_cost,
"change_pct": round(change_pct, 1)
})
if spikes:
return get_rightsizing_recs(spikes)
return {"status": "no_anomalies"}def get_rightsizing_recs(spikes):
# (Body of get_rightsizing_recs — its def line is fused onto the previous
# line by the extraction.) Combine detected spikes with AWS's own top-10
# EC2 rightsizing recommendations and ask the model for a FinOps plan.
# Fetch EC2 rightsizing from Cost Explorer
rs = ce.get_rightsizing_recommendation(
Service="AmazonEC2",
Configuration={
"RecommendationTarget": "SAME_INSTANCE_FAMILY",
"BenefitsConsidered": True})
# Only the first 10 recommendations are embedded to bound prompt size.
prompt = f"""You are a FinOps analyst.
Cost spikes detected:
{json.dumps(spikes, indent=2)}
AWS Rightsizing recommendations:
{json.dumps(rs["RightsizingRecommendations"][:10],
indent=2)}
For each spike:
1. Root cause analysis
2. Specific rightsizing action
3. Estimated monthly savings (USD)
4. Implementation risk (LOW/MEDIUM/HIGH)
5. Recommended timeline
Also suggest:
- Reserved Instance opportunities
- Savings Plan coverage gaps
- Spot Instance candidates"""
return invoke_bedrock(prompt)import subprocess, json, yaml
# AlertManager webhook handler for OOMKilled pods: ask the LLM for a safe
# new memory limit/request and patch the owning Deployment via kubectl.
def handle_oom_alert(alert):
namespace = alert["labels"]["namespace"]
pod = alert["labels"]["pod"]
container = alert["labels"]["container"]
# Get current deployment manifest
# NOTE(review): assumes the pod name is "<deployment>-<rs-hash>-<pod-hash>";
# pods owned by other controllers (StatefulSet, Job) would break this.
deploy_name = pod.rsplit("-", 2)[0]
# NOTE(review): subprocess.run results are unchecked (no check=True); a
# kubectl failure only surfaces as a JSONDecodeError on empty stdout.
manifest = json.loads(subprocess.run(
["kubectl", "get", "deployment",
deploy_name, "-n", namespace,
"-o", "json"],
capture_output=True, text=True).stdout)
# Get actual memory usage from Prometheus
current_limit = get_container_limit(
manifest, container)
prompt = f"""EKS pod OOMKilled.
Deployment: {deploy_name}
Container: {container}
Current memory limit: {current_limit}
Pod restarts (24h): {alert["annotations"]["restarts"]}
Calculate a safe memory limit:
1. Current limit + 50% headroom
2. Never exceed node capacity
3. Consider other pods on the node
Return JSON:
{{"new_limit": "512Mi",
"new_request": "256Mi",
"reasoning": "..."}}"""
# Model output parsed as strict JSON (raises on prose answers).
rec = json.loads(invoke_bedrock(prompt))
# Patch the deployment
patch = {
"spec": {"template": {"spec": {
"containers": [{
"name": container,
"resources": {
"limits": {
"memory": rec["new_limit"]},
"requests": {
"memory": rec["new_request"]}
}}]}}}
}
# Strategic merge patch: only the named container's resources change.
subprocess.run([
"kubectl", "patch", "deployment",
deploy_name, "-n", namespace,
"--type", "strategic",
"-p", json.dumps(patch)])
return recfis = boto3.client("fis")
# Start an AWS FIS chaos experiment and poll system health for ~60 s; the
# LLM decides whether to abort early and what compensating action to take.
def run_chaos_experiment(template_id: str):
# Start FIS experiment
experiment = fis.start_experiment(
experimentTemplateId=template_id,
tags={"agentforge": "chaos-test"})
exp_id = experiment["experiment"]["id"]
# Monitor during experiment
import time
for i in range(12): # 60 seconds
time.sleep(5)
# NOTE(review): `status` is fetched but never used — dead read.
status = fis.get_experiment(
id=exp_id)["experiment"]["state"]
# Check system health
health = check_system_health()
if not health["healthy"]:
prompt = f"""Chaos experiment running.
Experiment: {template_id}
System health: {json.dumps(health)}
Duration: {i * 5} seconds
The system is degraded. Recommend:
1. Should we stop the experiment?
2. What compensating action is needed?
3. Is this an expected degradation?
Return JSON with action recommendation."""
decision = json.loads(
invoke_bedrock(prompt))
# Abort path: stop FIS first, then run the compensating action.
if decision.get("stop_experiment"):
fis.stop_experiment(id=exp_id)
execute_compensation(
decision["compensating_action"])
break
return get_experiment_report(exp_id)def generate_post_incident(incident: dict):
# (Body of generate_post_incident — its def line is fused onto the previous
# line.) Build a templated post-incident report with the LLM and publish it
# as a new Confluence page in the INCIDENTS space.
prompt = f"""Generate a post-incident report.
Incident: {json.dumps(incident, indent=2)}
Format:
# Post-Incident Report
## Summary
[1-2 sentences]
## Timeline
[Chronological events with timestamps]
## Root Cause
[Technical root cause analysis]
## Impact
[Users/services affected, duration]
## Resolution
[What fixed it, who was involved]
## Action Items
[Numbered list with owners and deadlines]
## Lessons Learned
[What went well, what didn't]"""
report = invoke_bedrock(prompt)
# Push to Confluence
# NOTE(review): the report is Markdown-shaped but is posted with
# representation "wiki" — Confluence may render it verbatim; confirm.
# The HTTP response is never checked (fire-and-forget).
requests.post(
f"{os.environ['CONFLUENCE_URL']}"
f"/rest/api/content",
headers={
"Authorization":
f"Bearer {os.environ['CONFLUENCE_TOKEN']}",
"Content-Type": "application/json"},
json={
"type": "page",
"title": f"Incident Report - "
f"{incident['id']} - "
f"{datetime.utcnow().strftime('%Y-%m-%d')}",
"space": {"key": "INCIDENTS"},
"body": {"storage": {
"value": report,
"representation": "wiki"
}}
})
return reportdef normalize_costs():
# (Body of normalize_costs — def line fused onto the previous line.)
# Sketch of a unified cross-cloud cost view. NOTE(review): this is
# pseudocode — ce.get_cost_and_usage(...) is a literal Ellipsis and
# aws_total/azure_total/gcp_total/*_breakdown are undefined placeholders.
# AWS Cost Explorer
aws_costs = ce.get_cost_and_usage(...)
# Azure Cost Management (via REST API)
azure_costs = requests.get(
f"https://management.azure.com/"
f"subscriptions/{SUB_ID}/providers/"
f"Microsoft.CostManagement/query",
headers={"Authorization":
f"Bearer {azure_token}"},
json={"type": "ActualCost",
"timeframe": "MonthToDate",
"dataset": {"granularity": "Monthly",
"aggregation": {
"totalCost": {
"name": "Cost",
"function": "Sum"}
}}}).json()
# Unified schema
# NOTE(review): period is hard-coded; derive from the query timeframe.
unified = {
"period": "2025-03",
"clouds": {
"aws": {"total": aws_total,
"by_service": aws_breakdown},
"azure": {"total": azure_total,
"by_service": azure_breakdown},
"gcp": {"total": gcp_total,
"by_service": gcp_breakdown}
},
"equivalent_services": [
{"category": "compute",
"aws": "EC2", "azure": "VMs",
"gcp": "GCE",
"cheapest": "calculate..."}
]
}
return unifieddef analyze_ri_opportunities():
# (Body of analyze_ri_opportunities — def line fused onto the previous
# line.) Fetch 90-day EC2 RI purchase recommendations and ask the model
# for a ranked BUY/SKIP/WAIT analysis.
# Get RI recommendations from Cost Explorer
ri_recs = ce.get_reservation_purchase_recommendation(
Service="Amazon Elastic Compute Cloud - Compute",
LookbackPeriodInDays="NINETY_DAYS",
TermInYears="ONE_YEAR",
PaymentOption="NO_UPFRONT")
# Only the first 10 recommendations are embedded to bound prompt size.
prompt = f"""Analyze these RI purchase opportunities:
{json.dumps(ri_recs["Recommendations"][:10], indent=2)}
For each recommendation:
1. Break-even point (months)
2. Monthly savings vs On-Demand
3. Annual savings
4. Risk assessment (what if usage drops?)
5. Recommendation: BUY/SKIP/WAIT
Also consider:
- Savings Plans as alternative to RIs
- Convertible vs Standard RIs
- Payment option trade-offs
Return structured JSON with BUY recommendations
sorted by annual savings descending."""
return invoke_bedrock(prompt)autoscaling = boto3.client("autoscaling")
# Gather an ASG's scaling history, current policies, and 14 days of metrics,
# then ask the model for policy-tuning recommendations.
def tune_scaling_policies(asg_name: str):
# Get scaling history
activities = autoscaling \
.describe_scaling_activities(
AutoScalingGroupName=asg_name,
MaxRecords=100)
# Get current policies
policies = autoscaling \
.describe_policies(
AutoScalingGroupName=asg_name)
# Get CloudWatch metrics for the ASG
# NOTE(review): get_asg_metrics is defined elsewhere in the file.
metrics = get_asg_metrics(asg_name, days=14)
# NOTE(review): 100 activities are fetched but only the first 20 are
# embedded below, despite the prompt saying "last 100" — inconsistent.
prompt = f"""Analyze this Auto Scaling Group.
ASG: {asg_name}
Current policies:
{json.dumps(policies["ScalingPolicies"], indent=2)}
Scaling events (last 100):
{json.dumps([{
"time": str(a["StartTime"]),
"cause": a["Cause"][:100],
"status": a["StatusCode"]}
for a in activities["Activities"][:20]],
indent=2)}
14-day metrics summary:
{json.dumps(metrics)}
Recommend policy adjustments:
1. Optimal min/max/desired capacity
2. Scale-out threshold and cooldown
3. Scale-in threshold and cooldown
4. Predictive scaling opportunity
5. Estimated cost impact of changes"""
return invoke_bedrock(prompt)aws cloudwatch put-dashboard \
--dashboard-name "AgentForge-InfraOps" \
--dashboard-body '{
"widgets": [
{"type":"metric","properties":{
"title":"Auto-Remediation Success Rate",
"metrics":[
["AgentForge/Ops","RemediationSuccess"],
["AgentForge/Ops","RemediationFailed"]
]}},
{"type":"metric","properties":{
"title":"Mean Time to Remediate (min)",
"metrics":[
["AgentForge/Ops","MTTR"]
],"stat":"Average"}},
{"type":"metric","properties":{
"title":"Monthly Cost Savings (USD)",
"metrics":[
["AgentForge/Ops","CostSavings"]
],"period":2592000}},
{"type":"metric","properties":{
"title":"Chaos Experiment Pass Rate",
"metrics":[
["AgentForge/Ops","ChaosPassRate"]
]}}
]}'
aws cloudwatch put-metric-alarm \
--alarm-name "Ops-MTTR-SLA-Breach" \
--metric-name "MTTR" \
--namespace "AgentForge/Ops" \
--statistic Average --period 3600 \
--threshold 30 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2 \
--alarm-actions \
"arn:aws:sns:us-east-1:ACCOUNT:OpsAlerts"Azure Implementation Path
Replace CloudWatch with Azure Monitor + Azure Automanage for infrastructure monitoring and auto-remediation. Alarms via Azure Monitor Alerts with dynamic thresholds. Event routing via Event Grid → Azure Functions. Runbook execution via Azure Automation Runbooks (PowerShell/Python). FinOps via Azure Cost Management + Azure Advisor for rightsizing. EKS equivalent: AKS with Azure Monitor for containers. Chaos engineering via Azure Chaos Studio. Post-incident reports via Azure DevOps Wiki.
az monitor metrics alert create \
--name "CPU-Anomaly" \
--resource-group agentforge-rg \
--scopes "/subscriptions/SUB_ID/resourceGroups/agentforge-rg" \
--condition "avg Percentage CPU > dynamic HighSensitivity" \
--action "/subscriptions/SUB_ID/resourceGroups/agentforge-rg/providers/microsoft.insights/actionGroups/OpsTeam"
az advisor recommendation list \
--category Cost \
--output tableGCP Implementation Path
Replace CloudWatch with Cloud Monitoring + Cloud Operations Suite. Alarms via Cloud Alerting with MQL queries. Event routing via Eventarc → Cloud Functions. Runbook execution via Cloud Workflows + OS Config for VM patching. FinOps via Cloud Billing API + Recommender for rightsizing. EKS equivalent: GKE with GKE AutoOps (auto-repair, auto-upgrade). Chaos engineering via Chaos Toolkit on GKE. Cost anomaly detection via Cloud Billing Budget Alerts.
gcloud monitoring policies create \
--policy-from-file=cpu-anomaly-policy.json
gcloud recommender recommendations list \
--project=PROJECT \
--recommender=google.compute.instance.MachineTypeRecommender \
--location=us-central1 \
--format="table(name, priority, content.overview)"
gcloud container clusters update agentforge-gke \
--enable-autorepair --enable-autoupgrade \
--location=us-central1
aws ce get-cost-and-usage \
--time-period Start=2025-01-01,End=2025-02-01 \
--granularity MONTHLY --metrics BlendedCost
- Python 3.11+, pip install langchain langgraph crewai autogen-agentchat litellm
- Docker Desktop installed and running
- Terraform 1.6+ with AWS, Azure, and GCP provider credentials
- Bedrock, Azure OpenAI, and Vertex AI API keys configured
- GitHub repo for the multi-agent project
- Container registry access (ECR, ACR, or GCR)
# Supervisor/worker LangGraph: a router LLM classifies each request and
# dispatches to the research/code/write agent; every worker loops back to
# the supervisor after its turn.
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal
# Shared state threaded through every node of the graph.
class AgentState(TypedDict):
messages: list
next_agent: str
final_answer: str
# Router node: classify the latest message and record the next hop.
def supervisor(state: AgentState) -> AgentState:
last_msg = state["messages"][-1]["content"]
# Route based on intent classification
prompt = f"""Classify this request into one of:
- research: needs information retrieval
- code: needs code generation
- write: needs content writing
Route: {last_msg}
Return ONLY the category name."""
# NOTE(review): an off-category model reply flows straight into the
# conditional-edge mapping and would raise at runtime — no validation.
route = llm.invoke(prompt).content.strip()
return {**state, "next_agent": route}
def route_fn(state: AgentState) -> Literal[
"research", "code", "write", "end"]:
return state["next_agent"]
graph = StateGraph(AgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("research", research_agent)
graph.add_node("code", code_agent)
graph.add_node("write", write_agent)
graph.set_entry_point("supervisor")
# NOTE(review): the classifier prompt never produces "end", and every
# worker edges back to the supervisor — as written there is no
# termination path; confirm how "end" is meant to be emitted.
graph.add_conditional_edges(
"supervisor", route_fn,
{"research": "research",
"code": "code",
"write": "write",
"end": END})
# All agents return to supervisor for review
for agent in ["research", "code", "write"]:
graph.add_edge(agent, "supervisor")
app = graph.compile()from langchain.tools import tool
# Tool definitions for the worker agents (the `tool` import is fused onto
# the previous line by the extraction). All three wrap either the vector
# store or the shared LLM.
from langchain_community.chat_models import (
ChatLiteLLM)
llm = ChatLiteLLM(model="bedrock/anthropic.claude-3-sonnet-20240229-v1:0")
@tool
def search_knowledge_base(query: str) -> str:
"""Search the knowledge base for information."""
# NOTE(review): vector_store is defined elsewhere in the file.
results = vector_store.similarity_search(
query, k=5)
return "\n\n".join(
[r.page_content for r in results])
@tool
def generate_code(spec: str) -> str:
"""Generate code based on a specification."""
resp = llm.invoke(
f"Generate production Python code for: "
f"{spec}\nInclude type hints and docstrings.")
return resp.content
@tool
def write_document(outline: str) -> str:
"""Write a structured document from an outline."""
resp = llm.invoke(
f"Write a professional document: {outline}")
return resp.content
# Worker node: run the search tool and append the result to the transcript.
# NOTE(review): mutates state["messages"] in place and also returns state.
def research_agent(state: AgentState):
result = search_knowledge_base.invoke(
state["messages"][-1]["content"])
state["messages"].append(
{"role": "assistant",
"content": f"Research result: {result}"})
return stateimport litellm
# Configure provider routing
litellm.set_verbose = False
# Dict insertion order IS the fallback priority (primary first).
# NOTE(review): the Azure env vars are read at import time and raise
# KeyError if unset, even when the primary provider would have sufficed.
PROVIDER_CONFIG = {
"primary": {
"model": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"timeout": 30,
"max_retries": 2
},
"fallback_1": {
"model": "azure/gpt-4",
"api_key": os.environ["AZURE_API_KEY"],
"api_base": os.environ["AZURE_ENDPOINT"],
"timeout": 30
},
"fallback_2": {
"model": "vertex_ai/gemini-pro",
"timeout": 30
}
}
# Try each provider in priority order; raise only when all of them fail.
def invoke_with_fallback(messages: list) -> str:
for name, config in PROVIDER_CONFIG.items():
try:
resp = litellm.completion(
model=config["model"],
messages=messages,
timeout=config.get("timeout", 30),
max_retries=config.get(
"max_retries", 1))
return resp.choices[0].message.content
# Broad catch is deliberate: any provider error moves to the next.
except Exception as e:
print(f"{name} failed: {e}")
continue
raise RuntimeError("All providers failed")from crewai import Agent, Task, Crew, Process
# CrewAI example: four role-played agents collaborate sequentially on a
# single product-launch-brief task (the crewai import is fused onto the
# previous line by the extraction).
pm = Agent(
role="Product Manager",
goal="Define product requirements and "
"prioritize features based on market data",
backstory="10 years PM experience at "
"enterprise SaaS companies",
llm=llm, verbose=True, max_iter=5)
researcher = Agent(
role="Market Researcher",
goal="Gather competitive intelligence and "
"market sizing data",
backstory="Former Gartner analyst specializing "
"in AI/ML markets",
llm=llm, verbose=True, max_iter=5)
copywriter = Agent(
role="Copywriter",
goal="Write compelling product narratives "
"and marketing copy",
backstory="Award-winning B2B SaaS copywriter",
llm=llm, verbose=True, max_iter=3)
tech_writer = Agent(
role="Technical Writer",
goal="Create clear technical documentation "
"and architecture guides",
backstory="Senior tech writer with cloud "
"architecture background",
llm=llm, verbose=True, max_iter=3)
# NOTE(review): crewai's Task typically takes a single `agent=`; the
# plural `agents=` kwarg may be ignored — verify against the installed
# crewai version.
task = Task(
description="Create a product launch brief "
"for a new AI monitoring SaaS product",
expected_output="Complete launch brief with "
"market analysis, positioning, copy, "
"and technical overview",
agents=[pm, researcher, copywriter, tech_writer])
crew = Crew(
agents=[pm, researcher, copywriter, tech_writer],
tasks=[task],
process=Process.sequential,
verbose=True)
result = crew.kickoff()from autogen import (
# AutoGen group chat: auditor finds vulnerabilities, fixer patches them,
# deployer ships, and a human proxy gates termination. (The `from autogen
# import (` line is fused onto the previous line by the extraction.)
AssistantAgent, UserProxyAgent, GroupChat,
GroupChatManager)
# NOTE(review): api_type "litellm" requires a litellm-aware autogen build;
# verify against the installed autogen version.
config_list = [{
"model": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"api_type": "litellm"
}]
security_auditor = AssistantAgent(
name="SecurityAuditor",
system_message="You are a security expert. "
"Scan code for vulnerabilities, "
"classify severity, and recommend fixes.",
llm_config={"config_list": config_list})
code_fixer = AssistantAgent(
name="CodeFixer",
system_message="You fix security issues. "
"Generate patched code with explanations. "
"Always preserve existing functionality.",
llm_config={"config_list": config_list})
deployer = AssistantAgent(
name="DeploymentManager",
system_message="You manage deployments. "
"Rebuild Docker images, run tests, "
"and deploy fixes.",
llm_config={"config_list": config_list})
# Human only intervenes at termination; never auto-replies, never runs code.
human = UserProxyAgent(
name="HumanProxy",
human_input_mode="TERMINATE",
max_consecutive_auto_reply=0,
code_execution_config=False)
# Chat ends after 12 rounds if no one terminates earlier.
groupchat = GroupChat(
agents=[security_auditor, code_fixer,
deployer, human],
messages=[], max_round=12)
manager = GroupChatManager(
groupchat=groupchat,
llm_config={"config_list": config_list})
human.initiate_chat(manager,
message="Scan app.py for vulnerabilities "
"and fix any CRITICAL issues found.")# Dockerfile
# Two-stage build: dependencies compiled in `builder`, copied into a slim
# runtime image that runs as a non-root user.
FROM python:3.11-slim AS builder
WORKDIR /build
COPY requirements.txt .
RUN pip install --no-cache-dir \
--prefix=/install -r requirements.txt
FROM python:3.11-slim AS runtime
WORKDIR /app
# Copy only installed packages
COPY --from=builder /install /usr/local
# Copy application code
COPY src/ ./src/
COPY config/ ./config/
# Non-root user for security
RUN useradd -r -s /bin/false agentuser
USER agentuser
# Health check
# NOTE(review): assumes `requests` is in requirements.txt and that a
# non-2xx response should still count as healthy (status not checked).
HEALTHCHECK --interval=30s --timeout=5s \
CMD python -c "import requests; \
requests.get('http://localhost:8080/health')"
ENV PYTHONUNBUFFERED=1
EXPOSE 8080
CMD ["python", "-m", "src.main"]# modules/ecs/main.tf
# Fargate service for the agent container: 2 replicas on private subnets
# behind a dedicated security group.
resource "aws_ecs_service" "agent" {
name = "agentforge-agent"
cluster = aws_ecs_cluster.main.id
task_definition = aws_ecs_task_definition.agent.arn
desired_count = 2
launch_type = "FARGATE"
network_configuration {
subnets = var.private_subnets
security_groups = [aws_security_group.agent.id]
}
}
# modules/cloud-run/main.tf
# Cloud Run (v1) equivalent: same image, provider selected via env var.
resource "google_cloud_run_service" "agent" {
name = "agentforge-agent"
location = var.region
template {
spec {
containers {
image = var.container_image
resources {
limits = {
memory = "512Mi"
cpu = "1"
}
}
env {
name = "LLM_PROVIDER"
value = "vertex_ai"
}
}
}
}
}
# Deploy all three:
# NOTE(review): -target deploys are for bootstrap only; a plain
# `terraform apply` should converge all modules afterwards.
# terraform apply -target=module.ecs
# terraform apply -target=module.aks
# terraform apply -target=module.cloud_runimport time, json
# Cross-provider benchmark: run each test prompt against every provider and
# record latency, token usage, and per-call cost. (The `import time, json`
# for this snippet is fused onto the previous line by the extraction.)
PROVIDERS = {
"bedrock": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
"azure": "azure/gpt-4",
"vertex": "vertex_ai/gemini-pro"
}
TEST_PROMPTS = [
"Summarize the key features of Kubernetes",
"Write a Python function to parse JSON logs",
"Explain the CAP theorem in 3 sentences",
]
def benchmark_providers():
results = []
for prompt in TEST_PROMPTS:
for name, model in PROVIDERS.items():
# Wall-clock latency covers the full completion round-trip.
start = time.time()
try:
resp = litellm.completion(
model=model,
messages=[{"role": "user",
"content": prompt}],
max_tokens=500)
latency = (time.time() - start) * 1000
results.append({
"provider": name,
"prompt": prompt[:50],
"latency_ms": round(latency),
"tokens_in": resp.usage.prompt_tokens,
"tokens_out": resp.usage.completion_tokens,
"cost": litellm.completion_cost(
completion_response=resp),
"success": True
})
# Failures are recorded, not raised, so one provider outage
# doesn't abort the whole benchmark.
except Exception as e:
results.append({
"provider": name,
"prompt": prompt[:50],
"error": str(e),
"success": False
})
return resultsimport redis, json
from datetime import datetime
# Module-level Redis client shared by SharedMemory. decode_responses=True
# means all keys/values come back as str, not bytes; defaults to localhost.
r = redis.Redis(
host=os.environ.get("REDIS_HOST", "localhost"),
port=6379, decode_responses=True)
class SharedMemory:
    """Redis-backed scratchpad shared by every agent in one session.

    Keys are namespaced per session and per agent
    ("agent:<session_id>:<agent_name>"), values are JSON-encoded, and
    each agent's hash expires one hour after its most recent write.
    Relies on the module-level `r` redis client.
    """

    def __init__(self, session_id: str):
        self.session_id = session_id
        self.prefix = f"agent:{session_id}"

    def _key(self, agent_name: str) -> str:
        # Redis hash holding one agent's state for this session.
        return f"{self.prefix}:{agent_name}"

    # Original annotated `value: any` / `-> any` — `any` is the builtin
    # function, not a type; annotations dropped rather than mislead.
    def write(self, agent_name: str, key: str, value) -> None:
        """JSON-serialize `value` into the agent's hash and refresh its TTL."""
        name = self._key(agent_name)
        r.hset(name, key, json.dumps(value))
        r.expire(name, 3600)  # 1 hour TTL, renewed on every write

    def read(self, agent_name: str, key: str):
        """Return the decoded value, or None when the field is absent."""
        val = r.hget(self._key(agent_name), key)
        return json.loads(val) if val else None

    def read_all_agents(self) -> dict:
        """Read state from all agents in this session, keyed by agent name.

        NOTE(review): r.keys() scans the whole keyspace (O(N)); prefer
        SCAN/scan_iter in production.
        """
        state = {}
        for k in r.keys(f"{self.prefix}:*"):
            agent = k.split(":")[-1]
            state[agent] = {
                f: json.loads(v)
                for f, v in r.hgetall(k).items()}
        return state
# Usage in agent:
# NOTE(review): `[...]` below is a literal list containing Ellipsis — a
# placeholder for real key points, not runnable example data.
mem = SharedMemory("session-123")
mem.write("researcher", "findings",
{"topic": "AI monitoring", "key_points": [...]})
# Other agents can read:
findings = mem.read("researcher", "findings")from opentelemetry import trace
# OpenTelemetry bootstrap: export spans over OTLP/gRPC to the default
# collector endpoint. (The `from opentelemetry import trace` line is fused
# onto the previous line by the extraction.)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
SimpleSpanProcessor)
from opentelemetry.exporter.otlp.proto.grpc \
.trace_exporter import OTLPSpanExporter
provider = TracerProvider()
# NOTE(review): SimpleSpanProcessor exports synchronously per span —
# BatchSpanProcessor is the usual choice outside demos.
provider.add_span_processor(
SimpleSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("agentforge")
def traced_agent_call(agent_name, func):
    """Decorator factory: wrap `func` in an OTel span recording the agent's
    name, active provider, latency, and token/cost usage.

    NOTE(review): assumes `func` returns a dict-like with optional
    "tokens_used"/"cost" keys — confirm against the agent contract.
    Relies on the module-level `tracer` and `get_current_provider`.
    """
    import functools

    @functools.wraps(func)  # keep the wrapped agent's name/docstring
    def wrapper(*args, **kwargs):
        with tracer.start_as_current_span(
                f"agent.{agent_name}") as span:
            span.set_attribute(
                "agent.name", agent_name)
            span.set_attribute(
                "agent.provider",
                get_current_provider())
            start = time.time()
            result = func(*args, **kwargs)
            span.set_attribute(
                "agent.latency_ms",
                (time.time() - start) * 1000)
            span.set_attribute(
                "agent.tokens",
                result.get("tokens_used", 0))
            span.set_attribute(
                "agent.cost_usd",
                result.get("cost", 0))
            return result
    return wrapper  # .github/workflows/deploy-multicloud.yml
# Build once on main, push the same SHA-tagged image to ECR and GCR, then
# fan out Terraform applies per cloud via a matrix.
name: Multi-Cloud Deploy
on:
push:
branches: [main]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build Docker image
run: |
docker build -t agentforge-agent:$GITHUB_SHA .
- name: Push to ECR
run: |
aws ecr get-login-password | docker login \
--username AWS --password-stdin $ECR_URI
docker tag agentforge-agent:$GITHUB_SHA \
$ECR_URI:$GITHUB_SHA
docker push $ECR_URI:$GITHUB_SHA
- name: Push to GCR
run: |
gcloud auth configure-docker
docker tag agentforge-agent:$GITHUB_SHA \
gcr.io/$GCP_PROJECT/agentforge:$GITHUB_SHA
docker push gcr.io/$GCP_PROJECT/agentforge:$GITHUB_SHA
# NOTE(review): the matrix deploys to azure but no ACR push step exists
# above — the AKS deploy has no image registry to pull from; confirm.
deploy:
needs: build
strategy:
matrix:
cloud: [aws, azure, gcp]
runs-on: ubuntu-latest
steps:
- uses: hashicorp/setup-terraform@v3
- run: |
cd infrastructure/terraform/${{ matrix.cloud }}
terraform init
terraform apply -auto-approve \
-var="image_tag=$GITHUB_SHA"# Emit cross-cloud metrics
# Publish one Latency/CostPerQuery/SuccessRate datapoint per benchmark
# result, dimensioned by provider, into the CrossCloud namespace.
def emit_comparison_metrics(results: list):
for r in results:
cw.put_metric_data(
Namespace="AgentForge/CrossCloud",
MetricData=[
{"MetricName": "Latency",
"Value": r["latency_ms"],
"Unit": "Milliseconds",
"Dimensions": [
{"Name": "Provider",
"Value": r["provider"]}]},
{"MetricName": "CostPerQuery",
"Value": r["cost"],
"Unit": "None",
"Dimensions": [
{"Name": "Provider",
"Value": r["provider"]}]},
{"MetricName": "SuccessRate",
"Value": 1 if r["success"] else 0,
"Unit": "None",
"Dimensions": [
{"Name": "Provider",
"Value": r["provider"]}]}
])
# Smart routing based on metrics
# NOTE(review): task_type is accepted but unused, and the preference order
# is fixed — this returns the FIRST provider meeting the SLA, which is not
# necessarily the cheapest despite the docstring's claim.
def smart_route(task_type: str) -> str:
"""Return the first provider, in fixed preference order, whose p95
latency is under 5 s and success rate above 99%; default to bedrock."""
for provider in ["bedrock", "azure", "vertex"]:
p95 = get_p95_latency(provider)
success = get_success_rate(provider)
if p95 < 5000 and success > 0.99:
return provider
return "bedrock" # default fallbackAzure-Specific Deployment
Deploy the containerized multi-agent system to Azure Kubernetes Service (AKS) or Azure Container Apps (serverless). LLM backend via Azure OpenAI Service with managed GPT-4 deployments. Vector storage via Azure AI Search. Shared state via Azure Cache for Redis. Observability via Azure Monitor Application Insights with distributed tracing. CI/CD via Azure Pipelines → ACR → AKS.
# Minimal AKS cluster for the agent workload: two-node default pool with a
# system-assigned managed identity (no service-principal secrets).
resource "azurerm_kubernetes_cluster" "agent" {
name = "agentforge-aks"
location = var.location
resource_group_name = var.resource_group
dns_prefix = "agentforge"
default_node_pool {
name = "default"
node_count = 2
vm_size = "Standard_D2s_v3"
}
identity { type = "SystemAssigned" }
}GCP-Specific Deployment
Deploy to Cloud Run (serverless, scales to zero) or GKE Autopilot (managed Kubernetes). LLM backend via Vertex AI with Gemini Pro model endpoints. Vector storage via Vertex AI Vector Search (formerly Matching Engine). Shared state via Memorystore for Redis. Observability via Cloud Trace + Cloud Monitoring. CI/CD via Cloud Build → Artifact Registry → Cloud Run.
# Cloud Run v2 service running the AgentForge agent container.
# min_instance_count = 0 lets it scale to zero when idle;
# max_instance_count = 10 bounds spend under load.
resource "google_cloud_run_v2_service" "agent" {
  name     = "agentforge-agent"
  location = var.region

  template {
    containers {
      image = "${var.artifact_registry}/agentforge:${var.image_tag}"

      resources {
        limits = { memory = "512Mi", cpu = "1" }
      }

      # HCL requires each argument on its own line; the original
      # single-line `env { name = ... value = ... }` form does not parse.
      env {
        name  = "LLM_PROVIDER"
        value = "vertex_ai"
      }
    }

    # Same fix as above: scaling arguments must be newline-separated.
    scaling {
      min_instance_count = 0
      max_instance_count = 10
    }
  }
}
graph.add_node("supervisor", supervisor_fn)
graph.add_conditional_edges("supervisor", route_fn)
LLMs alone are not enough. A language model can generate text but cannot take action — it cannot query a database, file a ticket, trigger a deployment, or remember a previous conversation. Agents bridge this gap by combining LLM reasoning with tool use, memory, and autonomous decision-making.
The cost of inaction is measurable. Manual triage of security alerts takes 30-45 minutes per incident. Customer support without intelligent routing leads to 3-5x longer resolution times. Data teams spend 60% of their time on repetitive query reformulation. AI agents eliminate these bottlenecks by reasoning about context and acting autonomously within defined guardrails.
Autonomous Task Agents — Execute multi-step workflows: classify documents, extract metadata, route to the correct system, and verify completion. Replace brittle ETL pipelines with adaptive reasoning.
Security & Compliance Agents — Monitor GuardDuty findings, correlate with CloudTrail logs, assess severity, and trigger automated remediation playbooks. Response time drops from hours to seconds.
Data & Analytics Agents — Translate natural language questions into SQL/API queries, retrieve results, generate visualizations, and narrate insights. Democratize data access across the organization.
DevOps & SRE Agents — Scan code for vulnerabilities, generate patches, create PRs, monitor deployments, and auto-remediate infrastructure drift. Shift security left without slowing development.
2. Build the reasoning core. Use a ReAct loop (Reason → Act → Observe) powered by LangGraph or a similar framework. The agent decides which tool to call based on the user's intent and accumulated context.
3. Design typed tools. Each tool is a function with a JSON Schema interface. Tools are atomic (one action each), validated (check outputs before returning), and guarded (max iteration limits prevent infinite loops).
4. Add retrieval (RAG). Embed your knowledge base into a vector store (FAISS, OpenSearch, Pinecone). The agent retrieves relevant context before generating a response — grounding it in facts, not hallucinations.
5. Integrate memory. Use AgentCore Memory for session persistence (checkpointer) and cross-session knowledge (store). Middleware hooks inject context before the LLM and save responses after.
6. Deploy with guardrails. Wrap the agent in AgentCore Runtime for managed scaling, observability, and safety. Add input/output filters, cost budgets, and human escalation thresholds.
Deploy agents when:
✓ The task requires reasoning over unstructured inputs (natural language, documents, images)
✓ The workflow involves multiple conditional steps that change based on context
✓ Human experts spend >30% of their time on repetitive triage, classification, or routing
✓ The domain has a well-defined knowledge base that can ground the agent's responses
✓ You can define clear success metrics (accuracy, resolution time, escalation rate)
✓ There is a human-in-the-loop fallback for high-stakes decisions
Do NOT deploy agents when:
✗ A simple rule or regex can solve the problem reliably (don't over-engineer)
✗ The task requires guaranteed deterministic output (use traditional code instead)
✗ There is no ground-truth dataset to evaluate quality
✗ The cost of a wrong answer exceeds the cost of human processing
✗ Regulatory requirements prohibit automated decision-making in the domain
✗ You lack observability infrastructure to monitor agent behavior in production
Run aws configure to set up your AWS credentials. Clone the repository and run uv sync to install all dependencies. Create a .env file with your GROQ_API_KEY and optional HF_API_KEY.
Run python 00_langgraph_agent.py to validate your environment. This runs the LangGraph agent locally with FAISS vector search over the FAQ dataset. Verify you get a coherent response about "roaming activation" before proceeding to cloud deployment.
Run agentcore configure -e 01_agentcore_runtime.py to auto-generate bedrock_agentcore.yaml. This YAML defines your entrypoint handler, tool schemas, and runtime settings. Review the generated config — understand what each field controls before launching.
Run agentcore launch --env GROQ_API_KEY=your_key. Test with agentcore invoke '{"prompt": "..."}'. Try different queries to exercise all three tools: search_faq, search_detailed_faq, and reformulate_query. Observe the ReAct reasoning chain in the logs.
Deploy 02_agentcore_memory.py — configure and deploy as before. Test session continuity by sending multiple messages with the same actor_id and thread_id. Verify the agent remembers context from earlier in the conversation. Test cross-session preference retrieval with a new thread_id.
AgentCore Developer Guide — Official toolkit documentation
AgentCore Code Samples — Official AWS sample repository
Amazon Bedrock User Guide — Foundation models and APIs
Bedrock Knowledge Bases — Managed RAG service
Bedrock Guardrails — Content filtering and safety
Azure AI Search — Vector and hybrid search service
Azure AI Foundry — End-to-end AI development platform
Semantic Kernel — Microsoft's AI orchestration SDK
Azure Cosmos DB — Multi-model database for agent memory
Vertex AI Agent Builder — Build and deploy AI agents
Gemini API Reference — Google's multimodal AI models
BigQuery Documentation — Analytics and data warehouse
Firestore Documentation — Serverless document database
LangChain — LLM application development framework
CrewAI — Multi-agent role-playing framework
AutoGen — Microsoft's multi-agent conversation framework
FAISS — Facebook's efficient similarity search library
Sentence Transformers — State-of-the-art text embedding models
Groq API Docs — Ultra-fast LLM inference platform
Docker Documentation — Containerisation platform
GitHub Actions — CI/CD workflow automation
AWS X-Ray — Distributed tracing and observability
uv Package Manager — Ultra-fast Python dependency management
AgentForge GitHub Repository — Source code and examples
RAGAS Documentation — RAG evaluation framework
DeepLearning.AI Short Courses — Free AI agent courses
ReAct Paper (Yao et al.) — The foundational ReAct reasoning pattern