← ブログ一覧

Node.js Stream で大容量データを効率的に処理する実践ガイド

メモリ枯渇を防ぎ、大容量ファイルや API レスポンスを効率的に処理する Node.js Stream の実装方法を解説。Readable・Transform・Pipeline の使い分けから CSV パース・バッチ処理まで網羅した実務ガイド。

#Node.js#JavaScript#TypeScript#パフォーマンス
Node.js Stream で大容量データを効率的に処理する実践ガイド

Node.js Stream で大容量データを効率的に処理する実践ガイド

大容量の CSV ファイルを読み込もうとしてメモリ不足エラーが出た、API から大量のデータを取得する際にタイムアウトする——Node.js で大きなデータを扱う際、こうした問題に直面することは少なくありません。

Node.js の Stream API を使えば、データを小さなチャンク(塊)に分割して処理することで、メモリ使用量を抑えつつ効率的にデータを扱えます。本記事では、受託開発の現場で即使える Stream の実装パターンを、具体的なコード例とともに解説します。


1. Stream が必要になる場面

典型的な問題シナリオ

以下のようなケースで、通常のファイル読み込みやデータ処理ではメモリ不足やパフォーマンス問題が発生します。

| シナリオ | 問題 | Stream による解決 | |---------|------|------------------| | 数百 MB の CSV ファイル処理 | fs.readFileSync() で全体を読み込むとメモリ不足 | 行単位で読み込み・処理 | | 大量の DB レコードを JSON で返す | 全件取得後にレスポンスするとタイムアウト | 取得しながらストリーミングレスポンス | | 外部 API から大量データ取得 | 全体の受信完了を待つと時間がかかる | チャンク単位で処理開始 | | ログファイルの集計・フィルタリング | ファイル全体をメモリに載せると重い | 行単位で条件判定・集計 |

メモリ使用量の比較

// ❌ 悪い例:ファイル全体をメモリに読み込む
import fs from 'fs';

const data = fs.readFileSync('large-file.csv', 'utf-8');
const lines = data.split('\n');
// 500MB のファイルなら、メモリ上に 500MB + 配列メモリが必要

// ✅ 良い例:Stream で行単位に処理
import fs from 'fs';
import readline from 'readline';

const stream = fs.createReadStream('large-file.csv');
const rl = readline.createInterface({ input: stream });

rl.on('line', (line) => {
  // 1行ずつ処理(メモリ使用量はほぼ一定)
});

2. Stream の基本:4つの種類

Node.js の Stream には 4 つの種類があり、それぞれ用途が異なります。

Stream の種類と用途

| 種類 | 説明 | 用途例 | |------|------|--------| | Readable | データを読み取る | ファイル読み込み、HTTP リクエスト受信 | | Writable | データを書き込む | ファイル書き込み、HTTP レスポンス送信 | | Duplex | 読み書き両方可能 | TCP ソケット、WebSocket | | Transform | 読み取りながらデータを変換 | CSV → JSON 変換、圧縮、暗号化 |

実務で最もよく使うパターン

受託開発の現場では、以下の組み合わせが頻出します。

// パターン 1:ファイル → 変換 → ファイル
fs.createReadStream('input.csv')
  .pipe(transformStream)  // CSV を JSON に変換
  .pipe(fs.createWriteStream('output.json'));

// パターン 2:ファイル → 変換 → HTTP レスポンス
app.get('/download', (req, res) => {
  fs.createReadStream('data.csv')
    .pipe(transformToJSON)
    .pipe(res);  // クライアントにストリーミング送信
});

3. 実装パターン 1:CSV ファイルを行単位で処理

基本実装:readline を使った行読み込み

import fs from 'fs';
import readline from 'readline';

interface CSVRow {
  id: string;
  name: string;
  email: string;
}

async function processLargeCSV(filePath: string): Promise<void> {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,  // \r\n を1つの改行として扱う
  });

  let lineNumber = 0;
  const results: CSVRow[] = [];

  for await (const line of rl) {
    lineNumber++;
    
    // ヘッダー行をスキップ
    if (lineNumber === 1) continue;

    const [id, name, email] = line.split(',');
    
    // 条件にマッチする行だけ処理
    if (email.includes('@example.com')) {
      results.push({ id, name, email });
    }

    // 1000件ごとにバッチ処理
    if (results.length >= 1000) {
      await saveToDB(results);
      results.length = 0;  // 配列をクリア
    }
  }

  // 残りを処理
  if (results.length > 0) {
    await saveToDB(results);
  }
}

async function saveToDB(rows: CSVRow[]): Promise<void> {
  // DB への一括挿入処理
  console.log(`Saving ${rows.length} rows...`);
}

エラーハンドリングの追加

async function processLargeCSVWithErrorHandling(
  filePath: string
): Promise<void> {
  const fileStream = fs.createReadStream(filePath);
  const rl = readline.createInterface({ input: fileStream });

  fileStream.on('error', (err) => {
    console.error('File read error:', err);
    rl.close();
  });

  let lineNumber = 0;
  const errors: { line: number; error: string }[] = [];

  for await (const line of rl) {
    lineNumber++;
    
    try {
      const [id, name, email] = line.split(',');
      
      // バリデーション
      if (!email || !email.includes('@')) {
        throw new Error('Invalid email format');
      }
      
      // 処理...
    } catch (err) {
      errors.push({
        line: lineNumber,
        error: err instanceof Error ? err.message : String(err),
      });
    }
  }

  // エラーサマリーをログ出力
  if (errors.length > 0) {
    console.error(`${errors.length} errors occurred:`);
    errors.slice(0, 10).forEach((e) => {
      console.error(`Line ${e.line}: ${e.error}`);
    });
  }
}

4. 実装パターン 2:Transform Stream でデータ変換

カスタム Transform Stream の作成

import { Transform, TransformCallback } from 'stream';

interface CSVRow {
  id: string;
  name: string;
  email: string;
}

class CSVToJSONTransform extends Transform {
  private headerProcessed = false;
  private headers: string[] = [];

  constructor() {
    super({ objectMode: true });
  }

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    const lines = chunk.toString().split('\n');

    for (const line of lines) {
      if (!line.trim()) continue;

      if (!this.headerProcessed) {
        // 最初の行をヘッダーとして保存
        this.headers = line.split(',').map((h) => h.trim());
        this.headerProcessed = true;
        continue;
      }

      // データ行を JSON オブジェクトに変換
      const values = line.split(',');
      const obj: Record<string, string> = {};
      
      this.headers.forEach((header, index) => {
        obj[header] = values[index]?.trim() || '';
      });

      // オブジェクトを次の Stream に渡す
      this.push(JSON.stringify(obj) + '\n');
    }

    callback();
  }
}

// 使用例
import fs from 'fs';

fs.createReadStream('data.csv')
  .pipe(new CSVToJSONTransform())
  .pipe(fs.createWriteStream('data.jsonl'));  // JSON Lines 形式

フィルタリング機能を追加した Transform

class FilteredCSVTransform extends Transform {
  private headerProcessed = false;
  private headers: string[] = [];
  private filterFn: (row: Record<string, string>) => boolean;

  constructor(filterFn: (row: Record<string, string>) => boolean) {
    super({ objectMode: true });
    this.filterFn = filterFn;
  }

  _transform(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: TransformCallback
  ): void {
    const lines = chunk.toString().split('\n');

    for (const line of lines) {
      if (!line.trim()) continue;

      if (!this.headerProcessed) {
        this.headers = line.split(',').map((h) => h.trim());
        this.headerProcessed = true;
        continue;
      }

      const values = line.split(',');
      const obj: Record<string, string> = {};
      
      this.headers.forEach((header, index) => {
        obj[header] = values[index]?.trim() || '';
      });

      // フィルタ条件を満たす行だけ出力
      if (this.filterFn(obj)) {
        this.push(JSON.stringify(obj) + '\n');
      }
    }

    callback();
  }
}

// 使用例:メールアドレスが特定ドメインの行だけ抽出
fs.createReadStream('users.csv')
  .pipe(
    new FilteredCSVTransform(
      (row) => row.email?.endsWith('@example.com') || false
    )
  )
  .pipe(fs.createWriteStream('filtered-users.jsonl'));

5. 実装パターン 3:HTTP レスポンスをストリーミング送信

Express でのストリーミングレスポンス

import express from 'express';
import fs from 'fs';
import { pipeline } from 'stream/promises';

const app = express();

app.get('/download/csv', async (req, res) => {
  try {
    res.setHeader('Content-Type', 'text/csv');
    res.setHeader('Content-Disposition', 'attachment; filename="data.csv"');

    const readStream = fs.createReadStream('large-data.csv');
    
    // pipeline を使うとエラーハンドリングが簡潔
    await pipeline(readStream, res);
  } catch (err) {
    console.error('Stream error:', err);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Download failed' });
    }
  }
});

// JSON 形式でストリーミング
app.get('/api/users/stream', async (req, res) => {
  try {
    res.setHeader('Content-Type', 'application/json');
    res.write('[');  // JSON 配列の開始

    const readStream = fs.createReadStream('users.csv');
    const transform = new CSVToJSONTransform();

    let first = true;
    transform.on('data', (chunk) => {
      if (!first) res.write(',');
      res.write(chunk);
      first = false;
    });

    await pipeline(readStream, transform);
    
    res.write(']');  // JSON 配列の終了
    res.end();
  } catch (err) {
    console.error('Stream error:', err);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Stream failed' });
    }
  }
});

DB クエリ結果をストリーミング(PostgreSQL)

import { Pool } from 'pg';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

app.get('/api/large-dataset', async (req, res) => {
  const client = await pool.connect();
  
  try {
    res.setHeader('Content-Type', 'application/json');
    res.write('[');

    // PostgreSQL の CURSOR を使ったストリーミング
    const query = new QueryStream(
      'SELECT * FROM large_table WHERE created_at > $1',
      [req.query.since]
    );
    
    const stream = client.query(query);
    
    let first = true;
    const transformToJSON = new Transform({
      objectMode: true,
      transform(row, encoding, callback) {
        const prefix = first ? '' : ',';
        first = false;
        callback(null, prefix + JSON.stringify(row));
      },
    });

    await pipeline(stream, transformToJSON, res);
    
    res.write(']');
    res.end();
  } catch (err) {
    console.error('Query stream error:', err);
    if (!res.headersSent) {
      res.status(500).json({ error: 'Query failed' });
    }
  } finally {
    client.release();
  }
});

6. 実装パターン 4:バックプレッシャー(背圧)制御

バックプレッシャーとは

Stream の処理速度が異なる場合(例:読み込みが書き込みより速い)、メモリに未処理データが溜まってしまう問題を バックプレッシャー と呼びます。

import { Writable } from 'stream';

class SlowWriter extends Writable {
  _write(
    chunk: Buffer,
    encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    // 意図的に遅い書き込みをシミュレート
    setTimeout(() => {
      console.log('Writing chunk:', chunk.length, 'bytes');
      callback();
    }, 100);
  }
}

// ❌ 悪い例:バックプレッシャーを無視
const reader = fs.createReadStream('large-file.dat');
const writer = new SlowWriter();

reader.on('data', (chunk) => {
  writer.write(chunk);  // writer が遅いと、メモリに溜まり続ける
});

// ✅ 良い例:pipe を使うと自動的に制御される
fs.createReadStream('large-file.dat').pipe(new SlowWriter());

手動でバックプレッシャーを制御

const reader = fs.createReadStream('large-file.dat');
const writer = new SlowWriter();

reader.on('data', (chunk) => {
  const canContinue = writer.write(chunk);
  
  if (!canContinue) {
    // writer のバッファがいっぱいになったら読み込みを一時停止
    reader.pause();
    console.log('Reader paused due to backpressure');
  }
});

writer.on('drain', () => {
  // バッファが空になったら読み込みを再開
  console.log('Writer drained, resuming reader');
  reader.resume();
});

reader.on('end', () => {
  writer.end();
});

7. パフォーマンス最適化のチェックリスト

必須項目

| 項目 | 推奨設定 | 理由 | |------|----------|------| | highWaterMark | 64KB〜1MB | デフォルト(16KB)より大きくすると I/O 効率向上 | | objectMode | 必要な場合のみ true | オブジェクト単位の処理が必要な Transform のみ | | encoding | 'utf8' を明示 | 文字列処理時は必ず指定 | | pipeline 使用 | エラーハンドリング時 | pipe() より安全 |

実装例:highWaterMark の調整

// デフォルト(16KB)
const defaultStream = fs.createReadStream('file.dat');

// 大きなファイルの場合は highWaterMark を増やす
const optimizedStream = fs.createReadStream('large-file.dat', {
  highWaterMark: 1024 * 1024,  // 1MB
});

// ベンチマーク例
import { performance } from 'perf_hooks';

async function benchmark(highWaterMark: number): Promise<number> {
  const start = performance.now();
  
  const stream = fs.createReadStream('test-file.dat', { highWaterMark });
  
  return new Promise((resolve) => {
    let bytes = 0;
    stream.on('data', (chunk) => {
      bytes += chunk.length;
    });
    stream.on('end', () => {
      const duration = performance.now() - start;
      console.log(
        `highWaterMark: ${highWaterMark}, Duration: ${duration.toFixed(2)}ms`
      );
      resolve(duration);
    });
  });
}

// 実行
await benchmark(16 * 1024);      // 16KB(デフォルト)
await benchmark(64 * 1024);      // 64KB
await benchmark(1024 * 1024);   // 1MB

8. トラブルシューティング:よくあるエラーと対処法

エラー 1:EMFILE: too many open files

原因:同時に開いているファイルディスクリプタが OS の上限を超えた。

// ❌ 悪い例:一度に大量のファイルを開く
const files = await fs.promises.readdir('logs/');
files.forEach((file) => {
  fs.createReadStream(`logs/${file}`).pipe(processStream);
});

// ✅ 良い例:並列数を制限
import pLimit from 'p-limit';

const limit = pLimit(10);  // 同時に 10 ファイルまで

const files = await fs.promises.readdir('logs/');
await Promise.all(
  files.map((file) =>
    limit(() => processFile(`logs/${file}`))
  )
);

async function processFile(path: string): Promise<void> {
  return pipeline(
    fs.createReadStream(path),
    transformStream,
    writeStream
  );
}

エラー 2:ERR_STREAM_PREMATURE_CLOSE

原因:Stream が正常に終了する前に切断された。

// ✅ 対処法:finished イベントを使う
import { finished } from 'stream/promises';

const stream = fs.createReadStream('file.dat');

try {
  stream.pipe(processStream);
  await finished(stream);
  console.log('Stream completed successfully');
} catch (err) {
  console.error('Stream error:', err);
}

エラー 3:メモリリークの検出

// メモリ使用量をモニタリング
function logMemoryUsage(): void {
  const usage = process.memoryUsage();
  console.log({
    rss: `${(usage.rss / 1024 / 1024).toFixed(2)} MB`,
    heapUsed: `${(usage.heapUsed / 1024 / 1024).toFixed(2)} MB`,
  });
}

setInterval(logMemoryUsage, 5000);  // 5秒ごとに出力

// Stream 処理中
const stream = fs.createReadStream('large-file.dat');
stream.on('data', (chunk) => {
  // 処理...
});

まとめ

Node.js の Stream API を使うことで、大容量データを効率的に処理できます。本記事で紹介した実装パターンを活用すれば、メモリ使用量を抑えつつ高速な処理が可能になります。

重要ポイント

  • 小さなチャンクに分割:ファイル全体をメモリに読み込まない
  • pipeline を活用:エラーハンドリングが簡潔になる
  • バックプレッシャーを意識pipe() を使えば自動制御される
  • highWaterMark を調整:ファイルサイズに応じて最適化

実務での適用シーン

  • CSV / JSON の大量データ処理
  • ログファイルの集計・分析
  • DB クエリ結果の API レスポンス
  • ファイルアップロード・ダウンロード

Yureate では、Node.js を使った高速・スケーラブルなバックエンド開発を支援しています。大容量データ処理の最適化、パフォーマンス改善など、お気軽にご相談ください。

お問い合わせはこちら

この内容について相談する他の記事を見る