Any-Iterable Type.
Any Iterable Iterator.
Any-Iterator Type.
Any synchronous value type.
Iteration Session State.
An object with arbitrary properties, shared between callbacks during an iteration session, for any intermediate processing or tracking data that the callback logic may require.
Pipes a synchronous Iterable through the list of operators, and returns IterableExt.
Pipes an AsyncIterable through the list of operators, and returns AsyncIterableExt.
Converts any synchronous iterable into an asynchronous one.
This makes it possible to use asynchronous-only operators downstream, while also correctly casting all types in the pipeline, avoiding any ambiguity between synchronous and asynchronous iterables.
import {pipe, toAsync, delay} from 'iter-ops';
const i = pipe(
toAsync(source), // make iterable asynchronous
delay(500) // now can use asynchronous operators
);
Converts a synchronous Iterator into a synchronous Iterable, so it can be used as a pipeline source/input.
Note that an iterator type can only be determined by starting the iteration, which is what this method does. So if getting the first iterator value throws an error, it will occur outside the pipeline.
Passing it an already iterable object will just reuse it.
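The consequence of starting the iteration early can be illustrated without the library. The sketch below assumes a hypothetical helper, toIterableSketch, which pulls the first value eagerly and replays it later; it is not the library's implementation and only shows why a failure on the first value surfaces at conversion time rather than inside the pipeline.

```typescript
// Hypothetical helper, not the library's implementation: it pulls the first
// value eagerly and replays it once iteration of the result begins.
function toIterableSketch<T>(iterator: Iterator<T>): Iterable<T> {
    const first = iterator.next(); // iteration starts here, outside any pipeline
    let replayFirst = true;
    return {
        [Symbol.iterator](): Iterator<T> {
            return {
                next(): IteratorResult<T> {
                    if (replayFirst) {
                        replayFirst = false;
                        return first;
                    }
                    return iterator.next();
                }
            };
        }
    };
}

const values = Array.from(toIterableSketch([1, 2, 3][Symbol.iterator]()));
console.log(values); // [1, 2, 3]

// An iterator that fails on its first value throws at conversion time,
// before anything downstream (such as an error handler) is involved.
const failing: Iterator<number> = {
    next(): IteratorResult<number> {
        throw new Error('first value failed');
    }
};
let thrownEarly = false;
try {
    toIterableSketch(failing);
} catch {
    thrownEarly = true;
}
console.log(thrownEarly); // true
```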
Converts an asynchronous AsyncIterator into an asynchronous AsyncIterable, so it can be used as a pipeline source/input.
Note that an iterator type can only be determined by starting the iteration, which is what this method does. So if getting the first iterator value throws an error, it will occur outside the pipeline.
Passing it an already iterable object will just reuse it.
Converts a Promise into a one-value AsyncIterable, so it can be used as a pipeline source/input.
import {pipe, toIterable, spread} from 'iter-ops';
const input = Promise.resolve([1, 2, 3, 4, 5]);
const i = pipe(
toIterable(input),
spread()
); // = AsyncIterableExt<number>
for await(const a of i) {
console.log(a); // 1, 2, 3, 4, 5
}
Converts a simple value into a one-value synchronous iterable, so it can be used as a pipeline source/input.
Aggregates all values into an array and passes it to the callback, which processes the data and returns the result. This simplifies integrating into the pipeline external functions that can only operate on a complete data set.
If the callback result is an iterable whose values you want to emit, follow it with the spread operator.
It is essentially the same as combining toArray with map.
import {pipe, aggregate, spread} from 'iter-ops';
const input = [7, 4, 3, 8, 2, 1]; // unsorted list
const i = pipe(
input,
aggregate(values => values.sort((a, b) => a - b)), // sort the values numerically
spread() // spread aggregation result
);
console.log(...i); //=> 1, 2, 3, 4, 7, 8
Catches iteration errors (see Error Handling).
Inside the error handler you can, for example, provide a replacement value (via ctx.emit(value)).
Merges the current iterable with a list of values, iterators or iterables.
Merged inputs are iterated over after depleting the current iterable, in the order in which they were specified, i.e. the standard Array.concat logic.
Note that if you concatenate asynchronous iterables with a synchronous pipeline, they will be processed as simple values.
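The ordering described above can be sketched with a plain generator. The helpers isIterable and concatSketch below are hypothetical names used for illustration, not the library's implementation, and the sketch covers only values and synchronous iterables. Because it checks only for Symbol.iterator, an asynchronous iterable would fall through to the simple-value branch, which mirrors the note above.

```typescript
// Hypothetical helpers illustrating Array.concat-like ordering;
// this is a sketch, not the library's implementation.
function isIterable(value: unknown): value is Iterable<unknown> {
    return typeof value === 'object' && value !== null && Symbol.iterator in value;
}

function* concatSketch(source: Iterable<unknown>, ...inputs: unknown[]): Generator<unknown> {
    yield* source; // deplete the current iterable first
    for (const input of inputs) {
        if (isIterable(input)) {
            yield* input; // synchronous iterables are expanded, in the given order
        } else {
            yield input; // anything else is emitted as a simple value
        }
    }
}

const merged = Array.from(concatSketch([1, 2], 3, [4, 5], new Set([6])));
console.log(merged); // [1, 2, 3, 4, 5, 6]
```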
Goes through the entire iterable, counting the values, and produces a one-value iterable with the count.
import {pipe, count} from 'iter-ops';
const i = pipe(
'hello world!',
count()
);
console.log(...i); //=> 12
console.log(i.first); //=> 12
When the optional predicate is specified, only values that satisfy the predicate are counted.
import {pipe, count} from 'iter-ops';
const i = pipe(
'hello world!',
count(a => a === 'l')
);
console.log(...i); //=> 3
console.log(i.first); //=> 3
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
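This follows from ordinary JavaScript semantics: a Promise is an object, so a synchronous check sees it as truthy regardless of what it later resolves with. The sketch below uses a hypothetical countSync helper as a stand-in for a synchronous counting operator (it is not the library's count) to show the effect.

```typescript
// Hypothetical stand-in for a synchronous counting operator;
// it is not the library's implementation.
function countSync<T>(source: Iterable<T>, predicate: (value: T) => unknown): number {
    let total = 0;
    for (const value of source) {
        if (predicate(value)) { // truthiness check only, nothing is awaited
            total++;
        }
    }
    return total;
}

const letters = 'hello world!';

// Synchronous predicate: counts only the letter 'l'.
const syncCount = countSync(letters, a => a === 'l');

// Asynchronous predicate: every call returns a Promise, which is always truthy,
// so every character is counted.
const asyncCount = countSync(letters, async a => a === 'l');

console.log(syncCount, asyncCount); // 3 12
```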
Adds a default iterable to an empty pipeline.
import {pipe, defaultEmpty} from 'iter-ops';
const i = pipe(
[], // empty iterable/pipeline
defaultEmpty([1, 2, 3]) // default for an empty pipeline
);
console.log(...i); //=> 1, 2, 3
Note that if you add asynchronous defaults into a synchronous pipeline, they will be processed as simple values.
Adds a default iterator to an empty pipeline.
Adds a default value to an empty pipeline.
Emits unique values, with optional key selector.
import {pipe, distinct} from 'iter-ops';
const i = pipe(
[1, 1, 1, 2, 2, 2, 3, 3],
distinct() // selector not needed for simple types
);
console.log(...i); //=> 1, 2, 3
With optional selector function:
import {pipe, distinct} from 'iter-ops';
const i = pipe(
[{a: 1}, {a: 1}, {a: 2}, {a: 2}],
distinct(v => v.a)
);
console.log(...i); //=> {a: 1}, {a: 2}
Drains the iterable of all emitted values, and then emits an empty iterable. Therefore, it cannot be used for infinite sequences.
The operator doesn't change the type of the previous iterable.
import {pipe, map, wait, drain, onEnd} from 'iter-ops';
const i = pipe(
iterable,
map(a => myService.request(a)), // map into promise
wait(), // resolve each promise
drain(), // drain all values
onEnd(s => {
console.log('duration:', s.duration);
})
);
// Below, even though we trigger iteration just for the first value,
// onEnd will still be triggered, because we drain the iterable:
await i.first; // iterates to the first item
Emits an empty iterable, without pulling any values from the source, i.e. it simply replaces the source iterable with an empty one.
The operator doesn't change the type of the previous iterable.
Standard Array.every logic for the iterable, extended with iteration state + async.
It emits a boolean, indicating whether all elements pass the predicate test.
import {pipe, every} from 'iter-ops';
const i = pipe(
[1, 2, 3],
every(a => a % 2 === 0) // checks if every number is even
);
console.log(...i); //=> false
console.log(i.first); //=> false
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Standard Array.filter logic for the iterable, extended with iteration state + async.
In the example below, we take advantage of the IterationState, to detect and remove repeated values (do not confuse with distinct, which removes all duplicates).
import {pipe, filter} from 'iter-ops';
const i = pipe(
iterable,
filter((value, index, state) => {
if(value === state.previousValue) {
return false;
}
state.previousValue = value;
return true;
})
);
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Produces a one-value iterable, with the first emitted value.
Without the optional predicate, it is the same as take(1).
import {pipe, first} from 'iter-ops';
const i = pipe(
[10, 20, 30],
first()
);
console.log(...i); //=> 10
console.log(i.first); //=> 10
When the optional predicate is provided, the first value satisfying it will be emitted.
import {pipe, first} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5],
first(a => a % 2 === 0) // first even number
);
console.log(...i); //=> 2
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Emits indexed values that pass the predicate test.
import {pipe, indexBy} from 'iter-ops';
const i = pipe(
[12, 7, 30, 9],
indexBy(a => a % 2 === 0) // index even numbers
);
console.log(...i); //=> {index: 0, value: 12}, {index: 2, value: 30}
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Checks if the iterable is empty, and emits a boolean flag.
import {pipe, isEmpty} from 'iter-ops';
const i = pipe(
[],
isEmpty()
);
console.log(...i); //=> true
console.log(i.first); //=> true
Produces a one-value iterable, with the last emitted value.
import {pipe, last} from 'iter-ops';
const i = pipe(
[1, 2, 3],
last()
);
console.log(...i); //=> 3
console.log(i.first); //=> 3
When the optional predicate is provided, the last value satisfying it will be emitted.
import {pipe, last} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5, 6, 7, 8, 9],
last(a => a % 2 === 0) // last even number
);
console.log(i.first); //=> 8
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Standard Array.map logic for the iterable, extended for supporting iteration state.
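To illustrate what iteration state adds to a plain mapping, here is a minimal sketch built on a generator. It assumes the callback receives (value, index, state), mirroring the filter callback documented elsewhere in this reference; mapSketch and the IterationState alias are hypothetical names, not the library's implementation.

```typescript
// Assumed shape of the shared state: an object with arbitrary properties.
type IterationState = Record<string, any>;

// Hypothetical sketch of a state-aware map; not the library's implementation.
function* mapSketch<T, R>(
    source: Iterable<T>,
    cb: (value: T, index: number, state: IterationState) => R
): Generator<R> {
    const state: IterationState = {}; // one state object per iteration session
    let index = 0;
    for (const value of source) {
        yield cb(value, index++, state);
    }
}

// Running total, kept in the shared state between callback calls.
const totals = Array.from(
    mapSketch([1, 2, 3, 4], (value, index, state) => {
        state.sum = (state.sum ?? 0) + value;
        return state.sum as number;
    })
);
console.log(totals); // [1, 3, 6, 10]
```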
Notifies of the end of a successful iteration, for the immediately preceding operator, and provides a summary.
It doesn't handle or affect any upstream errors, and should they occur, it may never reach the end, and thus never trigger the notification.
The operator doesn't affect the iteration, unless the callback function throws an error.
import {pipe, map, wait, onEnd, catchError} from 'iter-ops';
const i = pipe(
asyncIterable,
map(a => myService.getValues(a)), // remap into requests-promises
wait(), // resolve requests
onEnd(s => {
if(s.avgDuration > 1000) {
// took longer than 1s per value on average;
throw new Error('Method getValues is too slow');
}
}),
catchError((err, ctx) => {
console.log(err?.message || err);
throw err;
})
);
Splits values into pages of fixed size (last page can be smaller).
import {pipe, page} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5],
page(2)
);
console.log(...i); //=> [1, 2], [3, 4], [5]
It throws an error when size is less than 1 or not a number.
Standard reducer for the iterable, extended for supporting iteration state.
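As a rough illustration of a reducer that produces a one-value iterable, consider the sketch below. The reduceSketch name and its required initial value are assumptions made for brevity, and the state argument is omitted; this is not the library's implementation.

```typescript
// Hypothetical sketch of reducing an iterable into a one-value iterable;
// not the library's implementation (the initial value is required here for simplicity).
function* reduceSketch<T, R>(
    source: Iterable<T>,
    cb: (accumulator: R, value: T, index: number) => R,
    initial: R
): Generator<R> {
    let accumulator = initial;
    let index = 0;
    for (const value of source) {
        accumulator = cb(accumulator, value, index++);
    }
    yield accumulator; // the single emitted value
}

const sum = Array.from(reduceSketch([1, 2, 3, 4], (acc, value) => acc + value, 0));
console.log(sum); // [10]
```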
Repeats every value the specified number of times.
import {pipe, repeat} from 'iter-ops';
const i = pipe(
[1, 2, 3],
repeat(2)
);
console.log(...i); //=> 1, 1, 1, 2, 2, 2, 3, 3, 3
Repeats values while they pass the predicate test. The predicate receives:
value - repeated value
index - original value index
count - repeats count thus far (starts with 0)
state - iteration state
import {pipe, repeat} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5, 6, 7],
repeat((a, idx, c) => a % 2 === 0 && c < 2) // repeat even numbers 2 times
);
console.log(...i); //=> 1, 2, 2, 2, 3, 4, 4, 4, 5, 6, 6, 6, 7
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Starts emitting values after the first count values have been skipped.
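A minimal sketch of this behavior with a plain generator follows; skipSketch is a hypothetical name used for illustration, not the library's implementation.

```typescript
// Hypothetical sketch: drop the first `count` values, emit the rest.
function* skipSketch<T>(source: Iterable<T>, count: number): Generator<T> {
    let index = 0;
    for (const value of source) {
        if (index++ >= count) {
            yield value;
        }
    }
}

const remaining = Array.from(skipSketch([1, 2, 3, 4, 5], 2));
console.log(remaining); // [3, 4, 5]
```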
Standard Array.some logic for the iterable, extended with iteration state + async.
It emits a boolean, indicating whether at least one element passes the predicate test.
import {pipe, some} from 'iter-ops';
const i = pipe(
[1, 2, 3],
some(a => a % 2 === 0) // checks if even numbers are present
);
console.log(...i); //=> true
console.log(i.first); //=> true
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Splits values into separate lists when the predicate returns true (or resolves with true).
When option toggle is set, the split uses the toggle start/end logic.
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
When you know only the split value of each block, you can use the default split mode, with carryEnd set to 1/forward (in case you do not want it skipped);
When you know only the end value of each block, you can use the default split mode, with carryEnd set to -1/back (in case you do not want it skipped);
When you know both start and end values of each block, you can use the toggle mode, with carryStart set to 1/forward, and carryEnd set to -1/back, unless you want either of those skipped, then leave them at 0/none.
Note that in toggle mode, you cannot use carryStart=back (it will be ignored), because it would delay emission of the current block indefinitely, plus carrying block start backward doesn't make much sense anyway.
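The effect of carryEnd in the default split mode can be sketched with a plain generator. The splitSketch function and the Carry type below are hypothetical and cover only the default (non-toggle) mode with a synchronous predicate; the library's handling of edge cases, such as empty trailing blocks, may differ.

```typescript
// Assumed encoding of the carry option: -1 = back, 0 = none, 1 = forward.
type Carry = -1 | 0 | 1;

// Hypothetical sketch of the default split mode; not the library's implementation.
function* splitSketch<T>(
    source: Iterable<T>,
    predicate: (value: T) => boolean,
    carryEnd: Carry = 0
): Generator<T[]> {
    let block: T[] = [];
    for (const value of source) {
        if (predicate(value)) {
            if (carryEnd === -1) {
                block.push(value); // carry back: split value closes the current block
            }
            yield block;
            block = carryEnd === 1 ? [value] : []; // carry forward: split value opens the next block
        } else {
            block.push(value);
        }
    }
    yield block; // whatever remains after the last split point
}

const data = [1, 2, 0, 3, 4, 0, 5];
const isZero = (v: number) => v === 0;

const skipped = Array.from(splitSketch(data, isZero));     // [[1, 2], [3, 4], [5]]
const forward = Array.from(splitSketch(data, isZero, 1));  // [[1, 2], [0, 3, 4], [0, 5]]
const back = Array.from(splitSketch(data, isZero, -1));    // [[1, 2, 0], [3, 4, 0], [5]]
console.log(skipped, forward, back);
```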
Spreads iterable values.
The source iterable is expected to emit iterable values only.
import {pipe, spread} from 'iter-ops';
const i = pipe(
['first', 'second'],
spread()
);
console.log(...i); //=> 'f', 'i', 'r', 's', 't', 's', 'e', 'c', 'o', 'n', 'd'
It will throw an iteration-time error if a non-iterable value is encountered.
Starts emitting values, once the predicate test passes.
import {pipe, start} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5, 6, 7, 8, 9],
start(a => a === 5) // start emitting when 5 is encountered
);
console.log(...i); //=> 5, 6, 7, 8, 9
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Stops iteration, once the predicate test passes.
import {pipe, stop} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5, 6, 7, 8, 9],
stop(a => a === 5) // stop when 5 is encountered
);
console.log(...i); //=> 1, 2, 3, 4
Note that the predicate can only return a Promise inside an asynchronous pipeline, or else the Promise will be treated as a truthy value.
Emits up to count number of values, then stops iteration.
import {pipe, take} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5],
take(2)
);
console.log(...i); //=> 1, 2
Emits up to count number of the last values.
import {pipe, takeLast} from 'iter-ops';
const i = pipe(
[1, 2, 3, 4, 5],
takeLast(2)
);
console.log(...i); //=> 4, 5
Taps into each value, without changing the output, for logging or debugging.
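The idea of observing values without altering them can be sketched as follows. The tapSketch generator and its (value, index) callback are assumptions for illustration, not the library's implementation.

```typescript
// Hypothetical sketch: call the callback for each value, then pass the value through unchanged.
function* tapSketch<T>(source: Iterable<T>, cb: (value: T, index: number) => void): Generator<T> {
    let index = 0;
    for (const value of source) {
        cb(value, index++);
        yield value;
    }
}

const observed: string[] = [];
const passedThrough = Array.from(
    tapSketch([1, 2, 3], (value, index) => {
        observed.push(`${index}:${value}`); // side effect only, e.g. logging
    })
);
console.log(passedThrough); // [1, 2, 3]
console.log(observed); // ['0:1', '1:2', '2:3']
```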
Measures timings for each value, and provides a notification callback.
It is mainly to help evaluate the performance of lengthy asynchronous iterables, though it also works synchronously.
The operator doesn't affect the iteration, unless the callback function throws an error.
import {pipe, map, wait, timing, catchError} from 'iter-ops';
const i = pipe(
asyncIterable,
map(a => myService.requestData(a)), // map into promises
wait(), // resolve each promise
timing(t => {
if(t.duration > 3000) {
// took over 3s to get the value, needs investigation;
throw new Error(`Took too long to get value ${t.value} for index ${t.index}`);
}
}),
catchError((err, ctx) => {
console.log(err?.message || err);
throw err;
})
);
Accumulates all values and emits an array.
Zips values together by index, into an array, while all sources continue emitting.
import {pipe, zip} from 'iter-ops';
const i = pipe(
[1, 2, 3],
zip('hello') // <- any number of arguments
);
console.log(...i); //=> [1, 'h'], [2, 'e'], [3, 'l']
The operator takes any number of iterable or iterator arguments.
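The phrase "while all sources continue emitting" means that zipping ends as soon as any one source is exhausted. A minimal generator sketch of that rule is shown below; zipSketch is a hypothetical name, it accepts synchronous iterables only, and it is not the library's implementation.

```typescript
// Hypothetical sketch: combine values by index until any source runs out.
function* zipSketch(...sources: Iterable<unknown>[]): Generator<unknown[]> {
    if (sources.length === 0) {
        return; // nothing to zip
    }
    const iterators = sources.map(source => source[Symbol.iterator]());
    while (true) {
        const results = iterators.map(iterator => iterator.next());
        if (results.some(result => result.done)) {
            return; // at least one source has finished
        }
        yield results.map(result => result.value);
    }
}

const zipped = Array.from(zipSketch([1, 2, 3], 'hello', [true, false, true, false]));
console.log(zipped); // [[1, 'h', true], [2, 'e', false], [3, 'l', true]]
```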
Delays each value by the specified timeout. When the timeout is a negative number, it is not added.
import {pipe, toAsync, delay} from 'iter-ops';
const data = [1, 2, 3, 4, 5]; // some synchronous data
const i = pipe(
toAsync(data), // make asynchronous
delay(1000)
);
for await(const a of i) {
console.log(a); //=> 1, 2, 3, 4, 5 (with 1s delay)
}
Throws an error during iteration, if inside a synchronous pipeline.
Delays each value by the specified timeout (as returned from the callback). When the timeout is a negative number, it is not added.
Note that it intentionally does not support returning Promise<number>, to avoid confusion with what the throttle operator does.
Throws an error during iteration, if inside a synchronous pipeline.
When the iterable rejects, retries getting the value specified number of times.
Note that retries deplete values prior to the operator that threw the error, and so it is often used in combination with operator repeat.
import {pipe, toAsync, tap, retry} from 'iter-ops';
const i = pipe(
toAsync([1, 2, 3, 4, 5, 6, 7, 8, 9]),
tap(value => {
if (value % 2 === 0) {
throw new Error(`fail-${value}`); // throw for all even numbers
}
}),
retry(1) // retry 1 time
);
for await(const a of i) {
console.log(a); // 1, 3, 5, 7, 9
}
Above, we end up with just odd numbers, because we do not provide any repeat logic, and as a result, the retry simply jumps to the next value on each error.
The method throws an error during iteration, if inside a synchronous pipeline.
When the iterable rejects, the callback is to return the flag, indicating whether we should retry getting the value one more time.
The callback is only invoked when there is a failure, and it receives:
index - index of the iterable value that we failed to acquire
attempts - number of retry attempts made so far (starts with 0)
state - state for the entire iteration session
Note that retries deplete values prior to the operator that threw the error, and so it is often used in combination with operator repeat.
Throws an error during iteration, if inside a synchronous pipeline.
Emits each value after the callback result resolves, to control/mitigate the processing flow.
The resolved value itself is ignored.
import {pipe, toAsync, tap, throttle} from 'iter-ops';
const i = pipe(
toAsync([1, 2, 3, 4, 5]),
throttle(async (value, index, state) => {
await processValue(value);
}),
tap(value => {
// value = 1, 2, 3, 4, 5 (each delayed by processing time)
})
);
Throws an error during iteration, if inside a synchronous pipeline.
When the value is a Promise, it is resolved, or else returned as is, i.e. the same logic as for JavaScript operator await.
It throws an error during iteration, if inside a synchronous pipeline.
import {pipe, toAsync, map, wait} from 'iter-ops';
const userIds = [1, 2, 3, 4, 5]; // synchronous list of user id-s
const i = pipe(
toAsync(userIds), // make pipeline asynchronous
map(id => myService.getUserData(id)), // map into promises
wait() // resolve each promise
);
for await(const user of i) {
console.log(user); // print details for each user
}
In case you want all promises resolved before emitting values:
import {pipe, toAsync, map, aggregate, wait, spread} from 'iter-ops';
const userIds = [1, 2, 3, 4, 5]; // synchronous list of user id-s
const i = pipe(
toAsync(userIds), // make pipeline asynchronous
map(id => myService.getUserData(id)), // map into promises
aggregate(list => Promise.all(list)), // resolve all promises
wait(), // resolve the list
spread() // emit each resolved value
);
for await(const user of i) {
console.log(user); // print details for each user
}
Any synchronous or asynchronous value type.