Streams & Backpressure


Streams are how Node handles data we can’t (or don’t want to) hold all in memory at once. A 50GB log file, an HTTP request body, a video upload — we process it in chunks as it flows. In simple language, a stream is an iterable that emits pieces over time.

The four stream types

Readable
We read FROM it. Examples: fs.createReadStream, an HTTP request body, process.stdin.
Writable
We write TO it. Examples: fs.createWriteStream, an HTTP response, process.stdout.
Duplex
Both readable and writable, independent. Example: a TCP socket.
Transform
Duplex where output is computed from input. Examples: zlib.createGzip(), crypto.createCipheriv() (the older crypto.createCipher() is deprecated).
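To make the relationship between the four types concrete, here is a small sketch that checks which base classes a stream matches. The helper name streamKinds is made up for illustration; the classes themselves come from node:stream.

```javascript
const { Readable, Writable, Duplex, Transform, PassThrough } = require("node:stream");
const zlib = require("node:zlib");

// Hypothetical helper: list which of the four base classes a stream matches.
function streamKinds(stream) {
  return [
    stream instanceof Readable && "Readable",
    stream instanceof Writable && "Writable",
    stream instanceof Duplex && "Duplex",
    stream instanceof Transform && "Transform",
  ].filter(Boolean);
}

const gzip = zlib.createGzip();

console.log(streamKinds(Readable.from(["a"]))); // only Readable
console.log(streamKinds(new PassThrough()));    // a Transform is also a Duplex
console.log(streamKinds(gzip));                 // zlib streams are Transforms

gzip.destroy(); // release the unused zlib handle
```

A Transform matching all four classes is the point of the sketch: it is a Duplex whose readable side is fed by its writable side.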

Reading a file with streams

The classic example. Loading a 10GB file with fs.readFile would try to hold the whole file in memory at once. With streams, we process it in chunks of at most 64 KiB (the fs.createReadStream default):

const fs = require("node:fs");

const stream = fs.createReadStream("./huge.log", { encoding: "utf8" });

stream.on("data", (chunk) => {
  // encoding is set, so chunk is a string and length counts characters, not bytes
  console.log(`got ${chunk.length} characters`);
});

stream.on("end", () => console.log("done"));
stream.on("error", (err) => console.error(err));

The stream reads into an internal buffer whose size is capped by highWaterMark (64 KiB by default for fs.createReadStream), emits 'data', and reads again once the chunk has been handed over. Memory stays bounded no matter how big the file is.
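To see the chunking directly, the sketch below writes a 100 KiB scratch file and reads it back with a smaller highWaterMark. The helper names chunkSizes and demoChunkSizes and the scratch file are assumptions made for the demo, not part of any API.

```javascript
const fs = require("node:fs");
const os = require("node:os");
const path = require("node:path");

// Hypothetical helper: resolve with the byte length of every chunk the stream emits.
function chunkSizes(file, highWaterMark) {
  return new Promise((resolve, reject) => {
    const sizes = [];
    fs.createReadStream(file, { highWaterMark }) // no encoding, so chunks are Buffers
      .on("data", (chunk) => sizes.push(chunk.length))
      .on("end", () => resolve(sizes))
      .on("error", reject);
  });
}

// Demo: write a 100 KiB scratch file, read it back with a 16 KiB cap, clean up.
async function demoChunkSizes() {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), "chunks-"));
  const file = path.join(dir, "scratch.bin");
  fs.writeFileSync(file, Buffer.alloc(100 * 1024));
  try {
    return await chunkSizes(file, 16 * 1024);
  } finally {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

demoChunkSizes().then((sizes) => {
  console.log(`${sizes.length} chunks, largest ${Math.max(...sizes)} bytes`);
});
```

No chunk should exceed the configured cap, whatever the file size. The exact number of chunks can vary with the platform.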

Piping — connecting streams

Most of the time we don’t want to handle chunks manually. We chain streams with .pipe():

const fs = require("node:fs");
const zlib = require("node:zlib");

// Read → gzip → write — entire pipeline streamed
fs.createReadStream("./access.log")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("./access.log.gz"));

Three streams, zero buffering of the whole file. Each chunk flows through the chain.

Backpressure — the most important concept

Backpressure is what makes streams safe. In simple language — when the downstream is slow, the upstream needs to pause until the downstream catches up. Otherwise the slow side’s internal buffer grows without bound and we run out of memory.

Figure: Backpressure in action. Readable (fast disk) → chunks → Transform (gzip, slow) → Writable (network). If gzip's buffer fills, write() returns false and the readable pauses until the 'drain' event.

When we call writable.write(chunk), it returns a boolean:

  • true — buffer has room, keep writing.
  • false — buffer is full, wait for the 'drain' event before writing more.

pipe() handles all this for us automatically. If we write to a stream manually, we have to respect that return value.

function pumpManually(readable, writable) {
  readable.on("data", (chunk) => {
    const ok = writable.write(chunk);
    if (!ok) {
      readable.pause(); // buffer is full: stop reading
      writable.once("drain", () => readable.resume()); // resume once it empties
    }
  });
  readable.on("end", () => writable.end()); // no more input: finish the writable
}
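The same rule applies when the data does not come from a Readable at all, for example when we generate chunks in a loop. A minimal sketch, assuming a made-up helper writeAll and a deliberately slow test sink slowSink (both hypothetical), could await 'drain' with events.once:

```javascript
const { once } = require("node:events");
const { Writable } = require("node:stream");

// Write every item, waiting for 'drain' whenever write() reports a full buffer.
// Resolves with how many times we had to wait.
async function writeAll(writable, items) {
  let waits = 0;
  for (const item of items) {
    if (!writable.write(item)) {
      waits += 1;
      await once(writable, "drain"); // rejects if the stream emits 'error'
    }
  }
  writable.end();
  await once(writable, "finish");
  return waits;
}

// Hypothetical slow sink: 1 ms per chunk, with a tiny 4-byte highWaterMark
// so that backpressure kicks in almost immediately.
function slowSink() {
  const received = [];
  const sink = new Writable({
    highWaterMark: 4,
    write(chunk, _enc, cb) {
      received.push(chunk.toString());
      setTimeout(cb, 1);
    },
  });
  sink.received = received;
  return sink;
}

writeAll(slowSink(), ["ab", "cd", "ef", "gh"]).then((waits) => {
  console.log(`finished, paused for 'drain' ${waits} time(s)`);
});
```

The loop never queues more than the sink has agreed to hold, which is what pipe() does for us behind the scenes.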

pipeline() — the modern, safe way

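pipe() has a well-known flaw: it does not forward errors, and if one stream in the chain fails, the others are not destroyed, so file descriptors and buffers leak. stream.pipeline() fixes that. It destroys every stream in the chain on failure and reports the error through its promise (or callback):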

const { pipeline } = require("node:stream/promises");
const fs = require("node:fs");
const zlib = require("node:zlib");

async function gzipFile(input, output) {
  await pipeline(
    fs.createReadStream(input),
    zlib.createGzip(),
    fs.createWriteStream(output)
  );
  console.log("done");
}

gzipFile("./access.log", "./access.log.gz").catch(console.error);

Always prefer pipeline over pipe for production code.
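To see the cleanup behaviour, here is a small sketch with an in-memory chain whose middle stream fails on purpose. The function name failingPipelineDemo and the "boom" error are invented for the demo.

```javascript
const { pipeline } = require("node:stream/promises");
const { Readable, Transform, Writable } = require("node:stream");

// Run a three-stream chain whose middle stage fails, then report what happened.
async function failingPipelineDemo() {
  const source = Readable.from(["a", "b", "c"]);
  const failing = new Transform({
    transform(_chunk, _enc, cb) {
      cb(new Error("boom")); // simulate a failure in the middle of the chain
    },
  });
  const sink = new Writable({
    write(_chunk, _enc, cb) {
      cb();
    },
  });

  try {
    await pipeline(source, failing, sink);
    return null; // not expected: the transform always fails
  } catch (err) {
    return {
      message: err.message,
      sourceDestroyed: source.destroyed,
      sinkDestroyed: sink.destroyed,
    };
  }
}

failingPipelineDemo().then((result) => console.log(result));
```

The promise rejects with the original error, and both ends of the chain report destroyed, so nothing is left open. With plain pipe() we would have to attach an 'error' handler to every stream and destroy the others ourselves.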

Async iteration

Modern Node lets us treat streams as async iterables — much cleaner than event listeners:

const fs = require("node:fs");

async function countLines(path) {
  const stream = fs.createReadStream(path, { encoding: "utf8" });
  let count = 0;

  for await (const chunk of stream) {
    count += (chunk.match(/\n/g) || []).length;
  }

  return count;
}
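Chunk boundaries fall at arbitrary positions, so a line can start in one chunk and end in the next. Counting newlines is unaffected, but code that needs whole lines has to carry the leftover piece forward. A minimal sketch, assuming string chunks (an encoding is set on the stream) and a hypothetical helper named splitLines:

```javascript
const { Readable } = require("node:stream");

// Hypothetical helper: yield complete lines from a stream of string chunks.
async function* splitLines(readable) {
  let tail = ""; // partial line carried over from the previous chunk
  for await (const chunk of readable) {
    const parts = (tail + chunk).split("\n");
    tail = parts.pop(); // the last piece may be an incomplete line
    yield* parts;
  }
  if (tail !== "") yield tail; // final line without a trailing newline
}

async function demoSplitLines() {
  const lines = [];
  // "cd" and "ef" arrive in different chunks but belong to the same line.
  for await (const line of splitLines(Readable.from(["ab\ncd", "ef\n", "gh"]))) {
    lines.push(line);
  }
  return lines;
}

demoSplitLines().then((lines) => console.log(lines)); // [ 'ab', 'cdef', 'gh' ]
```

For real files, readline does this job and also handles \r\n line endings.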

Object mode

By default streams move Buffers or strings. Set { objectMode: true } and we can pass arbitrary JS objects — useful for record-by-record processing pipelines (CSV rows, JSON lines, DB rows).

const { Transform } = require("node:stream");

const toUpper = new Transform({
  objectMode: true,
  transform(record, _enc, cb) {
    cb(null, { ...record, name: record.name.toUpperCase() });
  },
});
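A short usage sketch shows records flowing through such a transform. The factory upperCaseNames, the helper collectUpper, and the sample records are assumptions for illustration. Readable.from() turns any iterable into an object-mode stream.

```javascript
const { pipeline } = require("node:stream/promises");
const { Readable, Transform, Writable } = require("node:stream");

// Factory, because a stream instance can only be used for one run.
function upperCaseNames() {
  return new Transform({
    objectMode: true,
    transform(record, _enc, cb) {
      cb(null, { ...record, name: record.name.toUpperCase() });
    },
  });
}

// Hypothetical helper: push records through the transform and collect the output.
async function collectUpper(records) {
  const out = [];
  const collector = new Writable({
    objectMode: true,
    write(record, _enc, cb) {
      out.push(record);
      cb();
    },
  });
  await pipeline(Readable.from(records), upperCaseNames(), collector);
  return out;
}

collectUpper([
  { id: 1, name: "ada" },
  { id: 2, name: "linus" },
]).then((rows) => console.log(rows));
```

Every stream in the chain is in object mode here. Mixing an object-mode stream with a byte-mode one fails as soon as a plain object reaches the byte-mode side.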

Common real-world uses

  • HTTP servers — req is a Readable, res is a Writable. Streaming a big response means streaming directly from a file or DB.
  • File uploads — pipe req through a parser, straight to S3 or disk.
  • Log processing — read a multi-GB log line by line with readline.
  • Data ETL — read DB rows as a stream, transform, write to another store.
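As an illustration of the HTTP case, the sketch below streams a file into the response with pipeline(). The helper name createFileServer, the content type, and the file path in the usage note are assumptions. A real server would also check that the file exists before sending a 200 status.

```javascript
const fs = require("node:fs");
const http = require("node:http");
const { pipeline } = require("node:stream");

// Hypothetical helper: build a server that streams the same file for every request.
function createFileServer(filePath) {
  return http.createServer((_req, res) => {
    res.writeHead(200, { "Content-Type": "application/octet-stream" });
    // res is a Writable, so a slow client automatically slows down the file read.
    pipeline(fs.createReadStream(filePath), res, (err) => {
      if (err) {
        // pipeline has already destroyed both streams; we only log here.
        console.error("stream failed:", err.message);
      }
    });
  });
}

// Usage (not started here so the snippet exits on its own):
// createFileServer("./big-report.csv").listen(3000);
```

Memory use stays flat regardless of file size, because only a few chunks are in flight at any moment.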

Quick rules

  • Use pipeline() for any non-trivial chain.
  • Respect backpressure if you write streams manually.
  • Don’t JSON.stringify a 1GB object then write it — stream it.
  • For line-by-line text, use readline.createInterface({ input: stream }).
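The last rule can be sketched as follows. The function name countMatchingLines and the search term in the usage line are invented for the example, and the file is assumed to exist.

```javascript
const fs = require("node:fs");
const readline = require("node:readline");

// Count the lines of a file that contain a given substring, one line at a time.
async function countMatchingLines(file, needle) {
  const rl = readline.createInterface({
    input: fs.createReadStream(file, { encoding: "utf8" }),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });

  let count = 0;
  for await (const line of rl) {
    if (line.includes(needle)) count += 1;
  }
  return count;
}

// Usage, assuming ./access.log exists:
// countMatchingLines("./access.log", " 500 ").then((n) => console.log(n));
```

Unlike splitting chunks by hand, readline takes care of lines that straddle chunk boundaries, so only one line needs to be held in memory at a time.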