Node.js stream/promises module | API Reference | Bun
stream/promises
The 'node:stream/promises' submodule provides Promise-based stream utility functions such as pipeline and finished, enabling async/await syntax for stream completion and pipeline composition.
Use it to coordinate multiple streams and handle errors cleanly in async code.
function
finished(
  stream: ReadableStream | WritableStream | ReadableStream<any> | WritableStream<any>,
  options?: FinishedOptions,
): Promise<void>;
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';
const rs = createReadStream('archive.tar');
async function run() {
await finished(rs);
console.log('Stream is done reading.');
}
run().catch(console.error);
rs.resume(); // Drain the stream.
The finished API also provides a callback version.
stream.finished() leaves dangling event listeners (in particular 'error', 'end', 'finish' and 'close') after the returned promise is resolved or rejected. The reason for this is so that unexpected 'error' events (due to incorrect stream implementations) do not cause unexpected crashes. If this is unwanted behavior then options.cleanup should be set to true:
await finished(rs, { cleanup: true });
@returns
Fulfills when the stream is no longer readable or writable.
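Because finished returns a promise, a stream failure surfaces as a rejection that try/catch can handle. The following is a minimal sketch of both outcomes using in-memory PassThrough streams; the describeOutcome helper and the 'boom' error are hypothetical names introduced only for illustration.

```js
import { finished } from 'node:stream/promises';
import { PassThrough } from 'node:stream';

// Hypothetical helper: reports how a stream ended instead of throwing.
async function describeOutcome(stream) {
  try {
    await finished(stream);
    return 'done';
  } catch (err) {
    return `failed: ${err.message}`;
  }
}

// A stream that ends normally.
const healthy = new PassThrough();
healthy.end('hello');
healthy.resume(); // Drain the readable side so that 'end' is emitted.
const healthyOutcome = await describeOutcome(healthy);

// A stream that is destroyed with an error while it is being awaited.
const broken = new PassThrough();
const pending = describeOutcome(broken);
broken.destroy(new Error('boom'));
const brokenOutcome = await pending;

console.log(healthyOutcome);
console.log(brokenOutcome);
```

The helper is called before broken.destroy() so that finished has already attached its listeners when the error is emitted.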
function
pipeline<
  A extends PipelineSource<any>,
  B extends
    | WritableStream
    | WritableStream<any>
    | TransformStream<any, any>
    | WritableStream<string | Buffer<ArrayBufferLike>>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineDestinationFunction<ReadableStream, any>
    | PipelineDestinationFunction<ReadableStream<any>, any>
    | PipelineDestinationFunction<TransformStream<any, any>, any>
    | PipelineDestinationFunction<Iterable<any, any, any>, any>
    | PipelineDestinationFunction<AsyncIterable<any, any, any>, any>
    | PipelineDestinationFunction<PipelineSourceFunction<any>, any>
>(source: A, destination: B, options?: PipelineOptions): PipelineResult<B>;
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
To use an AbortSignal, pass it inside an options object, as the last argument. When the signal is aborted, destroy will be called on the underlying pipeline, with an AbortError.
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
await pipeline(
createReadStream('archive.tar'),
createGzip(),
createWriteStream('archive.tar.gz'),
{ signal },
);
} catch (err) {
console.error(err); // AbortError
}
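When a pipeline can fail for reasons other than cancellation, it can help to tell the two cases apart in the catch block. This is a minimal sketch, assuming the rejection carries the name 'AbortError' as the comment in the example above suggests; the in-memory source and sink stand in for real files.

```js
import { pipeline } from 'node:stream/promises';
import { PassThrough, Writable } from 'node:stream';

const ac = new AbortController();

// Illustrative stand-ins: a source that never ends and a sink that discards data.
const source = new PassThrough();
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

// Cancel shortly after the pipeline starts.
setImmediate(() => ac.abort());

let outcome;
try {
  await pipeline(source, sink, { signal: ac.signal });
  outcome = 'completed';
} catch (err) {
  // Distinguish a deliberate cancellation from a genuine stream failure.
  outcome = err.name === 'AbortError' ? 'aborted' : 'failed';
}

console.log(outcome);
```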
The pipeline API also supports async generators:
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
await pipeline(
createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
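The example above relies on a processChunk function that is not defined on this page. As a self-contained sketch of the same pattern, the following uppercases chunks inline and collects the result in memory; the collector stream and the input strings are illustrative assumptions.

```js
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

// Illustrative sink that records every chunk it receives.
const received = [];
const collector = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

await pipeline(
  Readable.from(['hello ', 'world']),
  async function* (source, { signal }) {
    for await (const chunk of source) {
      signal.throwIfAborted(); // Stop promptly if the pipeline was aborted.
      yield String(chunk).toUpperCase();
    }
  },
  collector,
);

const output = received.join('');
console.log(output);
```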
Remember to handle the signal argument passed into the async generator, especially when the async generator is the source for the pipeline (i.e. the first argument); otherwise the pipeline will never complete.
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
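The someLongRunningfn call above is a placeholder. One way to honor the signal in a source generator is to forward it to an abortable API; the sketch below uses setTimeout from node:timers/promises as the long-running step, with an in-memory sink and tick messages chosen purely for illustration.

```js
import { pipeline } from 'node:stream/promises';
import { setTimeout as delay } from 'node:timers/promises';
import { Writable } from 'node:stream';

// Illustrative sink that records each line it receives.
const lines = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    lines.push(chunk.toString());
    callback();
  },
});

await pipeline(
  async function* ({ signal }) {
    for (let i = 1; i <= 3; i++) {
      // Forward the signal so a pending wait rejects when the pipeline is torn down.
      await delay(5, undefined, { signal });
      yield `tick ${i}\n`;
    }
  },
  sink,
);

console.log(lines.join(''));
```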
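The pipeline API also provides a callback version: https://nodejs.org/docs/latest-v26.x/api/stream.html#streampipelinesource-transforms-destination-callback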
@returns
Fulfills when the pipeline is complete.
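Conversely, the promise rejects when any stage fails. This is a minimal sketch of that path, assuming a hypothetical transform that rejects one particular chunk; the stream names and the 'bad chunk' message are illustrative.

```js
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

const source = Readable.from(['ok', 'bad', 'never']);

// Hypothetical transform that fails on one specific chunk.
const validator = new Transform({
  transform(chunk, encoding, callback) {
    if (chunk.toString() === 'bad') {
      callback(new Error('bad chunk'));
    } else {
      callback(null, chunk);
    }
  },
});

const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

let message = null;
try {
  await pipeline(source, validator, sink);
} catch (err) {
  // The error raised inside the transform arrives here as the rejection reason.
  message = err.message;
}

console.log(message);
```

A single try/catch around the await covers every stage, so no per-stream 'error' listeners are needed in this sketch.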
function
pipeline<
  A extends PipelineSource<any>,
  T1 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<ReadableStream, any>
    | PipelineTransformGenerator<ReadableStream<any>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<Iterable<any, any, any>, any>
    | PipelineTransformGenerator<AsyncIterable<any, any, any>, any>
    | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  B extends
    | WritableStream
    | WritableStream<any>
    | TransformStream<any, any>
    | WritableStream<string | Buffer<ArrayBufferLike>>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineDestinationFunction<TransformStream<any, any>, any>
    | PipelineDestinationFunction<ReadWriteStream, any>
    | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<ReadableStream, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<ReadableStream<any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>
>(source: A, transform1: T1, destination: B, options?: PipelineOptions): PipelineResult<B>;
@returns
Fulfills when the pipeline is complete.
function
pipeline<
  A extends PipelineSource<any>,
  T1 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<ReadableStream, any>
    | PipelineTransformGenerator<ReadableStream<any>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<Iterable<any, any, any>, any>
    | PipelineTransformGenerator<AsyncIterable<any, any, any>, any>
    | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  B extends
    | WritableStream
    | WritableStream<any>
    | TransformStream<any, any>
    | WritableStream<string | Buffer<ArrayBufferLike>>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineDestinationFunction<TransformStream<any, any>, any>
    | PipelineDestinationFunction<ReadWriteStream, any>
    | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, destination: B, options?: PipelineOptions): PipelineResult<B>;
@returns
Fulfills when the pipeline is complete.
function
pipeline<
  A extends PipelineSource<any>,
  T1 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<ReadableStream, any>
    | PipelineTransformGenerator<ReadableStream<any>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<Iterable<any, any, any>, any>
    | PipelineTransformGenerator<AsyncIterable<any, any, any>, any>
    | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  T3 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>,
  B extends
    | WritableStream
    | WritableStream<any>
    | TransformStream<any, any>
    | WritableStream<string | Buffer<ArrayBufferLike>>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineDestinationFunction<TransformStream<any, any>, any>
    | PipelineDestinationFunction<ReadWriteStream, any>
    | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, transform3: T3, destination: B, options?: PipelineOptions): PipelineResult<B>;
@returns
Fulfills when the pipeline is complete.
function
pipeline<
  A extends PipelineSource<any>,
  T1 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<ReadableStream, any>
    | PipelineTransformGenerator<ReadableStream<any>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<Iterable<any, any, any>, any>
    | PipelineTransformGenerator<AsyncIterable<any, any, any>, any>
    | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  T3 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>,
  T4 extends
    | ReadWriteStream
    | TransformStream<any, any>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineTransformGenerator<TransformStream<any, any>, any>
    | PipelineTransformGenerator<ReadWriteStream, any>
    | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any>
    | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>,
  B extends
    | WritableStream
    | WritableStream<any>
    | TransformStream<any, any>
    | WritableStream<string | Buffer<ArrayBufferLike>>
    | TransformStream<string | Buffer<ArrayBufferLike>, any>
    | PipelineDestinationFunction<TransformStream<any, any>, any>
    | PipelineDestinationFunction<ReadWriteStream, any>
    | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any>, any>
    | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, transform3: T3, transform4: T4, destination: B, options?: PipelineOptions): PipelineResult<B>;
@returns
Fulfills when the pipeline is complete.
function
pipeline(
  streams: readonly [
    PipelineSource<any>,
    PipelineTransformStreams<unknown, any> | PipelineTransformGenerator<any, any>,
    WritableStream | TransformStream<unknown, any> | WritableStream<unknown> | PipelineDestinationFunction<any, any>,
  ],
  options?: PipelineOptions,
): Promise<void>;
@returns
Fulfills when the pipeline is complete.
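The overload above takes the stages as a single array, which can be convenient when the list is assembled at runtime. The following is a minimal sketch, assuming the runtime accepts an array the way this signature describes; the stages array and the in-memory streams are illustrative.

```js
import { pipeline } from 'node:stream/promises';
import { Readable, Transform, Writable } from 'node:stream';

// Illustrative transform that uppercases each chunk.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Illustrative sink that records every chunk it receives.
const collected = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    collected.push(chunk.toString());
    callback();
  },
});

// Build the stage list first, then hand it over as one argument.
const stages = [Readable.from(['a', 'b', 'c']), upper, sink];
await pipeline(stages);

const joined = collected.join('');
console.log(joined);
```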
function
pipeline(
  ...streams: [
    PipelineSource<any>,
    ...PipelineTransformStreams<unknown, any> | PipelineTransformGenerator<any, any>[],
    WritableStream | TransformStream<unknown, any> | WritableStream<unknown> | PipelineDestinationFunction<any, any>,
  ]
): Promise<void>;
@returns
Fulfills when the pipeline is complete.
function
pipeline(...streams: [
PipelineSource<any>, ...
PipelineTransformStreams<unknown, any> |
PipelineTransformGenerator<any, any>[], WritableStream |
TransformStream<unknown, any> |
WritableStream<unknown> |
PipelineDestinationFunction<any, any>, options:
PipelineOptions]): Promise<void>;
@returns
Fulfills when the pipeline is complete.
Type definitions
interface FinishedOptions
cleanup?: boolean
If true, removes the listeners registered by this function before the promise is fulfilled.
error?: boolean
readable?: boolean
signal?: AbortSignal
writable?: boolean
interface PipelineOptions
end?: boolean
signal?: AbortSignal
type PipelineResult<S extends PipelineDestination<any, any>> = S extends (...args: any[]) => PromiseLike<infer R> ? Promise<R> : Promise<void>
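The end field of PipelineOptions is listed above without a description. In Node.js it controls whether the destination stream is ended when the source ends, and it defaults to true. The sketch below, with a hypothetical in-memory sink and sample data, passes end: false so that a second pipeline can keep writing to the same destination.

```javascript
import { pipeline } from 'node:stream/promises';
import { Readable, Writable } from 'node:stream';

// Hypothetical in-memory sink shared by both pipelines.
const written = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    callback();
  },
});

// With end: false the sink is left open once the first source is drained.
await pipeline(Readable.from(['a', 'b']), sink, { end: false });
const stillOpen = !sink.writableEnded;

// The second pipeline uses the default (end: true) and closes the sink.
await pipeline(Readable.from(['c']), sink);

console.log(stillOpen, written.join(''));
```

This pattern is one way to concatenate several sources into a single destination. Only the last pipeline should be allowed to end it.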