stream

Namespace with methods that implement stream operations. Currently, read is the only method supported.

Synchronous Stream Processing

const stream = require('spex')(Promise).stream;
const fs = require('fs');

const rs = fs.createReadStream('values.txt');

// Synchronous receiver: called for every batch of data read from the stream.
function receiver(index, data, delay) {
   console.log('RECEIVED:', index, data, delay);
}

stream.read(rs, receiver)
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });

Asynchronous Stream Processing

const stream = require('spex')(Promise).stream;
const fs = require('fs');

const rs = fs.createReadStream('values.txt');

// Asynchronous receiver: the next read waits until the returned promise resolves.
function receiver(index, data, delay) {
   return new Promise(resolve => {
       console.log('RECEIVED:', index, data, delay);
       resolve();
   });
}

stream.read(rs, receiver)
    .then(data => {
        console.log('DATA:', data);
    })
    .catch(error => {
        console.log('ERROR:', error);
    });

Properties:
Name Type Description
stream.read function

Consumes and processes data from a Readable stream.

Methods

(static) read(stream, receiver, [options]) → {external:Promise}

Consumes and processes data from a Readable stream.

It reads the entire stream, either in paused mode (default) or in chunks (see options.readChunks), and supports both synchronous and asynchronous data processing.

NOTE: Once the method has finished, the onus is on the caller to release the stream according to its protocol.

Parameters:
Name Type Attributes Description
stream Object

Readable stream object.

Passing in anything else will throw the error "Readable stream is required."

receiver function | generator

Data processing callback (or generator).

Passing in anything else will throw the error "Invalid stream receiver."

Parameters:

  • index = index of the call made to the function
  • data = array of all data reads from the stream's buffer
  • delay = number of milliseconds since the last call (undefined when index=0)

The function is called with the same this context as the calling method.

It can optionally return a promise if data processing is asynchronous. When a promise is returned, the method will not read data from the stream again until that promise has resolved.

If the function throws an error or returns a rejected promise, the method rejects with the same error / rejection reason.
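The receiver contract described above can be tried out without a stream by calling a receiver by hand with arguments in the documented shape (index, data, delay). This is only a sketch: the totals object and the sample buffers are made up for illustration, and each item in data is assumed to be a Buffer or a string. In real use the calls would come from stream.read.

```javascript
// Hypothetical accumulator, not part of the library.
const totals = {calls: 0, bytes: 0};

// Synchronous receiver that sums the size of everything it is given.
function receiver(index, data, delay) {
    // data is documented as an array of reads from the stream's buffer;
    // each item is assumed here to be a Buffer or a string.
    for (const item of data) {
        totals.bytes += Buffer.byteLength(item);
    }
    totals.calls = index + 1; // index is zero-based
}

// Simulated calls in the documented shape (delay is undefined when index = 0):
receiver(0, [Buffer.from('abc')], undefined);
receiver(1, [Buffer.from('de'), Buffer.from('f')], 12);

console.log(totals);
```

Returning a promise from the same function would, per the description above, postpone the next read until that promise resolves.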

options Object <optional>

Optional Parameters.

Properties
Name Type Attributes Default Description
closable Boolean <optional>
false

Instructs the method to resolve on the stream's close event (where the stream supports it), instead of the end event that is used by default.

readChunks Boolean <optional>
false

By default, the method handles the stream's readable event to consume data in a simplified form, item by item. If you enable this option, the method will instead handle the stream's data event, to consume chunks of data.

readSize Number <optional>

When the value is greater than 0, it sets the size of each read from the stream's buffer once data becomes available. By default, the method uses as few reads as possible to get all the data currently available in the buffer.

NOTE: This option is ignored when option readChunks is enabled.
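The difference between sized reads and the default "as few reads as possible" behaviour can be seen with Node's own Readable in paused mode. The sketch below uses only the standard stream module; it assumes that readSize corresponds to the size argument of readable.read(size), which this page does not confirm, and it is not the library's implementation.

```javascript
const {Readable} = require('stream');

// Helper (hypothetical): a paused stream pre-filled with the given chunks.
function makeStream(chunks) {
    const rs = new Readable({read() {}}); // no-op _read, data is pushed manually
    chunks.forEach(c => rs.push(Buffer.from(c)));
    rs.push(null); // signal end of data
    return rs;
}

// Sized reads: analogous to readSize = 2 under the assumption above.
const sizedStream = makeStream(['abcdef']);
const sized = [];
let piece;
while ((piece = sizedStream.read(2)) !== null) {
    sized.push(piece.toString());
}

// Default read: one call returns everything currently buffered.
const wholeStream = makeStream(['abc', 'def']);
const whole = wholeStream.read().toString();

console.log(sized, whole);
```

With a small read size the receiver would see more, smaller items; without it, buffered data arrives in as few items as possible.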

Returns:

When finished successfully, resolves with object {calls, reads, length, duration}:

  • calls = number of calls made into the receiver
  • reads = number of successful reads from the stream
  • length = total length for all the data reads from the stream
  • duration = number of milliseconds consumed by the method

When it fails, the method rejects with the corresponding error or rejection reason, which can happen when:

  • the stream emits an error event
  • the receiver throws an error or returns a rejected promise
Type
external:Promise
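As an illustration of how the resolved object {calls, reads, length, duration} might be used, the sketch below derives a few simple statistics from it. The summarize helper and the sample values are hypothetical and not part of the library; the units of length depend on the stream (for example bytes for a Buffer stream).

```javascript
// Hypothetical helper: derives simple statistics from the resolved object.
function summarize({calls, reads, length, duration}) {
    return {
        // average length of a single read
        avgReadLength: reads > 0 ? length / reads : 0,
        // how many reads were batched into each receiver call, on average
        readsPerCall: calls > 0 ? reads / calls : 0,
        // duration is in milliseconds; guard against a zero-length run
        lengthPerSecond: duration > 0 ? (length * 1000) / duration : 0
    };
}

// Sample values, made up for illustration only:
const summary = summarize({calls: 2, reads: 4, length: 4096, duration: 8});

console.log(summary);
```

In real use, the object passed to summarize would be the value received in the .then handler of stream.read.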