yunwu

yunwu

Node.js のストリームについて

ストリームとは何ですか#

ストリームは、私にとって非常に困惑する概念でした - ファイルの読み書き、ネットワークリクエストなど、これらはすべてストリームになります。なぜファイルを読み書きするのに「ストリーム」という複雑な操作が必要なのでしょうか?

ストリームはデータのコレクションであり、しかし、これらのデータは一度に完全に取得する必要はありません。つまり、大量のメモリを一度に使用する必要がないため、大容量のデータ(大きなファイルの読み書き、長時間かかるネットワークリクエストなど)をより効率的に処理できます。

サンプル#

以下は、大きなファイル big.file を提供する Web サーバーの例です。

const fs = require('fs');
const server = require('http').createServer();
server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
        if(err) throw err;
        res.end(data);
    })
});
server.listen(8000);

この方法では、リクエスト時に大量のメモリを使用するため、ファイル全体を一度にメモリに読み込み、それを一度にクライアントに返します。ファイルが非常に大きい場合、サーバーがクラッシュしたり応答が遅くなる可能性があります。この場合、ストリームを使用してこの問題を解決し、ファイルを小さなチャンクに分割して送信し、メモリの使用量を減らすことができます。

Node の fs モジュールでは、createReadStream メソッドを使用して読み込み可能なストリームを作成できます。 これをレスポンスオブジェクトにパイプ(pipe)することができます。

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
});

server.listen(8000);

これにより、メモリの使用量が比較的低いレベルに保たれるため、ファイル全体を一度に読み込む必要はありません。

ストリームの種類#

Node.js には、Readable(読み込み可能なストリーム)Writable(書き込み可能なストリーム)、Duplex(双方向ストリーム)、Transform(変換ストリーム)の 4 つの基本的なストリームタイプがあります。

  • Readable(読み込み可能なストリーム)は、データを消費できるソースの抽象化です。例えば、fs.createReadStream メソッドがあります。
  • Writable(書き込み可能なストリーム)は、データを書き込むことができるターゲットの抽象化です。例えば、fs.createWriteStream メソッドがあります。
  • Duplex(双方向ストリーム)は、読み込みと書き込みの両方が可能なストリームです。例えば、TCP ソケットがあります。
  • Transform(変換ストリーム)は、双方向ストリームを基にしており、データの変更や変換を読み込みまたは書き込み時に行うために使用されます。例えば、zlib.createGzip は gzip アルゴリズムを使用してデータを圧縮するために使用されます。変換ストリームは、入力が書き込み可能なストリームであり、出力が読み込み可能なストリームであると考えることができます。変換ストリームは「スルーストリーム」とも呼ばれることがあります。

すべてのストリームは EventEmitter のインスタンスです。データの読み書きによってイベントがトリガされますが、pipe メソッドを使用してストリームのデータを消費することもできます。

pipe メソッド#

使用方法:

readableSrc.pipe(writableDest)

pipe メソッドは、ターゲットストリームを返し、pipe をチェーンできるようにします。

pipe メソッドは、ストリームのデータを消費するための最も簡単な方法です。通常、ストリームのデータを消費するためには、pipe メソッドまたはイベントを使用することが推奨されます。

イベント#

ストリームはイベントを直接消費することができます。まず、pipe メソッドと等価なイベントを使用してストリームのデータを消費する実装を見てみましょう。

# readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
  // chunkはBufferです。処理を行う場合は、まずtoString()メソッドを使用して文字列に変換する必要があります。
  // 例: JSON.parse(chunk.toString())
});
readable.on('end', () => {
  writable.end();
});

ここでは、data イベントと end イベントを使用しています。

読み込み可能なストリームで最も重要なイベントは次のとおりです:

  • data イベントは、ストリームがデータチャンクを消費者に渡すときにトリガされます。
  • end イベントは、ストリーム内に消費できるデータがない場合にトリガされます。

書き込み可能なストリームで最も重要なイベントは次のとおりです:

  • drain イベントは、書き込み可能なストリームがさらにデータを受け入れることができることを示すフラグです。
  • finish イベントは、すべてのデータがシステムに書き込まれたときにトリガされます。

HTTP ストリーム#

HTTPリクエストHTTPレスポンス の両方でストリーム入出力を使用できます。

HTTPレスポンス はクライアント側では読み込み可能なストリームであり、サーバー側では書き込み可能なストリームです。HTTPリクエスト はその逆です。

サンプル#

以下は、サーバーがストリーミング出力の API をリクエストし、それをクライアントにストリーミング出力する例です - これはあまり意味がありませんが、HTTPリクエストHTTPレスポンス の処理を含んでおり、さらなる処理に発展させることができます。

最もシンプルな例は、pipe メソッドを直接使用することです:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.body.pipe(res);
  } catch(e) {
    // ...
  }
})

⚠️ node-fetch をインポートすることを忘れないでください。

pipe メソッドを直接使用すると、ストリームデータをさらに処理することはできません。したがって、前述の イベント に基づいて等価な形式に変更してみましょう:

app.post('/stream', async (req, res) => {
	try {
    const response = await fetch('http://another.stream/');
    response.on('data', (chunk) => {
      res.write(chunk);
    });
    response.on('end', () => {
      res.end();
    });
  } catch(e) {
    // ...
  }
})

これで、さらなる処理が可能になります。

例:

Github

以下のコードは、Difyというオープンソースの LLMOps プラットフォームの API(広告ではありません)をリクエストし、ストリーミング出力の API の返り値を OpenAI API と同じ形式に変換してからストリーミング出力するものです。これにより、OpenAI API を使用して開発されたさまざまなアプリケーションに適応できます。

// const stream = await fetch(...)
stream.on('data', (chunk) => {
    console.log(`Received chunk: ${chunk}`);
    if (!chunk.toString().startsWith('data:')) return;
    const chunkObj = JSON.parse(chunk.toString().split("data: ")[1]);
    if (chunkObj.event != 'message') {
        console.log('Not a message, skip.');
        return;
    }
    const chunkContent = chunkObj.answer;
    const chunkId = chunkObj.conversation_id;
    const chunkCreate = chunkObj.created_at;
    res.write("data: " + JSON.stringify({
        "id": chunkId,
        "object": "chat.completion.chunk",
        "created": chunkCreate,
        "model": data.model,
        "choices": [{
            "index": 0,
            "delta": {
                "content": chunkContent
            },
            "finish_reason": null
        }]
    }) + "\n\n");
})
stream.on('end', () => {
    console.log('end event detected...')
    res.write("data: [DONE]\n\n");
    res.end();
})
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。