Node.js Stream で大容量データを効率的に処理する実践ガイド
メモリ枯渇を防ぎ、大容量ファイルや API レスポンスを効率的に処理する Node.js Stream の実装方法を解説。Readable・Transform・Pipeline の使い分けから CSV パース・バッチ処理まで網羅した実務ガイド。

Node.js Stream で大容量データを効率的に処理する実践ガイド
大容量の CSV ファイルを読み込もうとしてメモリ不足エラーが出た、API から大量のデータを取得する際にタイムアウトする——Node.js で大きなデータを扱う際、こうした問題に直面することは少なくありません。
Node.js の Stream API を使えば、データを小さなチャンク(塊)に分割して処理することで、メモリ使用量を抑えつつ効率的にデータを扱えます。本記事では、受託開発の現場で即使える Stream の実装パターンを、具体的なコード例とともに解説します。
1. Stream が必要になる場面
典型的な問題シナリオ
以下のようなケースで、通常のファイル読み込みやデータ処理ではメモリ不足やパフォーマンス問題が発生します。
| シナリオ | 問題 | Stream による解決 |
|---------|------|------------------|
| 数百 MB の CSV ファイル処理 | fs.readFileSync() で全体を読み込むとメモリ不足 | 行単位で読み込み・処理 |
| 大量の DB レコードを JSON で返す | 全件取得後にレスポンスするとタイムアウト | 取得しながらストリーミングレスポンス |
| 外部 API から大量データ取得 | 全体の受信完了を待つと時間がかかる | チャンク単位で処理開始 |
| ログファイルの集計・フィルタリング | ファイル全体をメモリに載せると重い | 行単位で条件判定・集計 |
メモリ使用量の比較
// ❌ 悪い例:ファイル全体をメモリに読み込む
import fs from 'fs';
const data = fs.readFileSync('large-file.csv', 'utf-8');
const lines = data.split('\n');
// 500MB のファイルなら、メモリ上に 500MB + 配列メモリが必要
// ✅ 良い例:Stream で行単位に処理
import fs from 'fs';
import readline from 'readline';
const stream = fs.createReadStream('large-file.csv');
const rl = readline.createInterface({ input: stream });
rl.on('line', (line) => {
// 1行ずつ処理(メモリ使用量はほぼ一定)
});
2. Stream の基本:4つの種類
Node.js の Stream には 4 つの種類があり、それぞれ用途が異なります。
Stream の種類と用途
| 種類 | 説明 | 用途例 | |------|------|--------| | Readable | データを読み取る | ファイル読み込み、HTTP リクエスト受信 | | Writable | データを書き込む | ファイル書き込み、HTTP レスポンス送信 | | Duplex | 読み書き両方可能 | TCP ソケット、WebSocket | | Transform | 読み取りながらデータを変換 | CSV → JSON 変換、圧縮、暗号化 |
実務で最もよく使うパターン
受託開発の現場では、以下の組み合わせが頻出します。
// パターン 1:ファイル → 変換 → ファイル
fs.createReadStream('input.csv')
.pipe(transformStream) // CSV を JSON に変換
.pipe(fs.createWriteStream('output.json'));
// パターン 2:ファイル → 変換 → HTTP レスポンス
app.get('/download', (req, res) => {
fs.createReadStream('data.csv')
.pipe(transformToJSON)
.pipe(res); // クライアントにストリーミング送信
});
3. 実装パターン 1:CSV ファイルを行単位で処理
基本実装:readline を使った行読み込み
import fs from 'fs';
import readline from 'readline';
interface CSVRow {
id: string;
name: string;
email: string;
}
async function processLargeCSV(filePath: string): Promise<void> {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({
input: fileStream,
crlfDelay: Infinity, // \r\n を1つの改行として扱う
});
let lineNumber = 0;
const results: CSVRow[] = [];
for await (const line of rl) {
lineNumber++;
// ヘッダー行をスキップ
if (lineNumber === 1) continue;
const [id, name, email] = line.split(',');
// 条件にマッチする行だけ処理
if (email.includes('@example.com')) {
results.push({ id, name, email });
}
// 1000件ごとにバッチ処理
if (results.length >= 1000) {
await saveToDB(results);
results.length = 0; // 配列をクリア
}
}
// 残りを処理
if (results.length > 0) {
await saveToDB(results);
}
}
async function saveToDB(rows: CSVRow[]): Promise<void> {
// DB への一括挿入処理
console.log(`Saving ${rows.length} rows...`);
}
エラーハンドリングの追加
async function processLargeCSVWithErrorHandling(
filePath: string
): Promise<void> {
const fileStream = fs.createReadStream(filePath);
const rl = readline.createInterface({ input: fileStream });
fileStream.on('error', (err) => {
console.error('File read error:', err);
rl.close();
});
let lineNumber = 0;
const errors: { line: number; error: string }[] = [];
for await (const line of rl) {
lineNumber++;
try {
const [id, name, email] = line.split(',');
// バリデーション
if (!email || !email.includes('@')) {
throw new Error('Invalid email format');
}
// 処理...
} catch (err) {
errors.push({
line: lineNumber,
error: err instanceof Error ? err.message : String(err),
});
}
}
// エラーサマリーをログ出力
if (errors.length > 0) {
console.error(`${errors.length} errors occurred:`);
errors.slice(0, 10).forEach((e) => {
console.error(`Line ${e.line}: ${e.error}`);
});
}
}
4. 実装パターン 2:Transform Stream でデータ変換
カスタム Transform Stream の作成
import { Transform, TransformCallback } from 'stream';
interface CSVRow {
id: string;
name: string;
email: string;
}
class CSVToJSONTransform extends Transform {
private headerProcessed = false;
private headers: string[] = [];
constructor() {
super({ objectMode: true });
}
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback
): void {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!line.trim()) continue;
if (!this.headerProcessed) {
// 最初の行をヘッダーとして保存
this.headers = line.split(',').map((h) => h.trim());
this.headerProcessed = true;
continue;
}
// データ行を JSON オブジェクトに変換
const values = line.split(',');
const obj: Record<string, string> = {};
this.headers.forEach((header, index) => {
obj[header] = values[index]?.trim() || '';
});
// オブジェクトを次の Stream に渡す
this.push(JSON.stringify(obj) + '\n');
}
callback();
}
}
// 使用例
import fs from 'fs';
fs.createReadStream('data.csv')
.pipe(new CSVToJSONTransform())
.pipe(fs.createWriteStream('data.jsonl')); // JSON Lines 形式
フィルタリング機能を追加した Transform
class FilteredCSVTransform extends Transform {
private headerProcessed = false;
private headers: string[] = [];
private filterFn: (row: Record<string, string>) => boolean;
constructor(filterFn: (row: Record<string, string>) => boolean) {
super({ objectMode: true });
this.filterFn = filterFn;
}
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback
): void {
const lines = chunk.toString().split('\n');
for (const line of lines) {
if (!line.trim()) continue;
if (!this.headerProcessed) {
this.headers = line.split(',').map((h) => h.trim());
this.headerProcessed = true;
continue;
}
const values = line.split(',');
const obj: Record<string, string> = {};
this.headers.forEach((header, index) => {
obj[header] = values[index]?.trim() || '';
});
// フィルタ条件を満たす行だけ出力
if (this.filterFn(obj)) {
this.push(JSON.stringify(obj) + '\n');
}
}
callback();
}
}
// 使用例:メールアドレスが特定ドメインの行だけ抽出
fs.createReadStream('users.csv')
.pipe(
new FilteredCSVTransform(
(row) => row.email?.endsWith('@example.com') || false
)
)
.pipe(fs.createWriteStream('filtered-users.jsonl'));
5. 実装パターン 3:HTTP レスポンスをストリーミング送信
Express でのストリーミングレスポンス
import express from 'express';
import fs from 'fs';
import { pipeline } from 'stream/promises';
const app = express();
app.get('/download/csv', async (req, res) => {
try {
res.setHeader('Content-Type', 'text/csv');
res.setHeader('Content-Disposition', 'attachment; filename="data.csv"');
const readStream = fs.createReadStream('large-data.csv');
// pipeline を使うとエラーハンドリングが簡潔
await pipeline(readStream, res);
} catch (err) {
console.error('Stream error:', err);
if (!res.headersSent) {
res.status(500).json({ error: 'Download failed' });
}
}
});
// JSON 形式でストリーミング
app.get('/api/users/stream', async (req, res) => {
try {
res.setHeader('Content-Type', 'application/json');
res.write('['); // JSON 配列の開始
const readStream = fs.createReadStream('users.csv');
const transform = new CSVToJSONTransform();
let first = true;
transform.on('data', (chunk) => {
if (!first) res.write(',');
res.write(chunk);
first = false;
});
await pipeline(readStream, transform);
res.write(']'); // JSON 配列の終了
res.end();
} catch (err) {
console.error('Stream error:', err);
if (!res.headersSent) {
res.status(500).json({ error: 'Stream failed' });
}
}
});
DB クエリ結果をストリーミング(PostgreSQL)
import { Pool } from 'pg';
import { Transform } from 'stream';
import { pipeline } from 'stream/promises';
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
});
app.get('/api/large-dataset', async (req, res) => {
const client = await pool.connect();
try {
res.setHeader('Content-Type', 'application/json');
res.write('[');
// PostgreSQL の CURSOR を使ったストリーミング
const query = new QueryStream(
'SELECT * FROM large_table WHERE created_at > $1',
[req.query.since]
);
const stream = client.query(query);
let first = true;
const transformToJSON = new Transform({
objectMode: true,
transform(row, encoding, callback) {
const prefix = first ? '' : ',';
first = false;
callback(null, prefix + JSON.stringify(row));
},
});
await pipeline(stream, transformToJSON, res);
res.write(']');
res.end();
} catch (err) {
console.error('Query stream error:', err);
if (!res.headersSent) {
res.status(500).json({ error: 'Query failed' });
}
} finally {
client.release();
}
});
6. 実装パターン 4:バックプレッシャー(背圧)制御
バックプレッシャーとは
Stream の処理速度が異なる場合(例:読み込みが書き込みより速い)、メモリに未処理データが溜まってしまう問題を バックプレッシャー と呼びます。
import { Writable } from 'stream';
class SlowWriter extends Writable {
_write(
chunk: Buffer,
encoding: BufferEncoding,
callback: (error?: Error | null) => void
): void {
// 意図的に遅い書き込みをシミュレート
setTimeout(() => {
console.log('Writing chunk:', chunk.length, 'bytes');
callback();
}, 100);
}
}
// ❌ 悪い例:バックプレッシャーを無視
const reader = fs.createReadStream('large-file.dat');
const writer = new SlowWriter();
reader.on('data', (chunk) => {
writer.write(chunk); // writer が遅いと、メモリに溜まり続ける
});
// ✅ 良い例:pipe を使うと自動的に制御される
fs.createReadStream('large-file.dat').pipe(new SlowWriter());
手動でバックプレッシャーを制御
const reader = fs.createReadStream('large-file.dat');
const writer = new SlowWriter();
reader.on('data', (chunk) => {
const canContinue = writer.write(chunk);
if (!canContinue) {
// writer のバッファがいっぱいになったら読み込みを一時停止
reader.pause();
console.log('Reader paused due to backpressure');
}
});
writer.on('drain', () => {
// バッファが空になったら読み込みを再開
console.log('Writer drained, resuming reader');
reader.resume();
});
reader.on('end', () => {
writer.end();
});
7. パフォーマンス最適化のチェックリスト
必須項目
| 項目 | 推奨設定 | 理由 |
|------|----------|------|
| highWaterMark | 64KB〜1MB | デフォルト(16KB)より大きくすると I/O 効率向上 |
| objectMode | 必要な場合のみ true | オブジェクト単位の処理が必要な Transform のみ |
| encoding | 'utf8' を明示 | 文字列処理時は必ず指定 |
| pipeline 使用 | エラーハンドリング時 | pipe() より安全 |
実装例:highWaterMark の調整
// デフォルト(16KB)
const defaultStream = fs.createReadStream('file.dat');
// 大きなファイルの場合は highWaterMark を増やす
const optimizedStream = fs.createReadStream('large-file.dat', {
highWaterMark: 1024 * 1024, // 1MB
});
// ベンチマーク例
import { performance } from 'perf_hooks';
async function benchmark(highWaterMark: number): Promise<number> {
const start = performance.now();
const stream = fs.createReadStream('test-file.dat', { highWaterMark });
return new Promise((resolve) => {
let bytes = 0;
stream.on('data', (chunk) => {
bytes += chunk.length;
});
stream.on('end', () => {
const duration = performance.now() - start;
console.log(
`highWaterMark: ${highWaterMark}, Duration: ${duration.toFixed(2)}ms`
);
resolve(duration);
});
});
}
// 実行
await benchmark(16 * 1024); // 16KB(デフォルト)
await benchmark(64 * 1024); // 64KB
await benchmark(1024 * 1024); // 1MB
8. トラブルシューティング:よくあるエラーと対処法
エラー 1:EMFILE: too many open files
原因:同時に開いているファイルディスクリプタが OS の上限を超えた。
// ❌ 悪い例:一度に大量のファイルを開く
const files = await fs.promises.readdir('logs/');
files.forEach((file) => {
fs.createReadStream(`logs/${file}`).pipe(processStream);
});
// ✅ 良い例:並列数を制限
import pLimit from 'p-limit';
const limit = pLimit(10); // 同時に 10 ファイルまで
const files = await fs.promises.readdir('logs/');
await Promise.all(
files.map((file) =>
limit(() => processFile(`logs/${file}`))
)
);
async function processFile(path: string): Promise<void> {
return pipeline(
fs.createReadStream(path),
transformStream,
writeStream
);
}
エラー 2:ERR_STREAM_PREMATURE_CLOSE
原因:Stream が正常に終了する前に切断された。
// ✅ 対処法:finished イベントを使う
import { finished } from 'stream/promises';
const stream = fs.createReadStream('file.dat');
try {
stream.pipe(processStream);
await finished(stream);
console.log('Stream completed successfully');
} catch (err) {
console.error('Stream error:', err);
}
エラー 3:メモリリークの検出
// メモリ使用量をモニタリング
function logMemoryUsage(): void {
const usage = process.memoryUsage();
console.log({
rss: `${(usage.rss / 1024 / 1024).toFixed(2)} MB`,
heapUsed: `${(usage.heapUsed / 1024 / 1024).toFixed(2)} MB`,
});
}
setInterval(logMemoryUsage, 5000); // 5秒ごとに出力
// Stream 処理中
const stream = fs.createReadStream('large-file.dat');
stream.on('data', (chunk) => {
// 処理...
});
まとめ
Node.js の Stream API を使うことで、大容量データを効率的に処理できます。本記事で紹介した実装パターンを活用すれば、メモリ使用量を抑えつつ高速な処理が可能になります。
重要ポイント
- 小さなチャンクに分割:ファイル全体をメモリに読み込まない
- pipeline を活用:エラーハンドリングが簡潔になる
- バックプレッシャーを意識:
pipe()を使えば自動制御される - highWaterMark を調整:ファイルサイズに応じて最適化
実務での適用シーン
- CSV / JSON の大量データ処理
- ログファイルの集計・分析
- DB クエリ結果の API レスポンス
- ファイルアップロード・ダウンロード
Yureate では、Node.js を使った高速・スケーラブルなバックエンド開発を支援しています。大容量データ処理の最適化、パフォーマンス改善など、お気軽にご相談ください。
