Socket Pooling on Node.js

UPDATE: This post is pretty old, and nowadays there are quite a few socket pooling implementations for Node.js. I recommend taking a look at Jackpot (https://github.com/3rd-Eden/jackpot). It’s simple and does the job.

I was looking for connection pooling for my Apple Push Notification proxy but couldn’t find a proper pool implementation, just some experiments that didn’t handle errors properly or couldn’t meet basic real-life requirements. I also needed a pool that could be called in both blocking mode (wait until a connection is available) and non-blocking mode (fail immediately if none is available).

Basic functional requirements of a typical connection pool

  • Ensure that connections are available and that no more than the maximum number of parallel connections exist.
  • Do not keep unnecessary connections open for long: less traffic, fewer connections.
  • Handle connection decay, for example when a connection is closed by the remote peer while it is waiting in the pool.
  • Use sane retry logic. Most importantly, do not retry connections as fast as the CPU allows when errors occur.
  • Either guarantee a maximum waiting time or provide a way to check connection availability.

Let’s first define the interface: a Node.js module that exports a ConnectionPool function, which returns a pool object with reserve and release methods.

exports.ConnectionPool = function(factory) {
 var self = {};
 var waitlist = [];     // callbacks waiting for a connection
 var connections = [];  // unused connections currently in the pool
 var ccount = 0;        // number of current connections (pooled, in use, or being created)

 // Called to reserve a connection from the pool. Calls callback without a connection if wait is false
 self.reserve = function(callback, wait) { ... }
 // Returns a connection to the pool. The connection is destroyed if destroy is true
 self.release = function(connection, destroy) { ... }

 return self;  // without this, ConnectionPool(...) would return undefined
}

The pool requires a user-provided factory: an object that defines three functions and one property.

factory = {
 create: function(callback) { ... },      // create a connection and call callback with it (or with nothing on failure)
 validate: function(connection) { ... },  // return true if the connection is still usable, false otherwise
 destroy: function(connection) { ... },   // destroy the connection
 max: 5                                   // maximum number of connections
}

The pool implementation needs several housekeeping functions. checkWaiters() is called to create new connections for waiting callbacks.

function checkWaiters() {
  if(waitlist.length > 0 && ccount < factory.max) {
     ccount++;
     factory.create(function(connection) {          
       if(!connection) {
         ccount--; // failed                    
       } else {
         if(waitlist.length > 0)
            waitlist.shift()(connection);
         else
           connections.push(connection);
       }
    });                           
  }      
 }

Then its counterpart, destroyConnection(), which removes a connection from the pool for good. This function can be called from several places and in several situations, so it sets a “destroyed” flag on the connection to avoid duplicate processing. It also tries to create a replacement connection immediately (if needed) by calling checkWaiters().

function destroyConnection(connection) {
    if(connection.destroyed) {
       return;
    }
    connection.destroyed = true;
    for(var i=0; i < connections.length; i++) {
        // remove from pool if it's there                                
        if(connection == connections[i]) {
           clearTimeout(connection.timeoutid);
           connections.splice(i,1);
        }
    }
    ccount--;
    factory.destroy(connection);

    // connection was lost, we need to create new one if there are       
    // waiting requests                                                  
    checkWaiters();
 }

Then the actual reserve interface method. The function has two modes: if wait is false and no connection can be provided, it calls the callback immediately without a connection; otherwise the callback goes to the waiting list. Connections from the pool are validated with the factory before they are passed to the callback.

self.reserve = function(callback, wait) {
    if (wait === undefined) {
        wait = true;  // default: wait for a connection
    }
    if(connections.length > 0) {  // pool has available connections
        var connection = connections.shift();  // var: avoid leaking a global
        if(factory.validate(connection)) {  // is it still valid
           clearTimeout(connection.timeoutid);  // cancel the cleanup timeout
           callback(connection);
           return;
        } else {
            destroyConnection(connection);  // stale connection
        }
    }
    if(ccount >= factory.max) { // maximum number of connections created
       if(!wait) {
          callback();
       } else {
          waitlist.push(callback);
       }
       return;
    }
    ccount++;  // try to create connection
    factory.create(function(connection) {
        if(!connection) {
           ccount--; // failed                                          
           if(!wait) {
              callback();
           } else {
              waitlist.push(callback);
           }
        } else {
          callback(connection); // connection created successfully
        }
    });
 }

And the release method. If requested, it destroys the connection and forgets about it completely. Otherwise it takes the first callback from the waiting list, if there is one, and passes the connection to it for immediate reuse. If nobody is waiting, it puts the connection back into the pool and schedules its cleanup in 10 seconds.

self.release = function(connection, destroy) {
    if(destroy) {
       destroyConnection(connection);
    } else {
       if(waitlist.length > 0) {
          waitlist.shift()(connection);
         return;
       }
       connections.push(connection);
       connection.timeoutid = setTimeout(function() {
           destroyConnection(connection);
       }, 10000);
    }
 }

And finally the background polling, which is mainly responsible for the connection retry logic. It calls checkWaiters() once per second; as you remember, that function creates connections if anyone is waiting for one. Connections are normally created on demand in the reserve() call.

function poll() {
   checkWaiters();
   setTimeout(poll, 1000);
}
setTimeout(poll, 1000);  // start poller
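
One practical caveat with the poller above: a pending setTimeout keeps the Node.js process alive, so a pool that polls forever can prevent a script from exiting. The following is only a sketch of one way around that, not part of the original pool. The names startPoller and stop are hypothetical. It relies on the unref() method that Node.js timer objects provide, which tells Node.js not to keep the process running just for that timer.

```javascript
// Hypothetical helper: runs task() every intervalMs milliseconds until
// stop() is called. The timers are unref()'d so they do not keep the
// process alive on their own.
function startPoller(task, intervalMs) {
  var timer = null;
  var stopped = false;

  function schedule() {
    timer = setTimeout(tick, intervalMs);
    if (timer.unref) timer.unref();  // guard: only call unref() where it exists
  }

  function tick() {
    if (stopped) return;
    task();
    if (!stopped) schedule();  // task() itself may have called stop()
  }

  schedule();
  return {
    stop: function () {
      stopped = true;
      clearTimeout(timer);
    }
  };
}
```

Inside the pool this could replace the poll() function with something like var poller = startPoller(checkWaiters, 1000), assuming the pool also exposes a way to call poller.stop() on shutdown.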

How does this handle the different error cases?

  • Idle connections are cleaned up by the timeout that is set in release().
  • User code can request that a connection be destroyed by setting the destroy flag to true in the call to release().
  • Connections that go bad while in the pool are (hopefully) intercepted by the user-provided factory.validate().
  • The counter keeps track of the number of created connections, and because it is increased before a connection is created, it also limits the maximum number of parallel connection attempts!
  • When connection creation fails, the callback goes to the waiting list and the once-per-second poll tries to create new connections.
  • User code that needs an answer immediately can set the wait flag to false when reserving a connection. The code could also be changed to call the callback after a timeout, so that wait would be defined in milliseconds instead of the current choice between instant failure and infinite wait.
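
The timed wait mentioned in the last point can also be approximated without touching the pool internals. The sketch below is one possible approach, not the pool's own API: reserveWithTimeout and waitMs are hypothetical names, and the helper only assumes an object with the reserve() and release() methods described above. If the timer fires first, the caller gets the same "no connection" callback as with wait set to false, and a connection that arrives late is handed straight back to the pool.

```javascript
// Hypothetical helper, not part of the pool above: waits at most waitMs
// milliseconds for a connection from any object with reserve()/release().
function reserveWithTimeout(pool, callback, waitMs) {
  var done = false;

  var timer = setTimeout(function () {
    done = true;
    callback();  // timed out: call back without a connection
  }, waitMs);

  pool.reserve(function (connection) {
    if (done) {
      // The caller has already given up, so return the connection unused.
      if (connection) pool.release(connection);
      return;
    }
    done = true;
    clearTimeout(timer);
    callback(connection);
  }, true);  // always queue in the pool; the timer enforces the limit
}
```

One trade-off of this wrapper approach is that the abandoned callback stays in the waiting list until a connection becomes available, so a timed-out request still briefly takes a connection before releasing it.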

Now, how to use it.

HTTP Pool Example

var pool = require('./pool');
var http = require('http');
var httppool = pool.ConnectionPool({
 // http.createClient is the legacy (since deprecated) client API this post was written against
 create: function(callback) { callback(http.createClient(80, "www.google.com")); },
 validate: function(connection) { return true; /* no need to validate */ },
 destroy: function(httpclient) { /* nothing to destroy */ },
 max: 5  // maximum number of parallel connections
});

Using the pool

 httppool.reserve(function(connection) {
   var req = connection.request("GET", "/index.html");
   req.on('error', function(error) {
     httppool.release(connection, true);  // destroy the broken connection
   });
   req.on('response', function(response) {
     var body = '';
     response.on('data', function(data) {
       body += data;
     });
     response.on('end', function() {
       console.log(body);
       httppool.release(connection);
     });
   });
   req.end();  // send the request
 });

Client socket pool

var pool = require('./pool');
var net = require('net');
var apnpool = pool.ConnectionPool({
 create: function(callback) {
   function errorcb(error) {  // error handler for failed connection attempts
     console.error(error.stack);
     callback();  // report the failure by calling back without a connection
   }
   var connection = net.createConnection(12345, 'server.example.com');  // var: each attempt gets its own socket
   connection.once('error', errorcb);
   connection.once('connect', function() {
     connection.removeListener('error', errorcb); // clear error handler before passing forward
     callback(connection);
   });
 },
 validate: function(connection) { return connection.writable; },
 destroy: function(connection) { connection.end(); },
 max: 5
});

Reuse is a little bit tricky with sockets, as you probably need to set and clear the ‘error’ and ‘data’ event handlers for reading the responses in each worker.
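
As a rough illustration of that handler bookkeeping, here is a minimal sketch of a worker that borrows a socket for a single request and response. The function name exchange and its done(error, chunk) callback are hypothetical, and the sketch assumes the peer answers every request with exactly one ‘data’ chunk, which real protocols often do not guarantee. It works with any pool object that has the reserve() and release() methods shown earlier.

```javascript
// Hypothetical helper: borrows a pooled socket for one request/response
// exchange. Assumes one request produces exactly one 'data' chunk.
function exchange(pool, payload, done) {
  pool.reserve(function (connection) {
    if (!connection) {
      done(new Error('no connection available'));
      return;
    }

    function cleanup() {
      // Remove only our own handlers so the next borrower starts clean.
      connection.removeListener('data', onData);
      connection.removeListener('error', onError);
    }

    function onData(chunk) {
      cleanup();
      pool.release(connection);        // healthy: back to the pool
      done(null, chunk);
    }

    function onError(error) {
      cleanup();
      pool.release(connection, true);  // broken: ask the pool to destroy it
      done(error);
    }

    connection.on('data', onData);
    connection.on('error', onError);
    connection.write(payload);
  });
}
```

Note that a socket sitting idle in the pool then has no ‘error’ listener at all, and an ‘error’ event without a listener is thrown as an exception in Node.js, so it may be worth keeping a no-op ‘error’ handler attached while a connection is pooled.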

 

 
