import { } from "@effectionx/node"
import { } from "@effectionx/node/stream"
import { } from "@effectionx/node/events"
Node
Node.js-specific utilities for Effection programs. This package provides adapters for working with Node.js streams and event emitters using structured concurrency.
Installation
npm install @effectionx/node
Modules
This package provides two sub-modules:
- @effectionx/node/stream - Stream utilities for Node.js
- @effectionx/node/events - Event utilities for Node.js EventEmitters
You can also import everything from the main module:
import { fromReadable, on, once } from "@effectionx/node";
Stream Utilities
fromReadable()
Convert a Node.js Readable stream to an Effection Stream.
import fs from "node:fs";
import { each, main } from "effection";
import { fromReadable } from "@effectionx/node/stream";
await main(function* () {
const fileStream = fs.createReadStream("./data.txt");
for (const chunk of yield* each(fromReadable(fileStream))) {
console.log(new TextDecoder().decode(chunk));
yield* each.next();
}
});
The returned stream emits Uint8Array chunks and automatically cleans up
event listeners when the stream is closed or the operation is shut down.
Event Utilities
on()
Create a Stream of events from any EventEmitter or EventTarget-like object.
This works with:
- Node.js EventEmitters (using on/off)
- DOM EventTargets (using addEventListener/removeEventListener)
- Web Worker's global self object
import { each, main } from "effection";
import { on } from "@effectionx/node/events";
await main(function* () {
// With Node.js EventEmitter
for (const [chunk] of yield* each(on(stream, "data"))) {
console.log("data:", chunk);
yield* each.next();
}
// In a worker thread (EventTarget style)
for (const [event] of yield* each(on(self, "message"))) {
console.log("received:", event.data);
yield* each.next();
}
});
For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.
once()
Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.
import { main } from "effection";
import { once } from "@effectionx/node/events";
await main(function* () {
// Wait for a single message (EventTarget style)
const [event] = yield* once(self, "message");
console.log(event.data);
// Wait for a single event (EventEmitter style)
const [code] = yield* once(process, "exit");
console.log("Process exited with code:", code);
});
TypeScript Support
All exports include TypeScript type definitions. The event functions support generic type parameters for type-safe event handling:
import { once, on } from "@effectionx/node/events";
// Type the event arguments
const [code] = yield* once<[number]>(process, "exit");
// Type the stream events
for (const [data] of yield* each(on<[Buffer]>(stream, "data"))) {
// data is typed as Buffer
yield* each.next();
}
Interfaces
The event utilities work with any object that implements these interfaces:
// Node.js EventEmitter style
interface EventEmitterLike {
on(event: string, listener: (...args: unknown[]) => void): void;
off(event: string, listener: (...args: unknown[]) => void): void;
}
// DOM EventTarget style
interface EventTargetLike {
addEventListener(event: string, listener: (event: unknown) => void): void;
removeEventListener(event: string, listener: (event: unknown) => void): void;
}
API Reference
Convert a Node.js Readable stream to an Effection Stream.
Examples
Example 1
import fs from "node:fs";
import { fromReadable } from "@effectionx/node/stream";
import { each } from "effection";
const fileStream = fs.createReadStream("./data.txt");
for (const chunk of yield* each(fromReadable(fileStream))) {
console.log(new TextDecoder().decode(chunk));
yield* each.next();
}
Parameters
target: Readable
Return Type
Stream<Uint8Array, void>
Interface for objects that support Node.js EventEmitter-style event handling. This includes Node.js EventEmitter and worker_threads MessagePort.
Methods
on
(event: string, listener: (...args: unknown[]) => void): void
No documentation available.
off
(event: string, listener: (...args: unknown[]) => void): void
No documentation available.
Interface for objects that support DOM EventTarget-style event handling.
This includes browser EventTarget and web-worker's global self.
Methods
addEventListener
(event: string, listener: (event: unknown) => void): void
No documentation available.
removeEventListener
(event: string, listener: (event: unknown) => void): void
No documentation available.
Union type for objects that support either EventEmitter or EventTarget style.
function on<T extends unknown[]>(target: EventSourceLike | null, eventName: string): Stream<T, never>
Create a Stream of events from any EventEmitter or EventTarget-like object.
This works with:
- Node.js EventEmitters (using on/off)
- DOM EventTargets (using addEventListener/removeEventListener)
- web-worker's global self object
For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.
See the guide on Streams and Subscriptions for details on how to use streams.
Examples
Example 1
import { on } from "@effectionx/node/events";
import { each } from "effection";
// In a worker thread (EventTarget style)
for (const [event] of yield* each(on(self, "message"))) {
console.log("received:", event.data);
yield* each.next();
}
// With Node.js EventEmitter
for (const [chunk] of yield* each(on(stream, "data"))) {
console.log("data:", chunk);
yield* each.next();
}
Type Parameters
T extends unknown[]
Parameters
target: EventSourceLike | null
- the event source whose events will be streamed
eventName: string
- the name of the event to stream. E.g. "message", "data"
Return Type
Stream<T, never>
a stream that emits one item for each event
function once<TArgs extends unknown[] = unknown[]>(target: EventSourceLike | null, eventName: string): Operation<TArgs>
Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.
This works with:
- Node.js EventEmitters (using on/off)
- DOM EventTargets (using addEventListener/removeEventListener)
- web-worker's global self object
For EventEmitters, returns an array of arguments. For EventTargets, returns a single-element array containing the event object.
Examples
Example 1
import { once } from "@effectionx/node/events";
// Wait for a single message (EventTarget style)
const [event] = yield* once(self, "message");
console.log(event.data);
Type Parameters
TArgs extends unknown[] = unknown[]
Parameters
target: EventSourceLike | null
- the event source to be watched
eventName: string
- the name of the event to watch. E.g. "message", "close"
Return Type
Operation<TArgs>
an Operation that yields the next emitted event
./stream
Convert a Node.js Readable stream to an Effection Stream.
Examples
Example 1
import fs from "node:fs";
import { fromReadable } from "@effectionx/node/stream";
import { each } from "effection";
const fileStream = fs.createReadStream("./data.txt");
for (const chunk of yield* each(fromReadable(fileStream))) {
console.log(new TextDecoder().decode(chunk));
yield* each.next();
}
Parameters
target: Readable
Return Type
Stream<Uint8Array, void>
./events
Interface for objects that support Node.js EventEmitter-style event handling. This includes Node.js EventEmitter and worker_threads MessagePort.
Methods
on
(event: string, listener: (...args: unknown[]) => void): void
No documentation available.
off
(event: string, listener: (...args: unknown[]) => void): void
No documentation available.
Interface for objects that support DOM EventTarget-style event handling.
This includes browser EventTarget and web-worker's global self.
Methods
addEventListener
(event: string, listener: (event: unknown) => void): void
No documentation available.
removeEventListener
(event: string, listener: (event: unknown) => void): void
No documentation available.
Union type for objects that support either EventEmitter or EventTarget style.
function on<T extends unknown[]>(target: EventSourceLike | null, eventName: string): Stream<T, never>
Create a Stream of events from any EventEmitter or EventTarget-like object.
This works with:
- Node.js EventEmitters (using on/off)
- DOM EventTargets (using addEventListener/removeEventListener)
- web-worker's global self object
For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.
See the guide on Streams and Subscriptions for details on how to use streams.
Examples
Example 1
import { on } from "@effectionx/node/events";
import { each } from "effection";
// In a worker thread (EventTarget style)
for (const [event] of yield* each(on(self, "message"))) {
console.log("received:", event.data);
yield* each.next();
}
// With Node.js EventEmitter
for (const [chunk] of yield* each(on(stream, "data"))) {
console.log("data:", chunk);
yield* each.next();
}
Type Parameters
T extends unknown[]
Parameters
target: EventSourceLike | null
- the event source whose events will be streamed
eventName: string
- the name of the event to stream. E.g. "message", "data"
Return Type
Stream<T, never>
a stream that emits one item for each event
function once<TArgs extends unknown[] = unknown[]>(target: EventSourceLike | null, eventName: string): Operation<TArgs>
Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.
This works with:
- Node.js EventEmitters (using on/off)
- DOM EventTargets (using addEventListener/removeEventListener)
- web-worker's global self object
For EventEmitters, returns an array of arguments. For EventTargets, returns a single-element array containing the event object.
Examples
Example 1
import { once } from "@effectionx/node/events";
// Wait for a single message (EventTarget style)
const [event] = yield* once(self, "message");
console.log(event.data);
Type Parameters
TArgs extends unknown[] = unknown[]
Parameters
target: EventSourceLike | null
- the event source to be watched
eventName: string
- the name of the event to watch. E.g. "message", "close"
Return Type
Operation<TArgs>
an Operation that yields the next emitted event