- 公開日
- 最終更新日
【Strands Agents】RSS情報からAWS環境を自動チェック
この記事を共有する

目次
はじめに
皆さんこんにちは!パーソル&サーバーワークスの小泉です。
今回は、AWS AI Agent Global Hackathonが開催されるということに触発されて自身で考えたAI Agentを構築・検証したので、備忘として記載します。
構成図
処理概要
- LambdaがAWSのRSS情報を取得し、Dynamo DBにURLを格納する
- LambdaがBedrock AgentCoreにデプロイされたStrands Agentを実行する
- Crawler AgentがDynamo DBからURLを取得し、Fetch MCPを使用し内容を取得する
- Explorer Agentが記事の内部リンクを抽出して内容を読み込む
- Analyst Agentが記事の要約・重要度・洞察を行い、記事の内容がAWS内の調査が必要なのかを判定しDynamo DBに値を格納する
- Ops Reporter Agentがコードを生成・実行(※)し、実行結果をAmazon SNSで管理者に通知する
※実行は読み取り専用のAPIに制限します
※Analyst Agentが生成した内容をCloud Frontで配信します
UI
実行結果
今回はあらかじめDynamo DBにAWS Lambda Python 3.9 End-of-SupportのURLを格納しました。
また、実行結果がわかりやすいようにStrands AgentをBedrock AgentCoreにデプロイせずローカルで実施しました。
エージェントが自身でコードを生成・修正して実行しているのがわかります。
無事にOps Reporter Agentがコードを生成・実行し、実行結果をAmazon SNSで管理者に通知することを確認できました!
が、再度実行するとOps Reporter Agentが生成したコードが誤っている状態で処理を終了したり、意図しない場面でSNS通知が来ることがあったのでAgentが生成するコード品質の向上、システムプロンプトの調整が必要だと感じました。
実行ファイル
# agent_fetcher.py # ------------------------------------------------------------ # 4-Agentワークフロー: # Stage1: DynamoDBからURL取得 → MCP "fetch" # Stage2: 内部リンク抽出 # Stage3: Bedrockモデルで要約/分類/重大度/メリット/洞察/参照元抽出 → DynamoDBへ保存 # Stage4: LLMがboto3コードを生成・実行して環境確認 → SNS通知 (運用報告レポート形式) # ------------------------------------------------------------ import os import json import re import uuid import time from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from typing import Any, Dict, List, Tuple from urllib.parse import urlparse import boto3 from boto3.dynamodb.conditions import Attr from pydantic import BaseModel, Field # Strands from strands import Agent, tool from strands.tools.mcp import MCPClient from mcp import stdio_client, StdioServerParameters # ========================= # 環境設定 # ========================= TABLE_NAME = os.environ.get("TABLE_NAME", "ArticleTable") AWS_REGION = os.environ.get("AWS_REGION", "us-west-2") SNS_TOPIC_ARN = "arn:aws:sns:us-west-2:<アカウントID>:NewsAgentNotifications" SESSIONS_DIR = Path(os.environ.get("SESSIONS_DIR", "./sessions")) SESSIONS_DIR.mkdir(parents=True, exist_ok=True) BATCH_SIZE = int(os.environ.get("BATCH_SIZE", "3")) MAX_WORKERS_STAGE1 = int(os.environ.get("MAX_WORKERS_STAGE1", "3")) MAX_WORKERS_STAGE2 = int(os.environ.get("MAX_WORKERS_STAGE2", "3")) MAX_WORKERS_STAGE3 = int(os.environ.get("MAX_WORKERS_STAGE3", "2")) MAX_WORKERS_STAGE4 = int(os.environ.get("MAX_WORKERS_STAGE4", "1")) MCP_COMMAND = os.environ.get("MCP_COMMAND", "uvx") MCP_ARGS = os.environ.get("MCP_ARGS", "mcp-server-fetch").split() BEDROCK_MODEL_ID = os.environ.get( "BEDROCK_MODEL_ID", "anthropic.claude-3-5-sonnet-20240620-v1:0" ) # ========================= # クライアント # ========================= dynamodb = boto3.resource("dynamodb", region_name=AWS_REGION) table = dynamodb.Table(TABLE_NAME) sns_client = boto3.client("sns", region_name=AWS_REGION) # ========================= # ユーティリティ # ========================= MD_LINK_RE = re.compile(r"\[([^\]]+)\]\((https?://[^)\s]+)\)") def extract_markdown_links(md: str) -> List[str]: return [m.group(2) for m in MD_LINK_RE.finditer(md or "")] def same_domain(url: str, candidate: str) -> bool: try: return urlparse(url).netloc == urlparse(candidate).netloc except Exception: return False def sk_to_session_id(sk: str) -> str: if "#" in sk: return sk.split("#", 1)[1] return str(uuid.uuid4()) def ensure_clean_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def read_json(path: Path) -> Dict[str, Any]: if not path.exists() or path.stat().st_size == 0: return {} with path.open("r", encoding="utf-8") as f: return json.load(f) def write_json(path: Path, data: Dict[str, Any]) -> None: with path.open("w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def mcp_fetch_markdown(url: str, max_length: int = 15000, start_index: int = 0, raw: bool = False) -> str: stdio_mcp_client = MCPClient(lambda: stdio_client(StdioServerParameters( command=MCP_COMMAND, args=MCP_ARGS ))) with stdio_mcp_client: tools = stdio_mcp_client.list_tools_sync() agent = Agent(model=None, tools=tools) result = agent.tool.fetch( url=url, max_length=max_length, start_index=start_index, raw=raw ) if isinstance(result, dict) and "content" in result: texts = [c.get("text", "") for c in result.get("content", []) if isinstance(c, dict)] return "\n".join(t for t in texts if t) if isinstance(result, str): return result return "" # ========================= # Stage1: 取得 # ========================= def list_candidates(limit: int) -> List[Dict[str, Any]]: resp = table.scan( Limit=limit * 5, FilterExpression="attribute_not_exists(processed_at)" # 未処理の記事のみ ) return resp.get("Items", [])[:limit] def stage1_fetch_and_write(item: Dict[str, Any]) -> Tuple[str, bool, str]: pk = item["pk"] sk = item["sk"] url = item["url"] title = item.get("title", "") session_id = sk_to_session_id(sk) session_dir = SESSIONS_DIR / session_id ensure_clean_dir(session_dir) session_file = session_dir / "session.json" try: md = mcp_fetch_markdown(url) base: Dict[str, Any] = read_json(session_file) base.update({ "pk": pk, "sk": sk, "url": url, "title": title, "stage1": { "markdown": md, "fetched_at": int(time.time()) } }) write_json(session_file, base) return session_id, True, "fetched" except Exception as e: return session_id, False, f"ERROR: {e}" # ========================= # Stage2: 内部リンク収集 # ========================= def stage2_enrich_links(session_id: str, link_limit: int = 5) -> Tuple[str, bool, str]: session_dir = SESSIONS_DIR / session_id session_file = session_dir / "session.json" data = read_json(session_file) if not data or "stage1" not in data or not data["stage1"].get("markdown"): return session_id, False, "stage1 missing" origin_url = data["url"] md = data["stage1"]["markdown"] links = extract_markdown_links(md) internal_links = [l for l in links if same_domain(origin_url, l)] internal_links = list(dict.fromkeys(internal_links))[:link_limit] fetched_links: List[Dict[str, Any]] = [] for lnk in internal_links: try: lmd = mcp_fetch_markdown(lnk, max_length=8000) fetched_links.append({"url": lnk, "markdown": lmd}) except Exception as e: fetched_links.append({"url": lnk, "error": str(e)}) data["stage2"] = { "internal_links": fetched_links, "enriched_at": int(time.time()) } write_json(session_file, data) return session_id, True, f"{len(fetched_links)} links" # ========================= # Stage3: 要約+保存+判定 # ========================= class ArticleDecision(BaseModel): title: str genre: str severity: str summary: str benefits: str insights: str sources: List[str] needs_action: bool = Field(description="AWS環境確認が必要なら True、不要なら False") def build_summary_prompt(data: Dict[str, Any]) -> str: title = data.get("title", "") url = data.get("url", "") body = data.get("stage1", {}).get("markdown", "") links = data.get("stage2", {}).get("internal_links", []) link_snips = [] for li in links: if isinstance(li, dict) and "url" in li: link_snips.append(f"- {li['url']}") link_list = "\n".join(link_snips) prompt = f""" あなたは技術新聞の編集者かつクラウド自動化の専門家です。 以下の内容を読み、要約・分類・重大度判定・メリット・洞察を作成し、 さらに「AWS環境での確認が必要かどうか(needs_action)」を判定してください。 [記事タイトル候補] {title} [元記事URL] {url} [本文抜粋] {body[:12000]} [内部リンク] {link_list} 要件: - 出力は日本語 - 要約は300〜500字 - severityは LOW, MEDIUM, HIGH, CRITICAL のいずれか - needs_action: AWS LambdaやEC2ランタイムEOLなど「環境で確認が必要」なら True """ return prompt def stage3_summarize_and_decide(session_id: str) -> Tuple[str, bool, str]: session_dir = SESSIONS_DIR / session_id session_file = session_dir / "session.json" data = read_json(session_file) if not data or "stage1" not in data: return session_id, False, "stage1 missing" agent = Agent(model=BEDROCK_MODEL_ID) prompt = build_summary_prompt(data) try: result: ArticleDecision = agent.structured_output(ArticleDecision, prompt) data["stage3"] = result.model_dump() write_json(session_file, data) table.update_item( Key={"pk": data["pk"], "sk": data["sk"]}, UpdateExpression=""" SET #s = :s, genre = :g, severity = :sev, benefits = :b, insights = :i, sources = :src, needs_action = :na, processed_at = :ts """, ExpressionAttributeNames={"#s": "summary"}, ExpressionAttributeValues={ ":s": result.summary, ":g": result.genre, ":sev": result.severity, ":b": result.benefits, ":i": result.insights, ":src": result.sources, ":na": result.needs_action, ":ts": int(time.time()) } ) return session_id, True, "summarized & decided" except Exception as e: return session_id, False, f"ERROR: {e}" # ========================= # Stage4: boto3コード生成+実行+SNS通知 # ========================= @tool def execute_python(code: str) -> str: """boto3 読み取り専用コードを実行""" FORBIDDEN = ["delete_", "update_", "terminate_", "create_", "put_", "modify_", "shutdown"] lowered = code.lower() for bad in FORBIDDEN: if bad in lowered: return f"❌ Blocked dangerous operation: contains '{bad}'" try: exec_locals = {} exec(code, {"boto3": boto3, "json": json}, exec_locals) if "result" in exec_locals: return json.dumps(exec_locals["result"], ensure_ascii=False, indent=2) return "✅ Executed but no 'result'." except Exception as e: return f"❌ Error: {e}" @tool def notify_sns(message: str) -> str: """SNS通知""" try: resp = sns_client.publish( TopicArn=SNS_TOPIC_ARN, Message=message, Subject="運用報告レポート" ) return f"✅ Notified: {resp['MessageId']}" except Exception as e: return f"❌ SNS通知失敗: {e}" def stage4_check_and_notify(session_id: str) -> Tuple[str, bool, str]: session_dir = SESSIONS_DIR / session_id session_file = session_dir / "session.json" data = read_json(session_file) if not data or "stage3" not in data: return session_id, False, "stage3 missing" summary_text = data["stage3"].get("summary", "") title = data["stage3"].get("title", data.get("title", "")) severity = data["stage3"].get("severity", "LOW") needs_action = data["stage3"].get("needs_action", False) sources = data["stage3"].get("sources", []) if not needs_action: return session_id, True, "no action needed" # ==== システムプロンプト ==== check_prompt = f""" あなたはAWSとPythonのプロフェッショナルであり、クラウド自動化エージェントです。 以下の記事要約を読み、もし環境で確認すべき内容があれば boto3 を使った完全なPythonコードを生成し、 必ず execute_python で実行してください。その結果を要約し、notify_sns で運用報告レポートを送信してください。 制約: - boto3 読み取り専用APIのみ利用可能 (list/get/describe 系) - 生成コードは必ず 'result' に格納し、JSON形式で返すこと - import 文を含め、即時実行可能な完成コードにすること - 出力は一度きり、繰り返し修正しないこと - ニュース本文そのものではなく「自分のAWS環境での確認結果」を通知すること 記事要約: \"\"\"{summary_text}\"\"\" """ agent = Agent(model=BEDROCK_MODEL_ID, tools=[execute_python, notify_sns]) result = agent(check_prompt) return session_id, True, "checked & notified" # ========================= # メインワークフロー # ========================= def run_stage(func, session_ids: List[str], max_workers: int) -> List[str]: done_ids: List[str] = [] with ThreadPoolExecutor(max_workers=max_workers) as ex: futures = {ex.submit(func, sid): sid for sid in session_ids} for fut in as_completed(futures): sid, ok, msg = fut.result() print(f"[{func.__name__}] {sid} -> {ok} ({msg})") if ok: done_ids.append(sid) return done_ids def main() -> None: items = list_candidates(BATCH_SIZE) if not items: print("No candidates.") return print(f"Picked {len(items)} items.") s1_ids = run_stage(stage1_fetch_and_write, items, MAX_WORKERS_STAGE1) if not s1_ids: return s2_ids = run_stage(stage2_enrich_links, s1_ids, MAX_WORKERS_STAGE2) s3_ids = run_stage(stage3_summarize_and_decide, s2_ids or s1_ids, MAX_WORKERS_STAGE3) run_stage(stage4_check_and_notify, s3_ids, MAX_WORKERS_STAGE4) if __name__ == "__main__": main()
まとめ
今回、構築したAI Agentをさらにブラッシュアップしてより便利なAI Agentを作りたいと思いました!
今後の拡張としてはJiraなどのチケット管理ツールに自動起票してくれる機能を追加していきたいと思います。
この記事は私が書きました
小泉 和貴
記事一覧全国を旅行することを目標に、仕事を頑張っています。
