Effection Logo
@effectionx/nodev0.2.0thefrontside/effectionx
NPM Badge with published version
import { } from "@effectionx/node"
import { } from "@effectionx/node/stream"
import { } from "@effectionx/node/events"

Node

Node.js-specific utilities for Effection programs. This package provides adapters for working with Node.js streams and event emitters using structured concurrency.

Installation

npm install @effectionx/node

Modules

This package provides two sub-modules:

  • @effectionx/node/stream - Stream utilities for Node.js
  • @effectionx/node/events - Event utilities for Node.js EventEmitters

You can also import everything from the main module:

import { fromReadable, on, once } from "@effectionx/node";

Stream Utilities

fromReadable()

Convert a Node.js Readable stream to an Effection Stream.

import fs from "node:fs";
import { each, main } from "effection";
import { fromReadable } from "@effectionx/node/stream";

await main(function* () {
  const fileStream = fs.createReadStream("./data.txt");

  for (const chunk of yield* each(fromReadable(fileStream))) {
    console.log(new TextDecoder().decode(chunk));
    yield* each.next();
  }
});

The returned stream emits Uint8Array chunks and automatically cleans up event listeners when the stream is closed or the operation is shut down.

Event Utilities

on()

Create a Stream of events from any EventEmitter or EventTarget-like object.

This works with:

  • Node.js EventEmitters (using on/off)
  • DOM EventTargets (using addEventListener/removeEventListener)
  • Web Worker's global self object
import { each, main } from "effection";
import { on } from "@effectionx/node/events";

await main(function* () {
  // With Node.js EventEmitter
  for (const [chunk] of yield* each(on(stream, "data"))) {
    console.log("data:", chunk);
    yield* each.next();
  }

  // In a worker thread (EventTarget style)
  for (const [event] of yield* each(on(self, "message"))) {
    console.log("received:", event.data);
    yield* each.next();
  }
});

For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.

once()

Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.

import { main } from "effection";
import { once } from "@effectionx/node/events";

await main(function* () {
  // Wait for a single message (EventTarget style)
  const [event] = yield* once(self, "message");
  console.log(event.data);

  // Wait for a single event (EventEmitter style)
  const [code] = yield* once(process, "exit");
  console.log("Process exited with code:", code);
});

TypeScript Support

All exports include TypeScript type definitions. The event functions support generic type parameters for type-safe event handling:

import { once, on } from "@effectionx/node/events";

// Type the event arguments
const [code] = yield* once<[number]>(process, "exit");

// Type the stream events
for (const [data] of yield* each(on<[Buffer]>(stream, "data"))) {
  // data is typed as Buffer
  yield* each.next();
}

Interfaces

The event utilities work with any object that implements these interfaces:

// Node.js EventEmitter style
interface EventEmitterLike {
  on(event: string, listener: (...args: unknown[]) => void): void;
  off(event: string, listener: (...args: unknown[]) => void): void;
}

// DOM EventTarget style
interface EventTargetLike {
  addEventListener(event: string, listener: (event: unknown) => void): void;
  removeEventListener(event: string, listener: (event: unknown) => void): void;
}

API Reference

function fromReadable(target: Readable): Stream<Uint8Array, void>

Convert a Node.js Readable stream to an Effection Stream.

Examples

Example 1
import fs from "node:fs";
import { fromReadable } from "@effectionx/node/stream";
import { each } from "effection";

const fileStream = fs.createReadStream("./data.txt");

for (const chunk of yield* each(fromReadable(fileStream))) {
  console.log(new TextDecoder().decode(chunk));
  yield* each.next();
}

Parameters

target: Readable

Return Type

Stream<Uint8Array, void>

interface EventEmitterLike

Interface for objects that support Node.js EventEmitter-style event handling. This includes Node.js EventEmitter and worker_threads MessagePort.

Methods

on
(event: string, listener: (...args: unknown[]) => void): void

No documentation available.

off
(event: string, listener: (...args: unknown[]) => void): void

No documentation available.

interface EventTargetLike

Interface for objects that support DOM EventTarget-style event handling. This includes browser EventTarget and web-worker's global self.

Methods

addEventListener
(event: string, listener: (event: unknown) => void): void

No documentation available.

removeEventListener
(event: string, listener: (event: unknown) => void): void

No documentation available.

type EventSourceLike = EventEmitterLike | EventTargetLike

Union type for objects that support either EventEmitter or EventTarget style.

function on<T extends unknown[]>(target: EventSourceLike | null, eventName: string): Stream<T, never>

Create a Stream of events from any EventEmitter or EventTarget-like object.

This works with:

  • Node.js EventEmitters (using on/off)
  • DOM EventTargets (using addEventListener/removeEventListener)
  • web-worker's global self object

For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.

See the guide on Streams and Subscriptions for details on how to use streams.

Examples

Example 1
import { on } from "@effectionx/node/events";
import { each } from "effection";

// In a worker thread (EventTarget style)
for (const [event] of yield* each(on(self, "message"))) {
  console.log("received:", event.data);
  yield* each.next();
}

// With Node.js EventEmitter
for (const [chunk] of yield* each(on(stream, "data"))) {
  console.log("data:", chunk);
  yield* each.next();
}

Type Parameters

T extends unknown[]

Parameters

target: EventSourceLike | null

  • the event source whose events will be streamed

eventName: string

  • the name of the event to stream. E.g. "message", "data"

Return Type

Stream<T, never>

a stream that will see one item for each event

function once<TArgs extends unknown[] = unknown[]>(target: EventSourceLike | null, eventName: string): Operation<TArgs>

Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.

This works with:

  • Node.js EventEmitters (using on/off)
  • DOM EventTargets (using addEventListener/removeEventListener)
  • web-worker's global self object

For EventEmitters, returns an array of arguments. For EventTargets, returns a single-element array containing the event object.

Examples

Example 1
import { once } from "@effectionx/node/events";

// Wait for a single message (EventTarget style)
const [event] = yield* once(self, "message");
console.log(event.data);

Type Parameters

TArgs extends unknown[] = unknown[]

Parameters

target: EventSourceLike | null

  • the event source to be watched

eventName: string

  • the name of the event to watch. E.g. "message", "close"

Return Type

Operation<TArgs>

an Operation that yields the next emitted event

./stream

function fromReadable(target: Readable): Stream<Uint8Array, void>

Convert a Node.js Readable stream to an Effection Stream.

Examples

Example 1
import fs from "node:fs";
import { fromReadable } from "@effectionx/node/stream";
import { each } from "effection";

const fileStream = fs.createReadStream("./data.txt");

for (const chunk of yield* each(fromReadable(fileStream))) {
  console.log(new TextDecoder().decode(chunk));
  yield* each.next();
}

Parameters

target: Readable

Return Type

Stream<Uint8Array, void>

./events

interface EventEmitterLike

Interface for objects that support Node.js EventEmitter-style event handling. This includes Node.js EventEmitter and worker_threads MessagePort.

Methods

on
(event: string, listener: (...args: unknown[]) => void): void

No documentation available.

off
(event: string, listener: (...args: unknown[]) => void): void

No documentation available.

interface EventTargetLike

Interface for objects that support DOM EventTarget-style event handling. This includes browser EventTarget and web-worker's global self.

Methods

addEventListener
(event: string, listener: (event: unknown) => void): void

No documentation available.

removeEventListener
(event: string, listener: (event: unknown) => void): void

No documentation available.

type EventSourceLike = EventEmitterLike | EventTargetLike

Union type for objects that support either EventEmitter or EventTarget style.

function on<T extends unknown[]>(target: EventSourceLike | null, eventName: string): Stream<T, never>

Create a Stream of events from any EventEmitter or EventTarget-like object.

This works with:

  • Node.js EventEmitters (using on/off)
  • DOM EventTargets (using addEventListener/removeEventListener)
  • web-worker's global self object

For EventEmitters, events are emitted as arrays of arguments. For EventTargets, events are emitted as single-element arrays containing the event object.

See the guide on Streams and Subscriptions for details on how to use streams.

Examples

Example 1
import { on } from "@effectionx/node/events";
import { each } from "effection";

// In a worker thread (EventTarget style)
for (const [event] of yield* each(on(self, "message"))) {
  console.log("received:", event.data);
  yield* each.next();
}

// With Node.js EventEmitter
for (const [chunk] of yield* each(on(stream, "data"))) {
  console.log("data:", chunk);
  yield* each.next();
}

Type Parameters

T extends unknown[]

Parameters

target: EventSourceLike | null

  • the event source whose events will be streamed

eventName: string

  • the name of the event to stream. E.g. "message", "data"

Return Type

Stream<T, never>

a stream that will see one item for each event

function once<TArgs extends unknown[] = unknown[]>(target: EventSourceLike | null, eventName: string): Operation<TArgs>

Create an Operation that yields the next event to be emitted by an EventEmitter or EventTarget-like object.

This works with:

  • Node.js EventEmitters (using on/off)
  • DOM EventTargets (using addEventListener/removeEventListener)
  • web-worker's global self object

For EventEmitters, returns an array of arguments. For EventTargets, returns a single-element array containing the event object.

Examples

Example 1
import { once } from "@effectionx/node/events";

// Wait for a single message (EventTarget style)
const [event] = yield* once(self, "message");
console.log(event.data);

Type Parameters

TArgs extends unknown[] = unknown[]

Parameters

target: EventSourceLike | null

  • the event source to be watched

eventName: string

  • the name of the event to watch. E.g. "message", "close"

Return Type

Operation<TArgs>

an Operation that yields the next emitted event