Exploring @goodware/task-queue

node v8.17.0
version: 2.1.1
Introduction

@goodware/task-queue limits the number of synchronous and asynchronous tasks that can execute at once.

- Read more about its motivation: https://link.medium.com/Q8rDyNTsXlb
- git: https://github.com/good-ware/js-task-queue
- npm: https://www.npmjs.com/package/@goodware/task-queue

Simple Example

This example runs at most two tasks at a time. It outputs: 2, 1, 4, 3.
const TaskQueue = require('@goodware/task-queue');

{
  const queue = new TaskQueue({ size: 2 });

  // Task #1 : await push() returns immediately because the queue is empty. 'await' doesn't wait for the
  // task to complete.
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 1 ${Date.now()}`);
          resolve();
        }, 400)
      )
  );

  // Task #2 : await push() returns immediately because the queue has an open slot
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 2 ${Date.now()}`);
          resolve();
        }, 300)
      )
  );

  // The queue is full. Task #2 will finish in about 300 ms.

  // Task #3 : await push() waits until task #2 finishes
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 3 ${Date.now()}`);
          resolve();
        }, 200)
      )
  );

  // The queue is full again. 300 ms have already passed. Task #1 will terminate in about 100 ms, leaving
  // task #3 in the queue.

  // Task #4 : await push() waits until task #1 finishes
  const ret = await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 4 ${Date.now()}`);
          resolve();
        }, 100)
      )
  );

  // Wait for task #4 to finish
  await ret.promise;
  await queue.stop();
}
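
To make the idea of "at most N tasks at once" concrete, here is a minimal sketch of a concurrency limiter in plain JavaScript. It is not the @goodware/task-queue implementation. The class name `MiniQueue`, its fields, and the `demo` helper are hypothetical and exist only for this illustration. The sketch mirrors the behavior seen in the example above: `push()` resolves once the task has started, and the returned object's `promise` settles when the task finishes.

```javascript
// Hypothetical MiniQueue: a sketch of the concept, NOT the library's code.
class MiniQueue {
  constructor(size) {
    this.size = size; // maximum number of tasks running at once
    this.running = 0; // number of tasks currently executing
    this.waiters = []; // resolvers for push() calls waiting for a free slot
  }

  // Resolves once the task has started. The returned object's `promise`
  // settles when the task itself finishes.
  async push(task) {
    // Re-check after every wake-up because another caller may have taken the slot
    while (this.running >= this.size) {
      await new Promise((resolve) => this.waiters.push(resolve));
    }
    this.running += 1;

    const run = async () => {
      try {
        return await task();
      } finally {
        this.running -= 1;
        const next = this.waiters.shift();
        if (next) next(); // wake one waiting push()
      }
    };
    return { promise: run() };
  }
}

// Runs four timed tasks through a queue of size 2 and records what happened.
async function demo() {
  const queue = new MiniQueue(2);
  let active = 0;
  let peak = 0;
  const finished = [];

  const makeTask = (id, ms) => async () => {
    active += 1;
    peak = Math.max(peak, active);
    await new Promise((resolve) => setTimeout(resolve, ms));
    active -= 1;
    finished.push(id);
  };

  const handles = [];
  for (const [id, ms] of [[1, 120], [2, 30], [3, 30], [4, 30]]) {
    handles.push(await queue.push(makeTask(id, ms)));
  }
  await Promise.all(handles.map((handle) => handle.promise));
  return { peak, finished };
}

demo().then((result) => console.log(result));
```

With these delays, no more than two tasks should ever be active at once, and the long-running task #1 should finish last while tasks #2 to #4 take turns in the second slot.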

Minimizing Memory Usage via Backpressure

push() returns a new Promise each time it is called, thus consuming memory. If you cannot control how often push() is called, your application may need to limit calls to push() while the queue is full.

For example, consider the following constraints:

1. Up to 5 tasks can call push() and immediately continue their work. Subsequent callers will wait until a worker has finished.
2. Up to 2 workers can execute at the same time.

Although it appears that resources are properly constrained in this scenario, if push() is called, say, 1,000 times a second, and the workers take longer than 1 second each, the process will likely run out of memory.

One solution to this scenario is backpressure. No form of backpressure is a silver bullet. External systems must handle errors and retry.
{
  const queue = new TaskQueue({ size: 5, workers: 2 });
  let tasks = 0;

  // Wait 10 ms
  async function doWork() {
    const me = ++tasks;
    console.log(`${me} begin`);
    await new Promise((resolve) => setTimeout(resolve, 10));
    console.log(`${me} end`);
  }

  // This will run fewer than 25 tasks. The actual number is random.
  for (let i = 1; i <= 25; ++i) {
    if (queue.full) {
      // The most basic implementation of backpressure: do nothing
      // Another interesting form of backpressure would be to call push()
      // infrequently. For example...
      if ((Math.random() * 8) < 2) {
        console.log('infrequently queued');
        await queue.push(doWork);
      } else {
        console.log('full');
      }
    } else {
      console.log('queued');
      await queue.push(doWork);
    }
  }

  await queue.stop();
  console.log(`Ran ${tasks} tasks`);
}
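
To put rough numbers on the out-of-memory scenario described in this section, the following back-of-the-envelope sketch estimates how many push() callers are still waiting after a given time when nothing applies backpressure. The function name `estimatePendingPushes` and its parameters are made up for this illustration and are not part of the library. It assumes each task takes exactly `taskSeconds` (1 second is the best case for "longer than 1 second") and that the queue holds at most `size` accepted tasks.

```javascript
// Hypothetical helper for illustration only; not part of @goodware/task-queue.
// Estimates how many push() calls are still waiting (each holding a Promise)
// after `seconds`, assuming a constant arrival rate and a fixed task duration.
function estimatePendingPushes({ arrivalsPerSecond, workers, taskSeconds, size, seconds }) {
  const arrived = arrivalsPerSecond * seconds;
  // Each worker completes one task every `taskSeconds`
  const completed = Math.floor(seconds / taskSeconds) * workers;
  // Accepted tasks are those already completed plus those the queue can hold
  const accepted = Math.min(arrived, completed + size);
  return arrived - accepted;
}

const pending = estimatePendingPushes({
  arrivalsPerSecond: 1000,
  workers: 2,
  taskSeconds: 1,
  size: 5,
  seconds: 10,
});
console.log(pending); // 9975
```

Under these assumptions the backlog grows by roughly 998 waiting callers per second, which is why the caller has to drop or delay work when the queue is full.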

Wait Behavior

"await wait()" waits for all functions passed to push() to finish their execution. If any of the functions return Promises, wait() waits for those Promises to resolve. push() may be called during and after wait() executes.

Stop Behavior

stop() causes push() to throw StoppedError. It can be undone by start(). "await stop()" otherwise behaves the same as "await wait()".
{
  const queue = new TaskQueue({ size: 1 });

  // Task #1 : await push() returns immediately because the queue is empty. 'await' doesn't wait for the
  // task to complete.
  await queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 1 ${new Date().toISOString()}`);
          resolve();
        }, 400)
      )
  );

  // The following calls to push() return Promises. The provided
  // functions are not queued until there's an available slot in the
  // queue. They are queued in the order in which push() is called.

  // Task #2 (background) : await push() waits until task #1 finishes
  queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 2 ${new Date().toISOString()}`);
          resolve();
        }, 2000)
      )
  );

  // Task #3 (background) : await push() waits until task #2 finishes
  queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 3 ${new Date().toISOString()}`);
          resolve();
        }, 1000)
      )
  );

  // Task #4 (background) : await push() waits until task #3 finishes
  queue.push(
    () =>
      new Promise((resolve) =>
        setTimeout(() => {
          console.log(`Task 4 ${new Date().toISOString()}`);
          resolve();
        }, 5)
      )
  );

  await queue.wait();
  console.log('stopped');
}
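
The wait/stop/start semantics described above can also be sketched without the library. The classes below, `SketchStoppedError` and `StoppableQueue`, are hypothetical stand-ins written for this illustration. They are not the library's StoppedError or TaskQueue, and they apply no concurrency limit. The sketch also assumes that wait() keeps waiting for tasks pushed while it is in progress, which the text above does not confirm for the real library.

```javascript
// Hypothetical stand-ins for illustration; NOT the library's classes.
class SketchStoppedError extends Error {
  constructor() {
    super('The queue is stopped');
    this.name = 'SketchStoppedError';
  }
}

class StoppableQueue {
  constructor() {
    this.stopped = false;
    this.pending = new Set(); // promises of tasks that have not settled yet
  }

  // Starts the task immediately (no concurrency limit in this sketch)
  async push(task) {
    if (this.stopped) throw new SketchStoppedError();
    const promise = (async () => task())();
    const cleanup = () => this.pending.delete(promise);
    this.pending.add(promise);
    promise.then(cleanup, cleanup);
    return { promise };
  }

  // Waits until no task is pending, including tasks pushed meanwhile (assumption)
  async wait() {
    while (this.pending.size > 0) {
      await Promise.all([...this.pending].map((p) => p.catch(() => {})));
    }
  }

  // Rejects further push() calls, then behaves like wait()
  async stop() {
    this.stopped = true;
    await this.wait();
  }

  // Undoes stop()
  start() {
    this.stopped = false;
  }
}

async function demo() {
  const queue = new StoppableQueue();
  const log = [];

  await queue.push(
    () => new Promise((resolve) => setTimeout(() => { log.push('task done'); resolve(); }, 20))
  );
  await queue.stop(); // resolves only after the task above has finished
  log.push('stopped');

  try {
    await queue.push(() => {});
  } catch (err) {
    log.push(err.name); // push() is rejected while stopped
  }

  queue.start();
  const { promise } = await queue.push(() => 'ok');
  log.push(await promise);
  return log;
}

demo().then((log) => console.log(log));
```

In this sketch the log ends up as 'task done', 'stopped', 'SketchStoppedError', 'ok': stop() waits for running work, push() fails while stopped, and start() makes push() usable again.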