Connect with us

Technology

How one can Implement a Easy Process Queue in Node.js – SitePoint


This tutorial explains and demonstrates queuing methods. Queues are sometimes used to course of long-running duties comparable to electronic mail e-newsletter supply.

It’s not all the time sensible to execute a process the second it’s requested.

Take into account an electronic mail e-newsletter administration system. After writing, an administrator should hit an enormous crimson “SEND NOW” button. The applying may ship each electronic mail instantly and present a “accomplished” response. That might work for a dozen messages, however how lengthy wouldn’t it take for 1,000 subscribers or extra? The browser request would day trip earlier than the method accomplished.

One other instance: a person can add any variety of pictures to a gallery utility. The system resizes and sharpens every picture for various dimensions. This course of may run on add, however it might to incur a delay for each picture.

It’s more practical to decouple duties in these conditions. The person receives an instantaneous response however process processing happens within the background. Different purposes or servers deal with duties and schedule re-attempts on failure. The person can obtain alerts or study logs to find out progress.

Queues

A queue is a knowledge construction which holds a set of things:

  • Any course of can ship (or enqueue) an merchandise at any time — comparable to ship e-newsletter X to recipient Y.
  • Any course of can obtain (or dequeue) the merchandise on the entrance of the queue — for instance, the merchandise that’s been within the queue for longest.

Queues are a first-in-first-out (FIFO) construction. The primary merchandise added to the queue would be the first out.

Implementing a Fundamental JavaScript Queue

You may create a queue utilizing a JavaScript array. The push() methodology provides an merchandise to the top of an Array whereas the shift() methodology removes and returns an merchandise from the beginning:

const queue = [];

queue.push( 'merchandise 1' );
queue.push( 'merchandise 2' );

console.log( queue.shift() ); 
console.log( queue.shift() ); 
console.log( queue.shift() ); 

Particular person array components can maintain any knowledge. You may push strings, numbers, Booleans, different arrays, or objects.

You should utilize an ES6 class to outline any variety of separate queues:

class Queue {

  constructor() { this.q = []; }
  ship( merchandise )  { this.q.push( merchandise ); }
  obtain()     { return this.q.shift(); }

}


const q1 = new Queue();
const q2 = new Queue();

q1.ship('merchandise 1');
q2.ship('merchandise 2');

console.log( q1.obtain() ); 
console.log( q1.obtain() ); 
console.log( q2.obtain() ); 

These easy examples could also be helpful for much less crucial client-side code comparable to queuing UI updates so processing happens in a single DOM replace. localStorage or IndexedDB can provide a stage of information persistence if obligatory.

Queuing Platforms

In-memory queues are much less sensible for complicated server purposes:

  1. Two or extra separate purposes can’t (simply) entry the identical queue.
  2. Queue knowledge disappears when the appliance terminates.

Function constructed message-broker software program offers extra strong queuing. Platforms range, however provide options comparable to:

  • knowledge persistence in a selection of databases with replication, sharding, and clustering choices
  • a spread of entry protocols, typically together with HTTP and Internet Sockets
  • any variety of separate queues
  • delayed messaging, the place message processing can happen at a later time
  • transaction-like assist, the place a message is re-queued when processing isn’t confirmed
  • publish-subscribe patterns, the place purposes obtain an occasion when a brand new merchandise seems on a queue

Message-broker software program contains Redis, RabbitMQ, Apache ActiveMQ, and Gearman. Cloud messaging providers embody Amazon SQS, Azure Service Bus, and Google Pub/Sub.

These could also be viable choices for enterprise-level purposes. But they might be overkill when you’ve got less complicated necessities and already use a database.

Use MongoDB as a Message Dealer

It’s attainable to develop a complicated queuing system in a few hundred strains of Node.js code.

The queue-mongodb module described right here makes use of MongoDB for knowledge storage, however the identical ideas might be adopted by any SQL or NoSQL database. The code is offered on GitHub and npm.

Fast Begin

Be sure you have Node.js 14 or above put in, then create a brand new undertaking folder comparable to queue-test. Add a brand new bundle.json file:

{
  "identify": "queue-test",
  "model": "1.0.0",
  "description": "Queue take a look at",
  "sort": "module",
  "scripts": {
    "ship": "node ./ship.js",
    "obtain": "node ./obtain.js"
  }
}

Notice: "sort": "module" configures the undertaking to make use of ES6 modules. The "scripts" will ship and obtain queued gadgets.

Set up the queue-mongodb module:

npm set up @craigbuckler/queue-mongodb

Then create a .env file together with your MongoDB database connection credentials. For instance:

QUEUE_DB_HOST=localhost
QUEUE_DB_PORT=27017
QUEUE_DB_USER=root
QUEUE_DB_PASS=mysecret
QUEUE_DB_NAME=qdb
QUEUE_DB_COLL=queue

Notice: this creates a queue assortment (QUEUE_DB_COLL) within the qdb database (QUEUE_DB_NAME). You should utilize an current database, however be sure the gathering doesn’t conflict with one other.

Database learn/write entry should be granted to the person root (QUEUE_DB_USER) with password mysecret (QUEUE_DB_PASS). Set each values clean if no authentication is required.

Begin a MongoDB database if it’s not already operating. These with Docker and Docker Compose can create a brand new docker-compose.yml file:

model: '3'

providers:

  queuedb:
    atmosphere:
      - MONGO_INITDB_ROOT_USERNAME=${QUEUE_DB_USER}
      - MONGO_INITDB_ROOT_PASSWORD=${QUEUE_DB_PASS}
    picture: mongo:4.4-bionic
    container_name: queuedb
    volumes:
      - queuedata:/knowledge/db
    ports:
      - "${QUEUE_DB_PORT}:${QUEUE_DB_PORT}"
    restart: all the time

volumes:
  queuedata:

Then run docker-compose up to obtain and begin MongoDB with a persistent knowledge quantity.

Docker is offered Linux, macOS, and Home windows 10. See the Docker set up directions.

Create a brand new ship.js file so as to add a randomly generated electronic mail messages to a queue named information:


import { Queue } from '@craigbuckler/queue-mongodb';


const newsQ = new Queue('information');


const identify = String.fromCharCode(65 + Math.random() * 26).repeat(1 + Math.random() * 10);


const ship = await newsQ.ship({
  identify:     identify,
  electronic mail:    `${ identify.toLowerCase() }@take a look at.com`,
  date:     new Date(),
  message:  `Hey there, ${ identify }!`
});

console.log('ship', ship);


console.log('gadgets queued:', await newsQ.rely());


await newsQ.shut();

Execute it with npm run ship and also you’ll see output comparable to this:

ship {
  _id: 607d692563bd6d05bb459931,
  despatched: 2021-04-19T11:27:33.000Z,
  knowledge: {
    identify: 'AAA',
    electronic mail: 'aaa@take a look at.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
gadgets queued: 1

The .ship() methodology returns an qItem object containing:

  1. the MongoDB doc _id
  2. the date/time the merchandise was initially queued, and
  3. a duplicate of the message knowledge

Run the script any variety of instances so as to add additional gadgets to the queue. The gadgets queued will increment on each run.

Now create a brand new obtain.js file to retrieve messages from the identical queue:


import { Queue } from '@craigbuckler/queue-mongodb';


const newsQ = new Queue('information');

let qItem;

do {

  qItem = await newsQ.obtain();

  if (qItem) {

    console.log('nreceive', qItem);

    
    

  }

} whereas (qItem);


console.log('gadgets queued:', await newsQ.rely());

await newsQ.shut();

Run npm run obtain to fetch and course of queued gadgets:

obtain {
  _id: 607d692563bd6d05bb459931,
  despatched: 2021-04-19T11:27:33.000Z,
  knowledge: {
    identify: 'AAA',
    electronic mail: 'aaa@take a look at.com',
    date: 2021-04-19T11:27:33.426Z,
    message: 'Hey there, AAA!'
  }
}
gadgets queued: 0

No electronic mail is shipped on this instance, however that might be applied utilizing Nodemailer or one other appropriate module.

If processing fails — maybe as a result of the mail server is down — an merchandise could be re-queued with this:

newsQ.ship( qItem.knowledge, 600 );

The second 600 argument is an non-obligatory variety of seconds or future date. This command re-queues the merchandise after 600 seconds (ten minutes) have elapsed.

It is a easy instance, however any utility can ship knowledge to any variety of queues. One other course of, maybe began as a cron job, can obtain and course of gadgets when obligatory.

How the queue-mongodb Module Works

The sort string handed to the category constructor defines a queue identify. The .ship() methodology creates a brand new MongoDB doc when handed knowledge so as to add to the queue. The MongoDB doc incorporates:

  1. A MongoDB _id (the creation date/time is encoded throughout the worth).
  2. The queue sort.
  3. A processing date/time worth named proc. It’s attainable to set a future time however the present time is the default.
  4. The merchandise knowledge. This may be something: a Boolean, quantity, string, array, object, and so forth.

The .obtain() methodology locates the oldest doc that has an identical sort and a proc date/time previously. The doc is formatted, returned to the calling code, and deleted from the database.

The next sections describe the module in additional element.

queue-mongodb Module: Initialization

The dotenv module reads the .env atmosphere variables if obligatory. A database connection object is created utilizing the official mongodb driver module:


import dotenv from 'dotenv';
import mongoDB from 'mongodb';


if (!course of.env.QUEUE_DB_HOST) {
  dotenv.config();
}


const
  dbName = course of.env.QUEUE_DB_NAME || 'qdb',
  qCollectionName = course of.env.QUEUE_DB_COLL || 'queue',
  qAuth = course of.env.QUEUE_DB_USER ? `${ course of.env.QUEUE_DB_USER }:$ '' @` : '',

  dbClient = new mongoDB.MongoClient(
    `mongodb://${ qAuth }$ 'localhost' :$/`,
    { useNewUrlParser: true, useUnifiedTopology: true }
  );

The qCollection variable holds a reference to the database’s queue assortment (outlined by QUEUE_DB_COLL). It’s created and returned by the dbConnect() perform, which additionally defines the gathering schema and indexes when obligatory. All Queue strategies run const q = await dbConnect(); to get the gathering reference:

let qCollection; 



async perform dbConnect() {

  
  if (qCollection) return qCollection;

  
  await dbClient.join();

  
  const
    db = dbClient.db( dbName ),
    colList = await db.listCollections({ identify: qCollectionName }, { nameOnly: true }).toArray();

  if (!colList.size) {

    
    let $jsonSchema = {
      bsonType: 'object',
      required: [ 'type', 'proc', 'data' ],
      properties: {
        sort: { bsonType: 'string', minLength: 1 },
        proc: { bsonType: 'date' }
      }
    };
    await db.createCollection(qCollectionName, { validator: { $jsonSchema } });

    
    await db.assortment( qCollectionName ).createIndexes([
      { key: { type: 1 } },
      { key: { proc: 1 } }
    ]);

  }

  
  qCollection = db.assortment( qCollectionName );
  return qCollection;

}

The dbClose() perform closes the database connection:


async perform dbClose() {

  if (qCollection) {
    await dbClient.shut();
    qCollection = null;
  }

}

queue-mongodb Module: Queue Constructor

The Queue constructor units the queue sort or identify:

export class Queue {

  constructor(sort = 'DEFAULT') {

    this.sort = sort;

  }

queue-mongodb Module: Queue.ship() Technique

The .ship() methodology provides knowledge to the queue with the suitable sort. It has an non-obligatory delayUntil parameter, which provides an merchandise to the queue at a future time by specifying quite a few seconds or a Date().

The tactic inserts a brand new doc into the database and returns a qItem object ( { _id, despatched, knowledge} ) or null if unsuccessful:

  async ship(knowledge = null, delayUntil) {

    attempt {

      
      let proc = new Date();
      if (delayUntil instanceof Date) {
        proc = delayUntil;
      }
      else if (!isNaN(delayUntil)) {
        proc = new Date( +proc + delayUntil * 1000);
      }

      
      const
        q     = await dbConnect(),
        ins   = await q.insertOne({
          sort: this.sort, proc, knowledge
        });

      
      return ins && ins.insertedCount && ins.insertedId ? { _id: ins.insertedId, despatched: ins.insertedId.getTimestamp(), knowledge } : null;

    }
    catch(err) {

      console.log(`Queue.ship error:n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.obtain() Technique

The .obtain() methodology retrieves and deletes the oldest queued merchandise within the database with a particular sort and a proc date/time previously. It returns a qItem object ( {_id, despatched, knowledge} ) or null if nothing is offered or an error happens:

  async obtain() {

    attempt {

      
      const
        now = new Date(),
        q   = await dbConnect(),
        rec = await q.findOneAndDelete(
          {
            sort: this.sort,
            proc: { $lt: now }
          },
          {
            kind: { proc: 1 }
          }
        );

      const v = rec && rec.worth;

      
      return v ? { _id: v._id, despatched: v._id.getTimestamp(), knowledge: v.knowledge } : null;

    }
    catch(err) {

      console.log(`Queue.obtain error:n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.take away() Technique

The .take away() methodology deletes the queued merchandise recognized by a qItem object ( {_id, despatched, knowledge} ) returned by the .ship() methodology. It may be used to take away a queued merchandise no matter its place within the queue.

The tactic returns the variety of deleted paperwork (usually 1) or null when an error happens:

  async take away(qItem) {

    
    if (!qItem || !qItem._id) return null;

    attempt {

      const
        q   = await dbConnect(),
        del = await q.deleteOne({ _id: qItem._id });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.take away error:n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.purge() Technique

The .purge() methodology deletes all queued gadgets of the identical sort and returns the variety of deletions:

  async purge() {

    attempt {

      const
        q   = await dbConnect(),
        del = await q.deleteMany({ sort: this.sort });

      return del.deletedCount;

    }
    catch(err) {

      console.log(`Queue.purge error:n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.rely() Technique

The .rely() methodology returns the variety of queued gadgets of the identical sort:

  async rely() {

    attempt {

      const q = await dbConnect();
      return await q.countDocuments({ sort: this.sort });

    }
    catch(err) {

      console.log(`Queue.rely error:n${ err }`);
      return null;

    }

  }

queue-mongodb Module: Queue.shut() Technique

The .shut() methodology runs the dbClose() perform to terminate the database connection so the Node.js occasion loop can finish:

  async shut() {

    attempt {

      await dbClose();

    }
    catch(err) {

      console.log(`Queue.shut error:n${ err }`);
      return null;

    }

  }


}

A New Queue

Queues are a consideration for any net utility with computationally costly features that might trigger a bottleneck. They will enhance efficiency and upkeep by decoupling purposes into smaller, quicker, extra strong processes. Devoted message dealer software program is an possibility, however easy queuing methods are attainable in a number of dozen strains of code.

Click to comment

Leave a Reply

Your email address will not be published. Required fields are marked *