Source: http://www.bun.com/reference/node/stream/promises

Node.js stream/promises module | API Reference | Bun


Node.js module

stream/promises

The 'node:stream/promises' submodule provides Promise-based stream utility functions such as pipeline and finished, enabling async/await syntax for stream completion and pipeline composition.

Use it to coordinate multiple streams and handle errors cleanly in async code.

function finished(stream: ReadableStream | WritableStream | ReadableStream<any> | WritableStream<any>, options?: FinishedOptions): Promise<void>;
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

The finished API also provides a callback version.

stream.finished() leaves dangling event listeners (in particular 'error', 'end', 'finish' and 'close') after the returned promise is resolved or rejected. The reason for this is so that unexpected 'error' events (due to incorrect stream implementations) do not cause unexpected crashes. If this is unwanted behavior then options.cleanup should be set to true:

await finished(rs, { cleanup: true });
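
The effect of options.cleanup can be observed by counting the 'error' listeners left on a stream once the promise has settled. The following is a minimal sketch under stated assumptions: leftoverListeners is a hypothetical helper, a PassThrough stands in for a real file stream, and dynamic import() is used only so the snippet runs as either a CommonJS or an ES module script.

```javascript
// Sketch: count the 'error' listeners that finished() leaves behind.
// `leftoverListeners` is a hypothetical helper, not part of the API.
async function leftoverListeners(options = {}) {
  const { PassThrough } = await import('node:stream');
  const { finished } = await import('node:stream/promises');

  const stream = new PassThrough();
  const before = stream.listenerCount('error');

  const done = finished(stream, options);
  stream.end();    // Finish the writable side.
  stream.resume(); // Drain the readable side so that 'end' is emitted.
  await done;

  // Listeners attached by finished() that are still registered.
  return stream.listenerCount('error') - before;
}

leftoverListeners({ cleanup: true })
  .then((n) => console.log('with cleanup:', n));
leftoverListeners()
  .then((n) => console.log('without cleanup:', n));
```

With cleanup enabled the count should drop back to its starting value, while the default leaves at least one listener attached.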
@returns

Fulfills when the stream is no longer readable or writable.
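
The promise rejects if the stream fails before it ends, so a try/catch around the await is usually enough to handle both outcomes. A small sketch, assuming a PassThrough that is destroyed with a simulated error; waitForStream and the error message are illustrative names, not part of the API.

```javascript
// Sketch: finished() rejects when the stream is destroyed with an error.
// `waitForStream` is a hypothetical helper used only for illustration.
async function waitForStream() {
  const { PassThrough } = await import('node:stream');
  const { finished } = await import('node:stream/promises');

  const stream = new PassThrough();

  // Simulate a failure shortly after we start waiting.
  setImmediate(() => stream.destroy(new Error('simulated failure')));

  try {
    await finished(stream);
    return 'finished';
  } catch (err) {
    return `failed: ${err.message}`;
  }
}

waitForStream().then((outcome) => console.log(outcome));
```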

function pipeline<A extends PipelineSource<any>, B extends WritableStream | WritableStream<any> | TransformStream<any, any> | WritableStream<string | Buffer<ArrayBufferLike>> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineDestinationFunction<ReadableStream, any> | PipelineDestinationFunction<ReadableStream<any>, any> | PipelineDestinationFunction<TransformStream<any, any>, any> | PipelineDestinationFunction<Iterable<any, any, any>, any> | PipelineDestinationFunction<AsyncIterable<any, any, any>, any> | PipelineDestinationFunction<PipelineSourceFunction<any>, any>>(source: A, destination: B, options?: PipelineOptions): PipelineResult<B>;
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');

To use an AbortSignal, pass it inside an options object, as the last argument. When the signal is aborted, destroy will be called on the underlying pipeline, with an AbortError.

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}
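
In practice it helps to tell an abort apart from a genuine failure. The sketch below does that by checking err.name, using in-memory streams so it runs without any files. The endless ticks generator, the discarding sink, and the 30 ms delay are assumptions chosen for illustration; dynamic import() keeps the snippet runnable as either a CommonJS or an ES module script.

```javascript
// Sketch: abort a running pipeline and distinguish AbortError from other errors.
// `runUntilAborted` and `ticks` are hypothetical names.
async function runUntilAborted() {
  const { Readable, Writable } = await import('node:stream');
  const { pipeline } = await import('node:stream/promises');

  // An endless source that yields a chunk every few milliseconds.
  async function* ticks() {
    while (true) {
      await new Promise((resolve) => setTimeout(resolve, 5));
      yield 'tick\n';
    }
  }

  // A sink that discards everything it receives.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    },
  });

  const ac = new AbortController();
  setTimeout(() => ac.abort(), 30);

  try {
    await pipeline(Readable.from(ticks()), sink, { signal: ac.signal });
    return 'completed';
  } catch (err) {
    if (err.name === 'AbortError') return 'aborted';
    throw err; // Anything else is a real failure.
  }
}

runUntilAborted().then((outcome) => console.log(outcome));
```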

The pipeline API also supports async generators:

import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      // processChunk is a placeholder for your own async transform.
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
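
For a version that runs without input files or a processChunk helper, the same shape can be sketched with in-memory streams. Here uppercase is a hypothetical function, Readable.from supplies the chunks, and a small Writable collects the output; the transform simply upper-cases each chunk.

```javascript
// Sketch: an async generator used as a transform stage, entirely in memory.
// `uppercase` is a hypothetical helper used only for illustration.
async function uppercase(chunks) {
  const { Readable, Writable } = await import('node:stream');
  const { pipeline } = await import('node:stream/promises');

  // Collect whatever reaches the end of the pipeline.
  const output = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      output.push(chunk.toString());
      callback();
    },
  });

  await pipeline(
    Readable.from(chunks),
    async function* (source) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    sink,
  );

  return output.join('');
}

uppercase(['hello ', 'world']).then((text) => console.log(text));
```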

Remember to handle the signal argument passed into the async generator, especially when the async generator is the source for the pipeline (i.e. the first argument); otherwise the pipeline will never complete.

import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
  async function* ({ signal }) {
    // someLongRunningfn is a placeholder; it must honor `signal` to be abortable.
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
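
One way to honor the signal in a source generator is to forward it to every awaited operation. The sketch below assumes the promise-based setTimeout from node:timers/promises as the long-running step; runTicker, the 10 ms interval, and the 50 ms abort delay are illustrative choices rather than anything prescribed by the API.

```javascript
// Sketch: a source generator that forwards `signal` so the pipeline can stop.
// `runTicker` is a hypothetical helper used only for illustration.
async function runTicker() {
  const { Writable } = await import('node:stream');
  const { pipeline } = await import('node:stream/promises');
  const { setTimeout: sleep } = await import('node:timers/promises');

  const received = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      callback();
    },
  });

  const ac = new AbortController();
  setTimeout(() => ac.abort(), 50);

  try {
    await pipeline(
      async function* ({ signal }) {
        while (true) {
          // Passing `signal` makes the wait reject as soon as the pipeline aborts.
          await sleep(10, undefined, { signal });
          yield 'tick';
        }
      },
      sink,
      { signal: ac.signal },
    );
  } catch (err) {
    return { aborted: err.name === 'AbortError', ticks: received.length };
  }
  return { aborted: false, ticks: received.length };
}

runTicker().then((result) => console.log(result));
```

Without the signal forwarded to sleep, the loop would keep waiting after the abort instead of stopping promptly.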

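The pipeline API also provides a callback version: https://nodejs.org/docs/latest-v26.x/api/stream.html#streampipelinesource-transforms-destination-callback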

@returns

Fulfills when the pipeline is complete.
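
The PipelineResult<B> return type suggests that the fulfilled value depends on the destination. In Node.js, when the destination is an async function, the promise fulfills with that function's return value; whether Bun behaves identically is an assumption worth verifying. A minimal sketch, where countBytes is a hypothetical helper:

```javascript
// Sketch: an async function as the destination; its return value becomes
// the pipeline's fulfilled value (Node.js behavior, assumed for Bun).
// `countBytes` is a hypothetical helper used only for illustration.
async function countBytes(chunks) {
  const { Readable } = await import('node:stream');
  const { pipeline } = await import('node:stream/promises');

  return pipeline(
    Readable.from(chunks),
    async function (source) {
      let total = 0;
      for await (const chunk of source) {
        total += Buffer.byteLength(chunk);
      }
      return total;
    },
  );
}

countBytes(['abc', 'de']).then((total) => console.log('bytes:', total));
```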

function pipeline<
  A extends PipelineSource<any>,
  T1 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<ReadableStream, any> | PipelineTransformGenerator<ReadableStream<any>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<Iterable<any, any, any>, any> | PipelineTransformGenerator<AsyncIterable<any, any, any>, any> | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  B extends WritableStream | WritableStream<any> | TransformStream<any, any> | WritableStream<string | Buffer<ArrayBufferLike>> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineDestinationFunction<TransformStream<any, any>, any> | PipelineDestinationFunction<ReadWriteStream, any> | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<ReadableStream, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<ReadableStream<any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<Iterable<any, any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>
>(source: A, transform1: T1, destination: B, options?: PipelineOptions): PipelineResult<B>;

function pipeline<
  A extends PipelineSource<any>,
  T1 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<ReadableStream, any> | PipelineTransformGenerator<ReadableStream<any>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<Iterable<any, any, any>, any> | PipelineTransformGenerator<AsyncIterable<any, any, any>, any> | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  B extends WritableStream | WritableStream<any> | TransformStream<any, any> | WritableStream<string | Buffer<ArrayBufferLike>> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineDestinationFunction<TransformStream<any, any>, any> | PipelineDestinationFunction<ReadWriteStream, any> | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, destination: B, options?: PipelineOptions): PipelineResult<B>;

function pipeline<
  A extends PipelineSource<any>,
  T1 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<ReadableStream, any> | PipelineTransformGenerator<ReadableStream<any>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<Iterable<any, any, any>, any> | PipelineTransformGenerator<AsyncIterable<any, any, any>, any> | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  T3 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>,
  B extends WritableStream | WritableStream<any> | TransformStream<any, any> | WritableStream<string | Buffer<ArrayBufferLike>> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineDestinationFunction<TransformStream<any, any>, any> | PipelineDestinationFunction<ReadWriteStream, any> | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, transform3: T3, destination: B, options?: PipelineOptions): PipelineResult<B>;

function pipeline<
  A extends PipelineSource<any>,
  T1 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<ReadableStream, any> | PipelineTransformGenerator<ReadableStream<any>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<Iterable<any, any, any>, any> | PipelineTransformGenerator<AsyncIterable<any, any, any>, any> | PipelineTransformGenerator<PipelineSourceFunction<any>, any>,
  T2 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>,
  T3 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>,
  T4 extends ReadWriteStream | TransformStream<any, any> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineTransformGenerator<TransformStream<any, any>, any> | PipelineTransformGenerator<ReadWriteStream, any> | PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any> | PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>,
  B extends WritableStream | WritableStream<any> | TransformStream<any, any> | WritableStream<string | Buffer<ArrayBufferLike>> | TransformStream<string | Buffer<ArrayBufferLike>, any> | PipelineDestinationFunction<TransformStream<any, any>, any> | PipelineDestinationFunction<ReadWriteStream, any> | PipelineDestinationFunction<TransformStream<string | Buffer<ArrayBufferLike>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<any, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<ReadWriteStream, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadWriteStream, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<string | Buffer<ArrayBufferLike>, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<ReadableStream<any>, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<TransformStream<any, any>, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<Iterable<any, any, any>, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<AsyncIterable<any, any, any>, any>, any>, any>, any>, any> | PipelineDestinationFunction<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineTransformGenerator<PipelineSourceFunction<any>, any>, any>, any>, any>, any>
>(source: A, transform1: T1, transform2: T2, transform3: T3, transform4: T4, destination: B, options?: PipelineOptions): PipelineResult<B>;

function pipeline(streams: readonly [PipelineSource<any>, PipelineTransformStreams<unknown, any> | PipelineTransformGenerator<any, any>, WritableStream | TransformStream<unknown, any> | WritableStream<unknown> | PipelineDestinationFunction<any, any>], options?: PipelineOptions): Promise<void>;

function pipeline(...streams: [PipelineSource<any>, ...PipelineTransformStreams<unknown, any> | PipelineTransformGenerator<any, any>[], WritableStream | TransformStream<unknown, any> | WritableStream<unknown> | PipelineDestinationFunction<any, any>]): Promise<void>;

function pipeline(...streams: [PipelineSource<any>, ...PipelineTransformStreams<unknown, any> | PipelineTransformGenerator<any, any>[], WritableStream | TransformStream<unknown, any> | WritableStream<unknown> | PipelineDestinationFunction<any, any>, options: PipelineOptions]): Promise<void>;

Type definitions

interface FinishedOptions

cleanup?: boolean

If true, removes the listeners registered by this function before the promise is fulfilled.

error?: boolean
readable?: boolean
signal?: AbortSignal
writable?: boolean

interface PipelineOptions

end?: boolean
signal?: AbortSignal

type PipelineResult<S extends PipelineDestination<any, any>> = S extends (...args: any[]) => PromiseLike<infer R> ? Promise<R> : Promise<void>
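
As a rough illustration of the cleanup option in FinishedOptions, the sketch below counts the listeners on a stream before and after awaiting finished with cleanup set to true. The helper countListeners and the list of event names are assumptions made for this example, and the behavior shown depends on a runtime that implements the cleanup option.

```javascript
import { finished } from 'node:stream/promises';
import { Readable } from 'node:stream';

// Hypothetical helper: total listeners across the events finished() uses.
const EVENTS = ['error', 'end', 'finish', 'close'];
const countListeners = (stream) =>
  EVENTS.reduce((total, name) => total + stream.listenerCount(name), 0);

const rs = Readable.from(['a', 'b', 'c']);
const before = countListeners(rs);

const done = finished(rs, { cleanup: true });
rs.resume(); // Drain the stream so that it can end.
await done;

const after = countListeners(rs);
console.log({ before, after });
```

With cleanup enabled, the count after the promise settles should match the count taken before finished was called.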


    The