CouchDB cleanup script for purging old docs

CouchDB does not have a straightforward way to clean up old data. This is one simple way to delete entries by date, but it requires that

  • Your documents have a date or timestamp property
  • There is a view in each database to fetch documents by that property

Prerequisites

  1. Node.js
  2. The jss module, i.e. 'npm install jss'.

1. Prepare views for Cleanup

Define a view in each database that needs regular cleanup. Use something like the following, where the emitted key is a timestamp in seconds.

views: {
    created: {
        map: function(doc) {
            if ( doc.created ) {
                emit(doc.created, doc._rev);
            }
        }
    }
    ...
}

2. The Cleanup script

The script queries old doc ids from the cleanup view and marks them as deleted. Documents are not removed immediately; they are purged physically on the next CouchDB compaction. CouchDB 1.2.0 supports automatic compaction, so just enable it and don't worry about it.

#!/bin/bash

DBHOST=localhost

# Get the key for entries that are about 6 months (24 weeks) old. This assumes the created view can be queried using timestamps as keys.
if uname -a | grep -i darwin > /dev/null
then
	TODAY=$(date '+%Y-%m-%d')
	MONTHSAGO=$(date -v -24w '+%Y-%m-%d')
	MONTHSAGO_E=$(date -v -24w '+%s')
else
	TODAY=$(date '+%Y-%m-%d')
	MONTHSAGO=$(date -d '24 weeks ago' '+%Y-%m-%d')
	MONTHSAGO_E=$(date -d '24 weeks ago' '+%s')
fi

PATH=$PATH:/usr/local/bin

# JSON scripting tool
JSS=$(npm bin)/jss

cleanup() {
	DATABASE=$1
	DESIGN=$2

	echo "Cleaning $DATABASE/$DESIGN"
	curl --silent -S http://$DBHOST:5984/$DATABASE/_design/$DESIGN/_view/created?endkey=$MONTHSAGO_E | \
		$JSS --bulk_docs '$.id' '{_id: $.id, _rev:$.value, _deleted:true}' | \
		curl --silent -S -X POST -d @-  -H "Content-Type:application/json" http://$DBHOST:5984/$DATABASE/_bulk_docs | \
		sed 's/\({[^}]*}\),/\1\n/g' | tr -d '[]' | \
		$JSS '$.ok != true'
}

echo "STATS CLEANUP <= $MONTHSAGO - Start" `date`

# Put databases and views here
cleanup somedb1 someview
cleanup somedb1 otherview
cleanup somedb2 alsoview

echo "STATS CLEANUP - Done" `date`

The script does this

  1. Get expired docs, e.g. curl 'http://localhost:5984/mydatabase/_design/mydesign/_view/created?endkey=1337049581'
  2. Build the bulk doc delete request (jss), as illustrated below
  3. Issue the bulk delete request (curl POST)
  4. Sanitize the CouchDB output, i.e. add newlines and remove brackets (sed)
  5. Print the failed ones
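
For illustration, the bulk delete body built in step 2 and posted to _bulk_docs in step 3 looks roughly like this; the ids and revisions here are made up:

{"docs": [
    {"_id": "doc1", "_rev": "1-abc", "_deleted": true},
    {"_id": "doc2", "_rev": "3-def", "_deleted": true}
]}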

Note that the default version of jss doesn't output proper JSON if no documents are found; use my fork to work around this problem if you don't want to see errors in the logs.

npm install https://github.com/tikonen/jss/tarball/master

Cross Domain data channel with HTML5 Canvas

Standard Ajax is restricted by the same-origin policy, so JSONP is the de facto way to exchange data with cross-domain sources, and it works pretty well. An alternative, though somewhat hacky, way is to use HTML5 Canvas as a cross-domain workaround, using pseudo images as a "covert channel".

The basic idea is simple: client-side JavaScript requests an image file from the 3rd-party site, where the server encodes the data into the image; the client can use cookies and URL parameters to identify itself as desired. The client then renders the image and decodes the data from the image pixels.

Backend

The backend needs to be able to construct images with custom pixel-level data. In this example we use Node.js and the canvas module, a server-side HTML5 Canvas implementation based on the Cairo graphics library.

This function accepts any object and returns a canvas object that contains the object's JSON representation encoded in the image pixels.

function encodeDataToImage( data ) {

	// Convert data to binary buffer while being utf-8
	var s = encodeURIComponent( JSON.stringify(data) );
	var buffer = new Buffer(s, 'utf8');
	var pixelc = Math.ceil(buffer.length / 3); // one pixel holds 3 data bytes

	// Encode data as PNG image
	var Canvas = require('canvas');
	var canvas = new Canvas(pixelc, 1)
	var ctx = canvas.getContext('2d');
	var imgdata = ctx.getImageData(0, 0, pixelc, 1);

	for (var i=0, k=0; i < pixelc * 4; i += 4 ) {
		imgdata.data[i + 3] = 0xFF; // set alpha to full opaque
		for (var j=0; j < 3 && k < buffer.length; k++, j++ ) {
			imgdata.data[i + j] = buffer[k];
		}
	}
	// set "image" data
	ctx.putImageData(imgdata, 0, 0);
	return canvas;
}

Define an xd request handler that builds and sends the data-encoded image to the client (example in Express.js).

someapp.get('/xd', function(req, res ) {
    // do here something with query or cookies, like resolve uid and set
    // data.
    // Example data
    var data = { a: 1, en: 'owl', fi: 'pöllö', es: 'búho', uid: req.query.uid }

    var canvas = encodeDataToImage( data );
    var img = canvas.toBuffer();
    res.contentType('png');
    res.header('Content-Length', img.length);
    res.send( img );
});

Browser
On the browser side, load the image and decode it back into an object:

function queryXD( query, callback ) {

	var img = new Image();
	img.src = 'http://some.site.example.com/xd?' + query;
	img.addEventListener('load', function() {

		// Image loaded, create temporary canvas
		var canvas = document.createElement('canvas');
		var ctx = canvas.getContext('2d');

		// draw image on canvas
		canvas.width = img.width;
		canvas.height = img.height;
		ctx.drawImage( img, 0, 0 );

		// collect bytes from image pixels
		var bytes = [];
		var imgdata = ctx.getImageData(0, 0, img.width, img.height);
		for (var i=0; i < img.width * 4; i++ ) {
			if ( i && (i + 1) % 4 == 0) {
				i++;
			}
			var b = imgdata.data[i];
			if (!b) {
				break;
			}
			bytes.push( b );
		}

		// convert bytes to string and parse JSON
		var s = decodeURIComponent( String.fromCharCode.apply(null, bytes) );
		var data = JSON.parse(s);

		callback(false, data);
	}, false);

        // image failed to load
	img.addEventListener('error', function(err) {
		callback(err);
	}, false);
}

And now it's simple to do cross-domain data exchange like this:

queryXD('uid=2134', function(err, data) {
   alert(data.en + ' is ' + data.fi + ' in Finnish and ' + data.es + ' in Spanish');
});

Caveats
Proxies and browsers like to cache the images, so add a unique dummy parameter to every request to force a fresh fetch, for example as sketched below.
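
A minimal sketch, assuming the queryXD function above; the '_' parameter name is arbitrary and the server simply ignores it:

// Cache-buster wrapper: append a throwaway timestamp parameter so every
// request URL is unique and never served from cache.
function queryXDFresh( query, callback ) {
    queryXD(query + '&_=' + new Date().getTime(), callback);
}

queryXDFresh('uid=2134', function(err, data) {
    if (!err) console.log(data.en);
});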

Cached sequential unique identifiers with Node.js and MongoDB

Acquiring a sequential id with MongoDB is simple, as it supports the $inc operator for atomic sequence increments. However, a naive implementation hits the database every single time an id is needed, and this can create latency and overhead issues. A typical case is user tracking, where the application needs a globally unique id for every user across a load-balanced array of Node.js instances.
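
For reference, a minimal sketch of the naive approach, assuming the same node-mongodb-native 'ids' collection that is initialized below; every single id costs one database round trip:

// Naive version: one findAndModify round trip per id (sketch only).
function naive_next_id( idcollection, seqname, next ) {
    idcollection.findAndModify( {_id: seqname}, [['_id','asc']],
        {$inc: {index: 1}}, {new: true}, function(err, doc) {
            if (err) return next(err);
            next(false, doc.index); // hits the database for every id
        });
}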

This is a more optimized method that also works when you're running several Node.js instances simultaneously. It fetches a unique number range from the database, serves ids from memory, and fetches a new range when the current one runs out. This example assumes you use https://github.com/mongodb/node-mongodb-native .

Initialize the id starting point.

On instance startup, the implementation initializes the id to a starting value (if it does not exist yet) and fetches the current status from the database. In this example the starting value is 1000.

function init_id( seqname, next ) {
    idcollection = new mongodb.Collection(client, 'ids');
    function _findId() {
        idcollection.findOne({_id: seqname}, function(err, doc) {
            if ( err ) { console.log( 'ERROR MONGO', 'ids', err ); return next(err); }
            if( doc ) {
               return next( false, { _id: seqname, waiters: [], high: doc.index, index: doc.index } )
            }
            idcollection.insert( {_id: seqname, index: 1000}, {safe: true}, function(err, doc) {
                if ( err ) { console.log( 'ERROR MONGO', 'ids', err ); return next(err); }
                return _findId();
            });
        });
    }
    _findId();
}

The callback 'next' is called with an object initialized to the current range from the database.

    init_id( 'myseq', function(err, idstatus ) {
        // we now have the id status
    });
...

Sequence generation function

Next we define a function that is called to fetch the next id. The tricky part is that when the code needs to fetch the next batch of unique identifiers, it must queue the other callers until the fetch completes, so we don't end up fetching more than one range increment at a time.

The high and index properties were set to the same current value during initialization, so the first call to next_id will always trigger a fetch.

var INDEX_STEP = 10; // range to prefetch per query

function next_id( idstatus, next ) {

    if (idstatus.high > idstatus.index) {
        // id available from memory
        return next(false, idstatus.index++);
    }

    // need to fetch, put callback in wait list
    idstatus.waiters.push( next )

    if (idstatus.infetch) {
       // already fetch in progress
       return;
    }

    // initiate fetch
    _fetch( INDEX_STEP );

    function _fetch( step ) {
        // use findandmodify to increment index and return new value
        idstatus.infetch = true;
        idcollection.findAndModify( {_id: idstatus._id}, [['_id','asc']],
				    {$inc: {index: step}},
		   		    {new: true}, _after_fetch);
    }

    function _after_fetch(err, object) {

        function _notify_waiters( err ) {
            // give id to all waiters
            while ( idstatus.waiters.length ) {
                if ( err ) {
                    (idstatus.waiters.shift())( err )
                } else {
                     if (idstatus.high <= idstatus.index) {
                        // we got more waiters during fetch and
                        // exhausted this batch, get next batch
                        return _fetch( INDEX_STEP );
                     }
                    (idstatus.waiters.shift())( false, idstatus.index++ )
                }
            }
           idstatus.infetch = false;
        }

        if (err) return _notify_waiters( err )
        if (!object) return _notify_waiters('index not found')

        idstatus.high = object.index

        // the current index must be reset to the allocated range
        // start, because there could be several parallel nodes making
        // incremental queries to the db so each node does not get
        // sequential ranges.
        idstatus.index = object.index - INDEX_STEP

        _notify_waiters();
    }
}

The calling code gets the next id as an argument to the callback:

    next_id( idstatus, function(err, id) {

        // 'id' is next unique id to use!
    });

Notes

  • Identifiers are sequential (growing) but not strictly incremental, as multiple Node instances will at some point make requests at the same time.
  • Each startup increments the current value of the sequence in the database by INDEX_STEP if next_id is called at least once.
  • INDEX_STEP must be large enough to avoid contention on the database, or optimally the fetch should implement some kind of exponential retry, as sketched below.
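
A rough sketch of the retry idea, reusing idcollection, idstatus and _after_fetch from the code above; the delays and attempt count are arbitrary:

// Retry the range fetch with exponential backoff before giving up (sketch).
function _fetch_with_retry( step, attempt ) {
    attempt = attempt || 0;
    idcollection.findAndModify( {_id: idstatus._id}, [['_id','asc']],
        {$inc: {index: step}}, {new: true}, function(err, object) {
            if (err && attempt < 5) {
                // wait 100ms, 200ms, 400ms, ... and try again
                return setTimeout(function() {
                    _fetch_with_retry( step, attempt + 1 );
                }, Math.pow(2, attempt) * 100);
            }
            _after_fetch(err, object);
        });
}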

EC2 EBS Backup Python script

This is a simple EC2 backup script that snapshots the listed EBS volumes daily. The script keeps a maximum number of daily, weekly and monthly snapshots per volume and checks whether the daily backup is already done or in progress, so it does not make duplicates for a single day.

Prerequisites

1. EC2 command line tools.
Check that you can run them from the command line:

$ ec2-describe-snapshots
SNAPSHOT	snap-070cba6c	vol-123123	completed	2012-04-19T02:06:54+0000	100%	457025778133		my.com root
SNAPSHOT	snap-170cba7c	vol-455445	completed	2012-04-19T02:07:09+0000	100%	457025778133	10	my.net root
...

2. Fabric administration and deployment scripting tool

Install with easy_install or pip

$ sudo easy_install fabric

See http://docs.fabfile.org/en/1.4.1/installation.html for more details

3. The script.
Copy the following to ec2-backup.py and replace the BACKUP_VOLS dictionary with your own volumes and their descriptions. The script is also available in GitHub.

import os, sys, time
import dateutil.parser
from datetime import date, timedelta, datetime

from fabric.api import (local, settings, abort, run, lcd, cd, put)
from fabric.contrib.console import confirm
from fabric.api import env

# For each volume, define how many daily, weekly and monthly backups you
# want to keep. For weekly backups Monday's snapshot is kept, and for
# monthly the one from the 1st day of the month.
BACKUP_VOLS = {
	'vol-abc1234': {'comment': 'my.com root', 'days': 7, 'weeks': 4, 'months': 4},
	'vol-1234565': {'comment': 'my.com database', 'days': 7, 'weeks': 4, 'months': 4},
}


today = date.today()

snapshots = {}
hastoday = {}
savedays = {}	# retained snapshot days for each volume

for (volume, conf) in BACKUP_VOLS.items():
	daylist = savedays[volume] = []
	# last n days
	for c in range(conf['days'] - 1, -1, -1):
		daylist.append(today - timedelta(days=c))
	# last n weeks (get mondays)
	monday = today - timedelta(days=today.isoweekday() - 1)
	daylist.append(monday)
	for c in range(conf['weeks'] - 1, 0, -1):
		daylist.append(monday - timedelta(days=c * 7))
	# last n months (first day of month), handling year rollover
	for c in range(conf['months'] - 1, -1, -1):
		month, year = today.month - c, today.year
		while month < 1:
			month += 12
			year -= 1
		daylist.append(datetime(year, month, 1).date())

SNAPSHOTS = local('ec2-describe-snapshots', capture=True).split('\n')

SNAPSHOTS = [tuple(l.split('\t')) for l in SNAPSHOTS if l.startswith('SNAPSHOT')]

for (_, snapshot, volume, status, datestr, progress, _, _, _) in SNAPSHOTS:
	snapshotdate = dateutil.parser.parse(datestr).date()
	if volume in BACKUP_VOLS:
		if snapshotdate == today:
			hastoday[volume] = {'status': status, 'snapshot': snapshot, 'progress': progress.replace('%', '')}
		if volume not in snapshots:
			snapshots[volume] = []
		snapshots[volume].append((snapshot, status, snapshotdate))

for snapshotlist in snapshots.values():
	snapshotlist.sort(key=lambda x: x[2], reverse=True)

for volume in BACKUP_VOLS.keys():
	if volume not in snapshots:
		snapshots[volume] = []

print "VOLUME\tSNAPSHOT\tSTATUS\tDATE\tDESC"
for (volume, snapshotlist) in snapshots.items():
	for (snapshot, status, date) in snapshotlist:
		datestr = date.strftime('%Y-%m-%d')
		print "%s\t%s\t%s\t%s\t%s" % (volume, snapshot, status, datestr, BACKUP_VOLS[volume]['comment'])


def status():
	pass


def backup(dryrun=False):
	print "\nCREATING SNAPSHOTS"
	for (volume, snapshotlist) in snapshots.items():
		if volume in hastoday:
			print '%s has %s%% %s snapshot %s for today "%s"' % (volume,
															hastoday[volume]['progress'],
															hastoday[volume]['status'],
															hastoday[volume]['snapshot'],
															BACKUP_VOLS[volume]['comment'])
		else:
			print 'creating snapshot for %s "%s"' % (volume, BACKUP_VOLS[volume]['comment'])
			snapshotlist.insert(0, ('new', 'incomplete', today))
			if not dryrun:
				local('ec2-create-snapshot %s -d "%s"' % (volume, BACKUP_VOLS[volume]['comment']))

	print "\nDELETING OLD SNAPSHOTS"
	for (volume, snapshotlist) in snapshots.items():
		for (snapshot, _, date) in snapshotlist:
			if not date in savedays[volume]:
				datestr = date.strftime('%Y-%m-%d')
				print "deleting %s %s for %s (%s)" % (snapshot, datestr, volume, BACKUP_VOLS[volume]['comment'])
				if not dryrun:
					with settings(warn_only=True):
						local('ec2-delete-snapshot %s' % snapshot)


def dryrun():
	print """

*** DRY RUN ***

"""
	backup(dryrun=True)

You can dry-run the script first to see what it would do:

$ fab -f ec2-backup.py dryrun

To make the actual backup:

$ fab -f ec2-backup.py backup

Example output

$ fab -f ec2-backup.py backup
[localhost] local: ec2-describe-snapshots
VOLUME	SNAPSHOT	STATUS	DATE	DESC
vol-abc1234	snap-48fe4023	completed	2012-04-24	my.com database
vol-abc1234	snap-23863a48	completed	2012-04-23	my.com database
vol-abc1234	snap-838131e8	completed	2012-04-20	my.com database
vol-abc1234	snap-1b0cba70	completed	2012-04-19	my.com database
vol-abc1234	snap-0d4ffb66	completed	2012-04-17	my.com database
vol-1234565	snap-42fe4029	completed	2012-04-24	my.com root
vol-1234565	snap-25863a4e	completed	2012-04-23	my.com root
vol-1234565	snap-858131ee	completed	2012-04-20	my.com root
vol-1234565	snap-1f0cba74	completed	2012-04-19	my.com root
vol-1234565	snap-034ffb68	completed	2012-04-17	my.com root

CREATING SNAPSHOTS
creating snapshot for vol-abc1234 "my.com database"
[localhost] local: ec2-create-snapshot vol-abc1234 -d "my.com database"
SNAPSHOT	snap-8ccd74e7	vol-abc1234	pending	2012-04-25T02:18:58+0000		457025778133	50	my.com database
creating snapshot for vol-1234565 "my.com root"
[localhost] local: ec2-create-snapshot vol-1234565 -d "my.com root"
SNAPSHOT	snap-86cd74ed	vol-1234565	pending	2012-04-25T02:19:03+0000		457025778133	8	my.com root

DELETING OLD SNAPSHOTS
deleting snap-0d4ffb66 2012-04-17 for vol-abc1234 (my.com database)
[localhost] local: ec2-delete-snapshot snap-0d4ffb66
SNAPSHOT	snap-0d4ffb66
deleting snap-034ffb68 2012-04-17 for vol-1234565 (my.com root)
[localhost] local: ec2-delete-snapshot snap-034ffb68
SNAPSHOT	snap-034ffb68

Done.

If you run it again, it will notice the backups already created or in progress for today:

...

CREATING SNAPSHOTS
vol-abc1234 has 55% pending snapshot snap-8ccd74e7 for today "my.com database"
vol-1234565 has 100% completed snapshot snap-86cd74ed for today "my.com root"

...

HTML5 Canvas Layout and Mobile Devices

A common problem with Canvas and mobile devices is getting the canvas to fill the browser window properly. This can be tricky and requires lots of tweaking and testing with different devices to get exactly right.

Even if you get the size defined correctly, rotation is another hurdle, and the layout can break after an orientation change or two.

I wrote an example of a simple layout page that should work both on desktops and on mobile devices, Android (>2.2) and iPhone/iPad. It should appear with the following layout in all browsers, shown here in iPhone screenshots, and not break on resize or orientation change.

[Screenshot: portrait layout]

The layout also works after rotation.

[Screenshot: landscape layout (layout_l)]

The page defines a canvas (green) that occupies most of the screen, and under it a fixed-height div (yellow) containing 'Some Text Here'. On every resize the code draws a black rectangle inset 10 pixels from each canvas border and writes the number of orientation changes and resize events for debugging purposes. The document background is blue to reveal possible unwanted overflows.

How does it work?

DOM/CSS

First, the meta elements tell mobile devices how to handle the page: no scaling, and width fixed to the device width.

<meta name="viewport" content="user-scalable=no, initial-scale=1.0, maximum-scale=1.0, width=device-width">
<meta name="apple-mobile-web-app-capable" content="yes">

The document is wrapped in a single div ("container") that contains the canvas and the fixed-height div.

<body>
   <div id="container">
     <canvas id="canvas">HTML5 Canvas not supported.</canvas>
     <div id="fix">Some Text Here</div>
   </div>
...

The container is forced to fill the browser window by a CSS rule that sets overflow to auto and width/height to 100%.

body,html
{
    height: 100%;
    margin: 0;
    padding: 0;
    color: black;
}
#container
{
    width: 100%;
    height: 100%;
    overflow: auto;
}

The canvas element is inside the container and has no initial height and width. It is set to display: block in CSS to avoid unwanted padding or margins; the canvas default display is inline, which is something you almost never want.

#container canvas {
    vertical-align: top;
    display: block;
    overflow: auto;
}

Finally, the div ("fix") is given a fixed height:

#fix {
    background: yellow;
    height: 20px;
}

This is not enough, though; some JS handling is required for resize and orientation change.

Javascript

The JS listens for both resize and orientation change events and installs a timeout function that gets cancelled if the browser sends several events in rapid succession.

var resizeTimeout;
$(window).resize(function() {
    clearTimeout(resizeTimeout);
    resizeTimeout = setTimeout(resizeCanvas, 100);
});

var otimeout;
window.onorientationchange = function() {
    clearTimeout(otimeout);
    otimeout = setTimeout(orientationChange, 50);
}

The orientation change listener does nothing important; it just updates a counter for debugging purposes.
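
A minimal sketch of such a listener, where the #debug element and the counter variable are illustrative assumptions:

// Count orientation changes and display the count for debugging.
var orientationChanges = 0;
function orientationChange() {
    orientationChanges++;
    $('#debug').text('orientation changes: ' + orientationChanges);
}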

The resizeCanvas function is more involved. When the browser is an iPhone, it first makes the container 60 pixels taller than the browser window. This makes it possible to scroll the window down and hide the iPhone Safari address bar.

if (ios) {
    // increase height to get rid of the iOS address bar
    $("#container").height($(window).height() + 60)
    setTimeout(function() { window.scrollTo(0, 1);  }, 100);
}

Then it gets the container width and height, which are the width and height of the browser window.

var width = $("#container").width();
var height = $("#container").height();

And finally it forces the canvas to the required width and height. The height is reduced by 20 pixels to leave room for the fixed-height div.

cheight = height - 20; // subtract the fix div height
cwidth = width;

// set canvas width and height
$("#canvas").attr('width', cwidth);
$("#canvas").attr('height', cheight);

There may be a better way to do this, but at least this seems pretty robust and works in all major desktop and mobile browsers.

The code is available in GitHub.

Keeping CouchDB design docs up to date with Node.js

CouchDB views are typically defined as JavaScript snippets and are part of special documents called design documents. I noticed that keeping these design documents up to date during development is pretty cumbersome and error prone, so I devised a simple way to keep them updated using Node.js and the Cradle CouchDB driver.

The idea is to define the views as variables in a runnable JS script and run it with Node each time it changes.

Here is the code. Copy it to e.g. cdb-views.js.

var cradle = require('cradle');

cradle.setup({ host: 'localhost',
               port: 5984,
               options: { cache:true, raw: false }});

var cclient = new (cradle.Connection)

function _createdb(dbname) {
    var db = cclient.database(dbname);
    db.exists(function(err, exists) {
        if (!exists) {
            db.create()
        }
    });
    return db;
}
var DB_SOMETHING = _createdb('somedb')

function cradle_error(err, res) {
    if (err) console.log(err)
}


function update_views( db, docpath, code ) {

    function save_doc() {
        db.save(docpath, code, function(err) {
            // view has changed, so initiate cleanup to get rid of old
            // indexes
            db.viewCleanup( cradle_error );
        });

        return true;
    }

    function compare_code( str1, str2 ) {
        var p1 = str1.split('\n');
        var p2 = str2.split('\n');

        for ( var i=0; i < p1.length || i < p2.length; i++ ) {
            var l1 = p1[i];
            var l2 = p2[i];
            l1 = l1 ? l1.trim() : '';
            l2 = l2 ? l2.trim() : '';
            if ( !l1 && !l2 ) continue;
            if ( l1 != l2 ) return true;
        }
        return false;
    }

    // compare function definitions in document and in code
    function compare_def(docdef, codedef) {
        var i = 0;

        if (!docdef && codedef) {
            console.log('creating "' + docpath +'"')
            return true;
        }
        if (!codedef && docdef) {
            console.log('removing "' + docpath +'"')
            return true;
        }
        if (!codedef && !docdef) {
            return false;
        }

        for (var u in docdef) {

            i++;
            if (codedef[u] == undefined) {
                console.log('definition of "' + u + '" removed - updating "' + docpath +'"')
                return true;
            }

            if (typeof(codedef[u]) == 'function') {
                if (!codedef[u] || compare_code( docdef[u], codedef[u].toString()) ) {
                    console.log('definition of "' + u + '" changed - updating "' + docpath +'"')
                    return true;
                }
            } else for (var f in docdef[u]) {
                i++;
                if (!codedef[u][f] || compare_code( docdef[u][f], codedef[u][f].toString()) ) {
                    console.log('definition of "' + u + '.' + f + '" changed - updating "' + docpath +'"')
                    return true;
                }

            }
        }
        // check that both doc and code have same number of functions
        for (var u in codedef) {
            i--;
            if (typeof(codedef[u]) != 'function') {
                for (var f in codedef[u]) {
                    i--;
                }
            }
        }
        if (i != 0) {
            console.log('new definitions - updating "' + docpath +'"')
            return true;
        }

        return false;
    }

    db.get(docpath, function(err, doc) {

        if (!doc) {
            console.log('not found - creating "' + docpath +'"')
            return save_doc();
        }

        if (compare_def(doc.updates, code.updates) || compare_def(doc.views, code.views)) {
            return save_doc();
        }
        console.log('"' + docpath +'" up to date')
    });
}

var EXAMPLE1_DDOC = {
    language: 'javascript',
    views: {
        active: {
            map: function (doc) {
                if (doc.lastsession) {
                    emit(parseInt(doc.lastsession / 1000), 1)
                }
            },
            reduce: function(keys, counts, rereduce) {
                return sum(counts)
            }
        },
        users: function(doc) { 
            if (doc.created) {
                emit(parseInt(doc.created / 1000), 1)
            }
        }
    }    
}

var EXAMPLE2_DDOC = {
    language: 'javascript',
    views: {
        myview: function(doc) {
            if (doc.param1 && doc.param2) {
                emit([doc.param1, doc.param2], null)
            }
        }
    }
}

update_views(DB_SOMETHING, '_design/example1', EXAMPLE1_DDOC);
update_views(DB_SOMETHING, '_design/example2', EXAMPLE2_DDOC);

The code is pretty simple.

  1. First it loads the Cradle CouchDB driver and creates the needed databases if they do not already exist. In this example only a single database, 'somedb', is created.
  2. update_views is responsible for keeping the design docs up to date. It loads the design doc from the given DB and compares it to the definition in this file. If it has changed (or is missing) it will be recreated.
  3. The example design docs (EXAMPLE1_DDOC and EXAMPLE2_DDOC) are simple design doc definitions as JavaScript objects. If you're familiar with CouchDB this is self-explanatory.
  4. Lastly the code calls update_views to update the design documents.

Now it's possible to maintain the views in this JavaScript file, and Node will make sure that the syntax is always valid.

Example output:

Views are up to date.

$ node cdb-views.js
"_design/example1" up to date
"_design/example2" up to date

Definition of view example2/myview has changed

$ node cdb-views.js
"_design/example1" up to date
definition of "myview" changed - updating "_design/example2"

Design doc example2 cannot be found and is created.

$ node cdb-views.js
"_design/example1" up to date
not found - creating "_design/example2"

Callbacks from Threaded Node.js C++ Extension

UPDATE: this guide is a bit outdated; newer Node versions (0.6 and later) support an easier way to use the pooled worker threads, so the extension doesn't need to create its own. See links in the comments.

Writing a threaded Node.js extension requires some care. All JavaScript in Node.js is executed in a single main thread, so you cannot simply call the V8 engine directly from your background thread; that would cause a segfault. The recommended way is to spawn a new thread in the background and use libev events to wake up the main thread to execute the JavaScript callbacks.

The Node.js framework has lots of ready-made machinery for implementing extensions, but there is no simple example of how to implement this kind of extension, so here it is.

Add-on Source

Save this source to texample.cc

#include <queue>

// node headers
#include <v8.h>
#include <node.h>
#include <ev.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

using namespace node;
using namespace v8;

// handles required for callback messages
static pthread_t texample_thread;
static ev_async eio_texample_notifier;
Persistent<String> callback_symbol;
Persistent<Object> module_handle;

// message queue
std::queue<int> cb_msg_queue = std::queue<int>();
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;

// The background thread
static void* TheThread(void *)
{
    int i = 0;
    while(true) {
         // fire event every 5 seconds
        sleep(5);
       pthread_mutex_lock(&queue_mutex);
       cb_msg_queue.push(i);
       pthread_mutex_unlock(&queue_mutex);
       i++;
      // wake up callback
      ev_async_send(EV_DEFAULT_UC_ &eio_texample_notifier);
    }
    return NULL;
}

// callback that runs the javascript in main thread
static void Callback(EV_P_ ev_async *watcher, int revents)
{
    HandleScope scope;

    assert(watcher == &eio_texample_notifier);
    assert(revents == EV_ASYNC);

    // locate callback from the module context if defined by script
    // texample = require('texample')
    // texample.callback = function( ... ) { ..
    Local<Value> callback_v = module_handle->Get(callback_symbol);
    if (!callback_v->IsFunction()) {
         // callback not defined, ignore
         return;
    }
    Local<Function> callback = Local<Function>::Cast(callback_v);

    // dequeue callback message
    pthread_mutex_lock(&queue_mutex);
    int number = cb_msg_queue.front();
    cb_msg_queue.pop();
    pthread_mutex_unlock(&queue_mutex);

    TryCatch try_catch;

    // prepare arguments for the callback
    Local<Value> argv[1];
    argv[0] = Local<Value>::New(Integer::New(number));

    // call the callback and handle possible exception
    callback->Call(module_handle, 1, argv);

    if (try_catch.HasCaught()) {
        FatalException(try_catch);
    }
}

// Start the background thread
Handle<Value> Start(const Arguments &args)
{
    HandleScope scope;

    // start background thread and event handler for callback
    ev_async_init(&eio_texample_notifier, Callback);
    //ev_set_priority(&eio_texample_notifier, EV_MAXPRI);
    ev_async_start(EV_DEFAULT_UC_ &eio_texample_notifier);
    ev_unref(EV_DEFAULT_UC);
    pthread_create(&texample_thread, NULL, TheThread, 0);

    return True();
}

void Initialize(Handle<Object> target)
{
    HandleScope scope;

    NODE_SET_METHOD(target, "start", Start);

    callback_symbol = NODE_PSYMBOL("callback");
    // store handle for callback context
    module_handle = Persistent<Object>::New(target);
}

extern "C" {
static void Init(Handle<Object> target)
{
    Initialize(target);
}

NODE_MODULE(texample, Init);
}

Function walkthrough

  • The Init function gets called when you require('texample') the native module.
  • The Initialize function defines the module function start that will be called from JavaScript. It also stores the module handle for locating and calling the script-defined callback in the right context.
  • The Start function initializes the libev event notifier and starts the background thread TheThread.
  • TheThread simply loops: it sleeps, pushes incrementing integers into the queue and wakes up the main thread each time.
  • The Callback function gets woken up by libev; it locates and calls the JavaScript callback function.

Building

Copy this to the 'wscript' file.

def set_options(opt):
  opt.tool_options("compiler_cxx")

def configure(conf):
  conf.check_tool("compiler_cxx")
  conf.check_tool("node_addon")

def build(bld):
  obj = bld.new_task_gen("cxx", "shlib", "node_addon")
  obj.cxxflags = ["-g", "-D_FILE_OFFSET_BITS=64",
                  "-D_LARGEFILE_SOURCE", "-Wall"]
  obj.target = "texample"
  obj.source = "texample.cc"

Compile the code with node-waf

$ node-waf configure
$ node-waf build

Running

Start the node shell and load the native add-on module:

$ node
> texample = require('./build/default/texample');

Define the callback function that the module will call

> texample.callback = function(i) {
... console.log('Bang: ' + i);
... }
>

Call start to kick off the background thread:

> texample.start();
true
>

Wait for 5 seconds and you'll start seeing your callback triggered every 5 seconds.

> Bang: 0
Bang: 1
> Bang: 2

Have fun!

Embedding V8 Javascript Engine and Go

This is two common examples merged together: how to run V8 embedded, and how to call C modules from the Go language. I'm using Ubuntu 10.04 x64 with the standard gcc toolchain.

Step 1. Compile v8

Get the V8 source and build it as a shared library.
Use this command line and copy libv8.so to your project directory:

$ scons mode=release library=shared snapshot=on arch=x64
$ cp libv8.so ~/v8example

Step 2. C Wrapper for V8

Write a C++ function that accepts JavaScript source code as an argument, then compiles and runs it in V8.

Header file:

#ifndef _V8WRAPPER_H
#define _V8WRAPPER_H

#ifdef __cplusplus
extern "C" {
#endif
    // compiles and executes javascript and returns the script return value as string
    char * runv8(const char *jssrc);

#ifdef __cplusplus
}
#endif

#endif // _V8WRAPPER_H

Source file; this is a slightly modified version of the example in the official V8 C++ embedder's guide.

#include <v8.h>
#include <string.h>

#include "v8wrapper.h"

using namespace v8;

char * runv8(const char *jssrc)
{
    // Create a stack-allocated handle scope.
    HandleScope handle_scope;

    // Create a new context.
    Persistent<Context> context = Context::New();

    // Enter the created context for compiling and
    // running the script.
    Context::Scope context_scope(context);

    // Create a string containing the JavaScript source code.
    Handle<String> source = String::New(jssrc);

    // Compile the source code.
    Handle<Script> script = Script::Compile(source);

    // Run the script
    Handle<Value> result = script->Run();

    // Dispose the persistent context.
    context.Dispose();

    // return result as string, must be deallocated in cgo wrapper
    String::AsciiValue ascii(result);
    return strdup(*ascii);
}

Makefile.wrapper

V8_INC=/home/user/builds/v8/include

CC=g++
CFLAGS=-c -fPIC -I$(V8_INC)
SOURCES=v8wrapper.cc
OBJECTS=$(SOURCES:.cc=.o)
TARGET=libv8wrapper.so

all: $(TARGET)

.cc.o:
    $(CC) $(CFLAGS) $< -o $@

$(TARGET): $(OBJECTS)
    ld -G -o $@ $(OBJECTS)

Compile to get the shared library

$ make -f Makefile.wrapper

You should end up with file libv8wrapper.so

Step 3. CGO Wrapper for Go

Now define a cgo wrapper source file that exposes V8 to the Go language.

Go source file for the cgo compiler. Note that the comments are functional and contain instructions for cgo. libv8.so and the just-compiled libv8wrapper.so are assumed to be in the current working directory for linking.

package v8runner

// #cgo LDFLAGS: -L. -lv8wrapper -lv8  -lstdc++ -pthread
// #include <stdlib.h>
// #include "v8wrapper.h"
import "C"
import "unsafe"

func RunV8(script string) string {

  // convert Go string to nul terminated C-string
  cstr := C.CString(script)
  defer C.free(unsafe.Pointer(cstr))

  // run script and convert returned C-string to Go string
  rcstr := C.runv8(cstr)
  defer C.free(unsafe.Pointer(rcstr))

  return C.GoString(rcstr)  
}

CGO Makefile. Note here that you need to have GOROOT defined. The OS and Architecture are defined here too.

include $(GOROOT)/src/Make.inc

GOOS=linux
GOARCH=amd64

TARG=v8runner
CGOFILES=\
    v8runner.go\

include $(GOROOT)/src/Make.pkg

Compile to Go package v8runner and install it

$ make -f Makefile.cgo
$ make -f Makefile.cgo install

Install copies the package file to $GOROOT/pkg/linux_amd64/v8runner.a, where it can be found by the Go compiler and linker.

Step 4. The GO program

Now you're finally ready to write a plain Go program that runs V8.

package main

import "v8runner"
import "fmt"

func main() {
    r := v8runner.RunV8("'Hello Go World'")
    fmt.Println(r)
}

Makefile.hello

include $(GOROOT)/src/Make.inc

TARG=hello
GOFILES=hello.go

Compile

$ make -f Makefile.hello

Set LD_LIBRARY_PATH to current directory, assuming you have libv8.so and libv8wrapper.so there.

$ export LD_LIBRARY_PATH=.

Run the program

$ ./hello
Hello Go World

To recap the steps

  1. Shared C++ library that exposes C-function to run javascript : libv8wrapper.so
  2. CGO compiled wrapper that passes arguments between Go and C world and calls the C functions: v8runner
  3. Go program that imports the package and uses it normally.

This hack has some caveats.

  • There is currently no way to link everything statically, as cgo does not support it. You need to use shared libraries.
  • I'm not aware of any easy way to call back into Go from the cgo-wrapped C++. You need wrappers over wrappers, as demonstrated in this post: http://groups.google.com/group/golang-nuts/msg/c98b4c63ba739240. Matryoshka ftw.
  • Only one thread at a time can use a V8 instance. You need to use Isolates (see the V8 source for more information) to support multiple instances; still, only one thread at a time can use a specific instance.

Developing on Google App Engine for Production

If you're considering App Engine as the platform for your next big thing, here is a potpourri of observations that you might find worth reading. This is not a tutorial, and basic hands-on App Engine experience is assumed. Everything here is written from experience with the Python environment; for Java your mileage may vary. There is also lots of functionality that is not covered here because I didn't personally use it or it is otherwise well documented.

Queries and Indexes

Applications can basically use only two kinds of queries: get a data entity by key, or get data entities by range. In a ranged query the key can be a property value or a composite of property values. Anything that needs more than one filter property and a specific order will need a composite index definition.

For queries ordered by key, App Engine can merge the built-in single-property indexes, but in real life this doesn't always get you very far: as the number of entities grows you may eventually hit the error "NeedIndexError: The built-in indices are not efficient enough for this query and your data. Please add a composite index for this query.". This means that some values are too sparse to filter data efficiently, e.g. you have 30,000 entities and one of the properties you're querying on is a boolean flag that is either True or False for every entity.

App Engine uses composite keys for building indexes for queries that need a specific order. Be careful when combining more than one list property in a composite index: App Engine will build permutations of all key values, and even with modest lists you end up with hundreds of index entries.

For example, define a model with two list properties and a timestamp:

class MyModel(db.Model):
   created = db.DateTimeProperty()
   list1 = db.StringListProperty()
   list2 = db.ListProperty(int)

Define a composite index for the model:

- kind: MyModel
  properties:
  - name: list1
  - name: list2
  - name: created
    direction: desc

Put an entity:

m = MyModel()
m.list1 = ["cat", "dog", "pig", "angry", "bird"]
m.list2 = [1, 2, 3]
m.put()

This would create the following reverse and custom index entries:

  • 5 for each item in list1
  • 3 for each item in list2
  • 1 for created
  • 5 * 3 = 15 entries for permutations (cat, 1, created), (cat, 2, created), (cat, 3, created), (dog, 1, created), (dog, 2, created), …

A total of 15 + 1 + 3 + 5 = 24 entries. This is not much in this example, but it grows exponentially as the number of list entries and indexes increases: 3 lists in an index, each having 10 values, would mean 10^3 = 1000 index entries.

The maximum number of index entries is 5000 per entity, and this is shared between the implicit per-property reverse index and explicit custom indexes. For example, if you have a list property that you use in a custom index, it can have at most ~2500 values, because the implicit reverse index will take 2500 entries and the custom index the remaining 2500, totalling 5000.

Remember to set indexed=False in the property definition if you don't need to query against the property; this saves both space and CPU.
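
A small example of what that might look like with the db API; the model and property names are made up for illustration (Text and Blob properties are never indexed in any case):

class LogEntry(db.Model):
    created = db.DateTimeProperty(auto_now_add=True)
    payload = db.TextProperty()                # never indexed
    note = db.StringProperty(indexed=False)    # explicitly unindexed, cheaper to put()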

Query latencies are pretty OK, ~100ms for a few dozen entities, and you can use the IN operator to make parallel queries. Just make sure that your IN queries do not return lots of overlapping results, as that can hurt performance. Direct get-by-key latencies are very good (~20ms). Naturally latency increases linearly if your objects are very large, especially with long list properties.

Text search is on the App Engine roadmap and under development. Meanwhile you can make simple startswith queries against a single property or a list of strings. The queries are identical in both cases.
Single property startswith query:

class User(db.Model):
   name = db.StringProperty()

users = User.all().filter('name >=', query).filter('name <', query + u'\ufffd').fetch(100)

List property startswith query:

class SomeModel(db.Model):
   keywords = db.StringListProperty()

models = SomeModel.all().filter('keywords >=', query).filter('keywords <', query + u'\ufffd').fetch(100)

Note that in the latter case the sort order may not be what you wish for, as you must always sort first by the first inequality-filter property, in this example keywords. Also keep in mind the index growth when you add more properties to the query.

The soft memory limit is < 200MB, which is reached easily if you have large entities; don't rely on being able to do lots of in-memory sorting. The Python memory overhead especially is pretty big. As a rule of thumb you can manipulate ~15,000 properties per request (e.g. 1000 entities each having 15 properties). Each element in a list property counts as a property.

You'll often see DeadlineExceededError in the logs; there is nothing you can do about these except to use the high replication datastore. Just note that it has a much higher CPU cost. Curiously, the frequency of these errors seems pretty constant and independent of load. Maybe App Engine gives more priority to more popular apps.

Quotas

It depends a lot on your application, but at least in my experience CPU is the limiting factor for most use cases. This is mainly because you need to do most of the work when inserting new objects instead of when querying them, so even rarely used queries will cost you on every single insert. Queries need indexes, and storing entities with indexes costs API CPU time. Both your own application execution and the API (DB, etc.) execution time are counted against your quota. Be sure to measure and estimate your costs. Putting entities with very large list properties that use custom indexes can easily cost 25-60 seconds of CPU time per entity.

When the combined CPU time (app + API) grows large enough (> ~1000ms), App Engine will warn you in the logs that the call uses a high amount of CPU and may run over its quota. Curiously it gives this same warning even when you have billing enabled, but it won't restrict your app in that case.

Scalability is Latency

App Engine scalability rules are complex, but what mostly matters is your average latency. If you have only slow request handlers (latency > 500ms), App Engine will limit your scalability. It's not bad to have a few slow ones, but make sure the average is somewhere around ~250ms or less. In the worst case App Engine refuses to start new instances and queues new requests to the already serving instances, thus growing the user-perceived request latency. You can observe this from the App Engine dashboard log entries showing 'pending_ms' times.

Note that CPU time is not the same thing as latency; for example, these two pieces of code have roughly the same CPU cost, but the latter has only 1/3 of the latency.

Slow put

 e1.put()
 e2.put()
 e3.put()

Fast put

db.put([e1, e2, e3])

Slow get

 e1 = db.get(key1)
 e2 = db.get(key2)
 e3 = db.get(key3)

Fast get

ents = db.get([key1, key2, key3])

Parental fetch, aka relation index, aka. parent reference

The App Engine DB API does not support partial fetches, which can be an issue if you have very large list properties in your entities. It's possible to achieve something similar by using parent keys. For example, if you have a large number of elements in a list property, you can make a keys-only query against that property and fetch only the keys. Then take each key's parent and fetch the entity you actually need.

class Message(db.Model):
  text = db.StringProperty()

class MsgIndex(db.Model):
  recipients = db.ListProperty(db.Key)

msg = Message(text="Hello World")
msg.put()

idx = MsgIndex(key_name='idx', parent=msg)
idx.recipients.append(user1.key())
idx.recipients.append(user2.key())
 ...
idx.put()

To query messages where userX is in the recipient list, first get the keys:

keys = MsgIndex.all(keys_only=True).filter('recipients', userX).fetch(100)

then get the actual message objects:

msg = db.get([k.parent() for k in keys])

This way you avoid having to load and deserialize the potentially large recipient list at all.

See Brett Slatkin’s presentation for more details.

Transactions

The App Engine DB supports transactions, but it's not possible to implement global consistency, because a transaction can only operate on objects in a single entity group. For example, if you have entities A and B that have no parent keys, you cannot operate on them both in a single transaction. An entity group is all entities with the same root parent key; an entity without a parent key is its own group.

A word of warning: when you use transactions, all entities with the same root key (the entity group) are locked for the transaction. In general there should not be more than 1-3 updates per second for a single entity group, or you'll get lots of transaction collision retries that will eat your CPU and increase latency. Collision retries are logged as warnings in the App Engine console.
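
A minimal sketch of a transactional update with the db API; the Counter model and counter_key are hypothetical, the point is that everything touched inside the function must belong to one entity group:

def bump_counter(counter_key):
    # Runs as a single transaction; retried automatically on collisions.
    def txn():
        counter = db.get(counter_key)
        counter.count += 1
        counter.put()
    db.run_in_transaction(txn)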

Pre-fetch Referenceproperties

Prefetch ReferenceProperties before accessing them in sequence.
Bad: this triggers a separate DB query for the user property each time.

class Foo(db.Model):
  user = db.ReferenceProperty(User)

foos = Foo.all().fetch(100)
for f in foos:
  print f.user.name

Good: see the prefetch_refprop function implementation here.

foos = Foo.all().fetch(100)
prefetch_refprop(foos,  Foo.user)
for f in foos:
  print f.user.name

This will decrease latency and API CPU time significantly.

Debugging and Profiling

The standard Python debugger does not work in the App Engine development server, but you can use the following wrapper and start dev_appserver.py from the command line to get into the debugger.

def appe_set_trace():
  import pdb, sys
  debugger = pdb.Pdb(stdin=sys.__stdin__,
  stdout=sys.__stdout__)
  debugger.set_trace(sys._getframe().f_back)

API profiling: define appengine_config.py in your app and add an appstats handler to app.yaml.

def webapp_add_wsgi_middleware(app):
  from google.appengine.ext.appstats import recording
  app = recording.appstats_wsgi_middleware(app)
  return app

- url: /stats.*
  script: $PYTHON_LIB/google/appengine/ext/appstats/ui.py
  login: admin

CPU profiling: define a profiling wrapper that dumps the CPU times to the log.

def real_main():
  # Run the WSGI CGI handler with that application.
  util.run_wsgi_app(application)

def profile_main():
  # This is the main function for profiling
  # We've renamed our main() above to real_main()
  import cProfile, pstats
  prof = cProfile.Profile()
  prof = prof.runctx("real_main()", globals(), locals())
  stream = StringIO.StringIO()
  stats = pstats.Stats(prof, stream=stream)
  stats.sort_stats("cumulative")  # time or cumulative
  stats.print_stats(80)  # 80 = how many to print
  # The rest is optional.
  # stats.print_callees()
  # stats.print_callers()
  logging.info('Profile data:\n%s', stream.getvalue());

if __name__ == '__main__':
  main = profile_main
  #main = real_main
  main()

Task TransientErrors

Task adds often fail with a TransientError; just retry once more and failed task adds will become rare.

try:
   taskqueue.add(...
except taskqueue.TransientError:
   taskqueue.add(..  # retry once more

Misc

Other things.

  • Static files are not served from the application environment; your application cannot access them programmatically.
  • The urlfetch service has a maximum timeout of 10 seconds and can do at most 10 parallel requests per instance. Requests fail occasionally with an application error that is usually caused by a server timeout. Requests are made from fairly random source IPs that are shared with all other App Engine apps, and you cannot override certain request headers.
  • Naked domains (like example.com) are not supported.
  • Memcache lifetime can be very short, mere minutes, but if your application is popular App Engine might give it more priority. Use multi get and set whenever possible; see the sketch below.
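
A small example of batched memcache calls; the keys and values are made up for illustration:

from google.appengine.api import memcache

# One RPC instead of one call per key.
memcache.set_multi({'user:1': 'alice', 'user:2': 'bob'}, time=60)
users = memcache.get_multi(['user:1', 'user:2'])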

Interpreting Go Socket Errors

Go socket calls return an error variable when something goes wrong, and the different error codes are documented in the Go documentation. However, I was not able to find a coherent example showing how the error variable is supposed to be used. The canonical way seems to be just checking it against nil and dumping it out in case it's something else, like this:

n, err := conn.Read(buffer[:])
if err != nil {
    fmt.Printf("%v\n", err)
}

Real applications (especially system applications) need to branch based on the error to recover properly, so just the error description is not enough. Here is an example of what it's possible to deduce from the error variable.

conn, err := net.Dial("tcp", "", "example.com:80")
n, err := conn.Read(buffer[:])

if err != nil {

    // print error string e.g.
    // "read tcp example.com:80: resource temporarily unavailable"
    fmt.Printf("reader %v\n", err)

    // print type of the error, e.g. "*net.OpError"
    fmt.Printf("%T", err)

    if err == os.EINVAL {
      // socket is not valid or already closed
      fmt.Println("EINVAL");
    }
    if err == os.EOF {
      // remote peer closed socket
      fmt.Println("EOF");
    }

    // matching rest of the codes needs typecasting, errno is
    // wrapped on OpError
    if e, ok := err.(*net.OpError); ok {
       // print wrapped error string e.g.
       // "os.Errno : resource temporarily unavailable"
       fmt.Printf("%T : %v\n", e.Error, e.Error)
       if e.Timeout() {
         // is this timeout error?
         fmt.Println("TIMEOUT")
       }
       if e.Temporary() {
         // is this temporary error?  True on timeout,
         // socket interrupts or when buffer is full
         fmt.Println("TEMPORARY")
       }

      // specific granular error codes in case we're interested
     switch e.Error {
        case os.EAGAIN:
           // timeout
           fmt.Println("EAGAIN")
       case os.EPIPE:
          // broken pipe (e.g. on connection reset)
          fmt.Println("EPIPE")
       default:
          // just write raw errno code, can be platform specific
          // (see syscall for definitions)
          fmt.Printf("%d\n", int64(e.Error.(os.Errno)))
     }
    }
}

For example, if the read times out, the code would print the following:

read tcp 192.0.32.10:80: resource temporarily unavailable
*net.OpError
os.Errno : resource temporarily unavailable
TIMEOUT
TEMPORARY
EAGAIN