ENGINEER BLOG ENGINEER BLOG
  • 公開日
  • 最終更新日

【Lambda】レスポンスストリーミングを試してみた

この記事を共有する

目次

はじめに

皆さんこんにちは!パーソル&サーバーワークスの小泉です。

前回の記事【Bedrock AgentCore】ストリーミング応答を試してみたでは、Bedrock AgentCoreにストリーミング応答のRuntimeと非ストリーミング応答のランタイムを作成して、挙動の違いについて紹介しました。  今回は、その続編としてAWS Lambda(以下Lambda)のレスポンスストリーミング機能を使用して、Bedrock AgentCoreのストリーミング応答のランタイムと非ストリーミング応答のランタイムを用いて挙動の違いを検証してみたいと思います。
検証した理由は、Bedrock AgentCoreが非ストリーミング応答の構成でLambdaのレスポンスストリーミングを使用していた場合に、クライアントはどのようなレスポンスを受け取るのが気になり検証しました。

結論、Bedrock AgentCoreが非ストリーミング応答の場合は一括でレスポンスを返すことがわかりました。

レスポンスストリーミングとは

レスポンスストリーミングとは、処理結果を一度に返すのではなく、データが生成されるたびに逐次的にクライアントに送信する仕組みです。
通常のLambda関数では、関数の実行が完了した後にレスポンスが返されていましたが、レスポンスストリーミングを使用すると、処理の途中経過をリアルタイムで返すことができます。
特にAIモデルの応答のように生成に時間がかかる処理や、大量のデータを扱う処理において、ユーザー体験を大幅に向上させることができます。
通常の処理方式とレスポンスストリーミングの違いを簡単に図示してみました。

通常の処理方式

従来の処理.png

レスポンスストリーミング

レスポンスストリーミング.png

レスポンスストリーミングのメリットは以下の通りです。

  • 体感的な応答時間の短縮: ユーザーは最初のレスポンスを早く受け取れるため、アプリケーションの応答が速く感じられます
  • インタラクティブな体験: 生成AI応答など、徐々に構築される情報をリアルタイムで確認できます

構成

今回検証する構成は以下のようになります。

構成.png

Lambda関数をストリーミングモードで構成し、Bedrock AgentCoreのランタイムを呼び出します。
今回は2つのパターンを検証します。

  1. 非ストリーミング応答: Bedrock AgentCoreの非ストリーミングランタイムを呼び出す
  2. ストリーミング応答: Bedrock AgentCoreのストリーミングランタイムを呼び出す

手順

Lambda関数の作成

  1. AWSマネジメントコンソールにログインし、Lambdaに移動します。
  2. 「関数の作成」をクリックし、関数名を入力します。
  3. ランタイムとして「Node.js 20.x」を選択します。
  4. その他の設定から関数URLにチェックを入れて、「RESPONSE_STREAM」を選択します。
  5. 「関数を作成」をクリックします。

stream.png

コードの実装

import { pipeline } from 'node:stream/promises';
import { randomUUID } from 'crypto';
import { BedrockAgentCoreClient, InvokeAgentRuntimeCommand } from '@aws-sdk/client-bedrock-agentcore';
// AgentCore クライアントの初期化
const agentCore = new BedrockAgentCoreClient({ region: 'us-west-2' });
const AGENT_ARN = 'BedrockAgentCoreのarn';
export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  const prompt = event.body ? JSON.parse(event.body).prompt : event.queryStringParameters?.prompt || 'Hello world';
  const sessionId = randomUUID();
  const httpResponseMetadata = {
    statusCode: 200,
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Access-Control-Allow-Origin': '*'
    }
  };
  responseStream = awslambda.HttpResponseStream.from(responseStream, httpResponseMetadata);
  try {
    const command = new InvokeAgentRuntimeCommand({
      agentRuntimeArn: AGENT_ARN,
      runtimeSessionId: sessionId,
      payload: new TextEncoder().encode(JSON.stringify({ prompt })),
      qualifier: 'DEFAULT'
    });
    const response = await agentCore.send(command);
    await pipeline(response.response, responseStream);
  } catch (error) {
    responseStream.write(`data: {"type": "error", "message": "${error.message || 'Unknown error'}"}\n\n`);
    responseStream.end();
  }
});

やってみた

実際に実装したコードをデプロイして、非ストリーミングとストリーミングの両方のパターンでテストしてみました。

非ストリーミング応答

まずは、非ストリーミング応答のランタイムを使用した場合の結果です。
この場合、Lambdaがストリーミング対応していてもBedrock AgentCoreが非ストリーミング応答の場合は完全なレスポンスが生成された後にかえってくることがわかりました。

notstream関数URL.png

ストリーミング応答

次に、ストリーミング応答のランタイムを使用した場合です。この場合はもちろんレスポンスがストリーミング形式でかえってきました。

関数URL.png

ただ実際に、このレスポンスをもらうユーザは内容が理解しにくいので必要な部分をフィルタリングして再度テストをしました。

import { pipeline } from 'node:stream/promises';
import { Transform } from 'node:stream';
import { randomUUID } from 'crypto';
import { BedrockAgentCoreClient, InvokeAgentRuntimeCommand } from '@aws-sdk/client-bedrock-agentcore';
const agentCore = new BedrockAgentCoreClient({ region: 'us-west-2' });
const AGENT_ARN = 'BedrockAgentCoreのarn';
export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  const prompt = event.body ? JSON.parse(event.body).prompt : event.queryStringParameters?.prompt || 'Hello world';
  const sessionId = randomUUID();
  const httpResponseMetadata = {
    statusCode: 200,
    headers: {
      'Content-Type': 'text/plain',
      'Cache-Control': 'no-cache',
      'Access-Control-Allow-Origin': '*'
    }
  };
  responseStream = awslambda.HttpResponseStream.from(responseStream, httpResponseMetadata);
  const textFilter = new Transform({
    transform(chunk, encoding, callback) {
      const lines = chunk.toString().split('\n');
      for (const line of lines) {
        if (line.startsWith('data: {')) {
          try {
            const data = JSON.parse(line.substring(6));
            if (data.event?.contentBlockDelta?.delta?.text) {
              this.push(data.event.contentBlockDelta.delta.text);
            }
          } catch (e) {
          }
        }
      }
      callback();
    }
  });
  try {
    const command = new InvokeAgentRuntimeCommand({
      agentRuntimeArn: AGENT_ARN,
      runtimeSessionId: sessionId,
      payload: new TextEncoder().encode(JSON.stringify({ prompt })),
      qualifier: 'DEFAULT'
    });
    const response = await agentCore.send(command);
    await pipeline(response.response, textFilter, responseStream);
  } catch (error) {
    responseStream.write(`Error: ${error.message || 'Unknown error'}`);
    responseStream.end();
  }
});

必要な部分だけ、抽出されてレスポンスを返すことができました。

関数URL抽出.png

まとめ

考えてみれば、当たり前の結果になりましたが実際に手を動かすことで、机上で考えるよりも理解度は深まると思いました。

参考

この記事は私が書きました

小泉 和貴

記事一覧

全国を旅行することを目標に、仕事を頑張っています。

小泉 和貴

この記事を共有する

クラウドのご相談

CONTACT

クラウド導入や運用でお悩みの方は、お気軽にご相談ください。
専門家がサポートします。

サービス資料ダウンロード

DOWNLOAD

ビジネスをクラウドで加速させる準備はできていますか?
今すぐサービス資料をダウンロードして、詳細をご確認ください。