Callbacks from Threaded Node.js C++ Extension

UPDATE: this guide is a bit outdated; newer Node versions (0.6 and later) support an easier way to access pooled worker threads, so an extension doesn’t need to create its own. See the links in the comments.

Writing a threaded Node.js extension requires some care. All JavaScript in Node.js is executed in a single main thread, so you cannot simply call the V8 engine directly from your background thread; that would cause a segfault. The recommended way is to spawn a new thread in the background and use libev events to wake up the main thread to execute the JavaScript callbacks.

The Node.js framework has lots of ready-made machinery for implementing extensions, but there is no simple example of how to implement this kind of extension, so here it is.

Add-on Source

Save this source to texample.cc

#include <queue>

// node headers
#include <v8.h>
#include <node.h>
#include <ev.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

using namespace node;
using namespace v8;

// handles required for callback messages
static pthread_t texample_thread;
static ev_async eio_texample_notifier;
Persistent<String> callback_symbol;
Persistent<Object> module_handle;

// message queue
std::queue<int> cb_msg_queue = std::queue<int>();
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;

// The background thread
static void* TheThread(void *)
{
    int i = 0;
    while (true) {
        // fire an event every 5 seconds
        sleep(5);
        pthread_mutex_lock(&queue_mutex);
        cb_msg_queue.push(i);
        pthread_mutex_unlock(&queue_mutex);
        i++;
        // wake up the callback in the main thread
        ev_async_send(EV_DEFAULT_UC_ &eio_texample_notifier);
    }
    return NULL;
}

// callback that runs the javascript in main thread
static void Callback(EV_P_ ev_async *watcher, int revents)
{
    HandleScope scope;

    assert(watcher == &eio_texample_notifier);
    assert(revents == EV_ASYNC);

    // locate callback from the module context if defined by script
    // texample = require('texample')
    // texample.callback = function( ... ) { ..
    Local<Value> callback_v = module_handle->Get(callback_symbol);
    if (!callback_v->IsFunction()) {
         // callback not defined, ignore
         return;
    }
    Local<Function> callback = Local<Function>::Cast(callback_v);

    TryCatch try_catch;

    // dequeue and deliver all pending messages; ev_async
    // notifications may coalesce, so drain the whole queue
    while (true) {
        pthread_mutex_lock(&queue_mutex);
        if (cb_msg_queue.empty()) {
            pthread_mutex_unlock(&queue_mutex);
            break;
        }
        int number = cb_msg_queue.front();
        cb_msg_queue.pop();
        pthread_mutex_unlock(&queue_mutex);

        // prepare arguments for the callback
        Local<Value> argv[1];
        argv[0] = Local<Value>::New(Integer::New(number));

        // call the callback and handle possible exception
        callback->Call(module_handle, 1, argv);

        if (try_catch.HasCaught()) {
            FatalException(try_catch);
            return;
        }
    }
}

// Start the background thread
Handle<Value> Start(const Arguments &args)
{
    HandleScope scope;

    // start background thread and event handler for callback
    ev_async_init(&eio_texample_notifier, Callback);
    //ev_set_priority(&eio_texample_notifier, EV_MAXPRI);
    ev_async_start(EV_DEFAULT_UC_ &eio_texample_notifier);
    ev_unref(EV_DEFAULT_UC);
    pthread_create(&texample_thread, NULL, TheThread, 0);

    return True();
}

void Initialize(Handle<Object> target)
{
    HandleScope scope;

    NODE_SET_METHOD(target, "start", Start);

    callback_symbol = NODE_PSYMBOL("callback");
    // store handle for callback context
    module_handle = Persistent<Object>::New(target);
}

extern "C" {
static void Init(Handle<Object> target)
{
    Initialize(target);
}

NODE_MODULE(texample, Init);

Function walkthrough

  • The Init function is called when you require('texample') the native module.
  • The Initialize function defines the module function start that can be called from JavaScript. It also stores the module handle so the script-defined callback can be located and called in the right context.
  • The Start function initializes the libev event notifier and starts the background thread TheThread.
  • TheThread simply loops: it sleeps, pushes an incrementing integer to the queue, and wakes up the main thread each time.
  • The Callback function is woken up by libev; it locates and calls the JavaScript function callback.

Building

Copy this to the ‘wscript’ file.

def set_options(opt):
  opt.tool_options("compiler_cxx")

def configure(conf):
  conf.check_tool("compiler_cxx")
  conf.check_tool("node_addon")

def build(bld):
  obj = bld.new_task_gen("cxx", "shlib", "node_addon")
  obj.cxxflags = ["-g", "-D_FILE_OFFSET_BITS=64",
                  "-D_LARGEFILE_SOURCE", "-Wall"]
  obj.target = "texample"
  obj.source = "texample.cc"

Compile the code with node-waf

$ node-waf configure
$ node-waf build

Running

Start the Node shell and load the native add-on module

$ node
> texample = require('./build/default/texample');

Define the callback function that the module will call

> texample.callback = function(i) {
... console.log('Bang: ' + i);
... }
>

Call start to kick off the background thread

> texample.start();
true
>

Wait five seconds and you’ll start seeing your callback triggered every five seconds.

> Bang: 0
Bang: 1
> Bang: 2

Have fun!

Embedding V8 Javascript Engine and Go

This is two common examples merged together: how to run V8 embedded, and how to call C modules from the Go language. I’m using Ubuntu 10.04 x64 with the standard gcc toolchain.

Step 1. Compile v8

Get the V8 source and build V8 as a shared library.
Use this command line and copy libv8.so to your project directory:

$ scons mode=release library=shared snapshot=on arch=x64
$ cp libv8.so ~/v8example

Step 2. C Wrapper for V8

Write a C++ function that accepts JavaScript source code as an argument, then compiles and runs it in V8.

Header file:

#ifndef _V8WRAPPER_H
#define _V8WRAPPER_H

#ifdef __cplusplus
extern "C" {
#endif
    // compiles and executes javascript and returns the script return value as string
    char * runv8(const char *jssrc);

#ifdef __cplusplus
}
#endif

#endif // _V8WRAPPER_H

Source file; this is a slightly modified version of the example in the official V8 C++ embedder’s guide.

#include <v8.h>
#include <string.h>

#include "v8wrapper.h"

using namespace v8;

char * runv8(const char *jssrc)
{
    // Create a stack-allocated handle scope.
    HandleScope handle_scope;

    // Create a new context.
    Persistent<Context> context = Context::New();

    // Enter the created context for compiling and
    // running the script.
    Context::Scope context_scope(context);

    // Create a string containing the JavaScript source code.
    Handle<String> source = String::New(jssrc);

    // Compile the source code.
    Handle<Script> script = Script::Compile(source);

    // Run the script
    Handle<Value> result = script->Run();

    // Dispose the persistent context.
    context.Dispose();

    // return result as string, must be deallocated in cgo wrapper
    String::AsciiValue ascii(result);
    return strdup(*ascii);
}

Makefile.wrapper

V8_INC=/home/user/builds/v8/include

CC=g++
CFLAGS=-c -fPIC -I$(V8_INC)
SOURCES=v8wrapper.cc
OBJECTS=$(SOURCES:.cc=.o)
TARGET=libv8wrapper.so

all: $(TARGET)

.cc.o:
    $(CC) $(CFLAGS) $< -o $@

$(TARGET): $(OBJECTS)
    ld -G -o $@ $(OBJECTS)

Compile to get the shared library

$ make -f Makefile.wrapper

You should end up with the file libv8wrapper.so.

Step 3. CGO Wrapper for Go

Now define a cgo wrapper source file that exposes V8 to the Go language.

Go source file (v8runner.go) for the cgo compiler. Note that the comments are functional and contain instructions for cgo. libv8.so and the just-compiled libv8wrapper.so are assumed to be in the current working directory for linking.

package v8runner

// #cgo LDFLAGS: -L. -lv8wrapper -lv8 -lstdc++ -pthread
// #include <stdlib.h>
// #include "v8wrapper.h"
import "C"

import "unsafe"

func RunV8(script string) string {

  // convert Go string to nul terminated C-string
  cstr := C.CString(script)
  defer C.free(unsafe.Pointer(cstr))

  // run script and convert returned C-string to Go string
  rcstr := C.runv8(cstr)
  defer C.free(unsafe.Pointer(rcstr))

  return C.GoString(rcstr)  
}

The cgo Makefile (Makefile.cgo). Note that you need to have GOROOT defined. The OS and architecture are defined here too.

include $(GOROOT)/src/Make.inc

GOOS=linux
GOARCH=amd64

TARG=v8runner
CGOFILES=\
    v8runner.go\

include $(GOROOT)/src/Make.pkg

Compile the code into the Go package v8runner and install it

$ make -f Makefile.cgo
$ make -f Makefile.cgo install

Installing copies the package file to $GOROOT/pkg/linux_amd64/v8runner.a, where the Go compiler and linker can import it.

Step 4. The Go program

Now you’re finally ready to write a plain Go program that runs V8.

package main

import "v8runner"
import "fmt"

func main() {
    r := v8runner.RunV8("'Hello Go World'")
    fmt.Println(r)
}

Makefile.hello

include $(GOROOT)/src/Make.inc

TARG=hello
GOFILES=hello.go

Compile

$ make -f Makefile.hello

Set LD_LIBRARY_PATH to the current directory, assuming you have libv8.so and libv8wrapper.so there.

$ export LD_LIBRARY_PATH=.

Run the program

$ ./hello
Hello Go World

To recap the steps

  1. A shared C++ library that exposes a C function to run JavaScript: libv8wrapper.so
  2. A cgo-compiled wrapper that passes arguments between the Go and C worlds and calls the C functions: v8runner
  3. A Go program that imports the package and uses it normally.

This hack has some caveats.

  • There is currently no way to link everything statically, as cgo does not support it. You need to use shared libraries.
  • I’m not aware of any easy way to call back into Go from the cgo-wrapped C++. You need wrappers over wrappers, as demonstrated by this post: http://groups.google.com/group/golang-nuts/msg/c98b4c63ba739240. Matryoshka, ftw.
  • Only one thread at a time can use a V8 instance. You need Isolates (see the V8 source for more information) to support multiple instances, and even then only one thread at a time can use a specific instance.

Developing on Google App Engine for Production

If you’re considering App Engine as the platform for your next big thing, here is a potpourri of observations that you might find worth reading. This is not a tutorial, and basic hands-on App Engine experience is assumed. Everything here is written from experience with the Python environment; for Java your mileage may vary. There is also lots of functionality that is not covered here, either because I didn’t personally use it or because it is otherwise well documented.

Queries and Indexes

Applications can use basically only two kinds of queries: get a data entity by key, or get data entities by range. In a ranged query the key can be a property value or a composite of property values. Anything that needs more than one filter property and a specific order will need a composite index definition.
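As a minimal sketch of the two query shapes using the Python db API (the Event model here is hypothetical, for illustration only):

import datetime
from google.appengine.ext import db

class Event(db.Model):  # hypothetical model
    created = db.DateTimeProperty(auto_now_add=True)

key = Event().put()  # put() returns the entity's key

# query type 1: get an entity directly by key
event = db.get(key)

# query type 2: get entities by a range over a property value
cutoff = datetime.datetime.now() - datetime.timedelta(days=1)
recent = Event.all().filter('created >', cutoff).order('created').fetch(100)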

For key-ordered queries App Engine supports a self-merge join over the built-in indexes, but in real life it doesn’t always carry very far: as the number of entities grows you may eventually hit the error “NeedIndexError: The built-in indices are not efficient enough for this query and your data. Please add a composite index for this query.” This means that some values are too sparse to filter the data efficiently, e.g. you have 30,000 entities and one of the properties you’re querying is a boolean flag that is either True or False on every entity.

App Engine uses composite keys for building indexes for queries that need a specific order. Be careful when combining more than one list property in a composite index: App Engine will build permutations of all key values, and even with modest lists you end up with hundreds of index entries.

For example, define a model with two list properties and a timestamp

class MyModel(db.Model):
   created = db.DateTimeProperty()
   list1 = db.StringListProperty()
   list2 = db.ListProperty(int)

Define a composite index for the model in index.yaml

- kind: MyModel
  properties:
  - name: list1
  - name: list2
  - name: created
    direction: desc

Put an entity

m = MyModel()
m.list1 = ["cat", "dog", "pig", "angry", "bird"]
m.list2 = [1, 2, 3]
m.put()

This would create the following reverse and custom index entries

  • 5 entries, one for each item in list1
  • 3 entries, one for each item in list2
  • 1 entry for created
  • 5 * 3 = 15 entries for the permutations (cat, 1, created), (cat, 2, created), (cat, 3, created), (dog, 1, created), (dog, 2, created), …

In total 15 + 1 + 3 + 5 = 24 entries. This is not much in the example, but it grows exponentially with the number of list properties in the index: 3 lists in one index, each having 10 values, would mean 10^3 = 1000 index entries.

The maximum number of index entries is 5000 per entity, and this is shared between the implicit reverse property index and explicit custom indexes. For example, if you have a list property that you use in a custom index, it can have at most ~2500 values, because the implicit reverse index will take 2500 entries and the custom index the remaining 2500, totalling 5000.

Remember to set indexed=False in the property definition if you don’t need to query against the property; this saves both space and CPU.
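For example (a sketch; the LogEntry model and its fields are made up):

from google.appengine.ext import db

class LogEntry(db.Model):  # hypothetical model
    created = db.DateTimeProperty()              # queried, keep indexed (the default)
    checksum = db.StringProperty(indexed=False)  # never queried, skip the index entries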

Query latencies are pretty OK, ~100 ms for a few dozen entities, and you can use the IN operator to make parallel queries. Just make sure that your IN queries do not return lots of overlapping results, as that can hurt performance. Direct get-by-key latencies are very good (~20 ms). Naturally latency increases linearly if your objects are very large, especially with long list properties.
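As a sketch, using the User model from the startswith example below (the names are made up), an IN filter fans out into one parallel query per value:

users = User.all().filter('name IN', ['alice', 'bob', 'carol']).fetch(30)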

Text search is on the App Engine roadmap and under development. Meanwhile you can make simple startswith queries against a single property or a list of strings. The queries are identical in both cases.
Single property startswith query

class User(db.Model):
   name = db.StringProperty()

users = User.all().filter('name >=', query).filter('name <', query + u'\ufffd').fetch(100)

Listproperty startswith query

class SomeModel(db.Model):
   keywords = db.StringListProperty()

models = SomeModel.all().filter('keywords >=', query).filter('keywords <', query + u'\ufffd').fetch(100)

Note that in the latter case the sort order may not be what you wish for, as you must always sort first by the first inequality-filter property, in this example keywords; see the sketch below. Also keep in mind the index growth when you add more properties to the query.
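A sketch of the required ordering (the created property is hypothetical here):

models = (SomeModel.all()
          .filter('keywords >=', query)
          .filter('keywords <', query + u'\ufffd')
          .order('keywords')   # the inequality property must be sorted first
          .order('-created')   # other sort orders only after it
          .fetch(100))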

The soft memory limit is < 200 MB, which is reached easily if you have large entities; don’t rely on being able to do lots of in-memory sorting. The Python memory overhead especially is pretty big. As a rule of thumb you can manipulate ~15000 properties per call (e.g. 1000 entities each having 15 properties). Each element in a list property counts as a property.

You’ll often see DeadlineExceededError in the logs; there is nothing you can do about these except to use the highly replicated datastore. Just note that it has a much higher CPU cost. Curiously, the frequency of these errors seems pretty constant and independent of the load. Maybe App Engine gives more priority to more popular apps.

Quotas

It depends a lot on your application, but at least in my experience CPU is the limiting factor for most use cases. This is mainly because you need to do most of the work when inserting new objects instead of when querying them, so even rarely used queries will cost you on every single insert. Queries need indexes, and storing entities with indexes costs API CPU time. Both your own application’s execution time and the API (DB, etc.) execution time are counted against your quota. Be sure to measure and estimate your costs: putting entities with very large list properties that use custom indexes can easily cost 25-60 seconds of CPU time per entity.

In case the combined CPU time (app + API) grows large enough (> ~1000 ms), App Engine will warn you in the logs that the call uses a high amount of CPU and may run over its quota. Curiously, it emits this same warning even when you have billing enabled, but it won’t restrict your app in that case.

Scalability is Latency

The App Engine scalability rules are complex, but what mostly matters is your average latency. If you have only slow request handlers (latency > 500 ms), App Engine will limit your scalability. It’s not bad to have a few slow ones, but make sure that the average is somewhere around ~250 ms or less. In the worst case App Engine refuses to start new instances and queues new requests onto already-serving instances, thus growing the user-perceived request latency. You can observe this in the App Engine dashboard log entries showing ‘pending_ms’ times.

Note that CPU time is not the same thing as latency. For example, the following pairs of snippets have roughly the same CPU cost, but the batched versions have only about 1/3 of the latency.

Slow put

e1.put()
e2.put()
e3.put()

Fast put

db.put([e1, e2, e3])

Slow get

e1 = db.get(key1)
e2 = db.get(key2)
e3 = db.get(key3)

Fast get

ents = db.get([key1, key2, key3])

Parental fetch, a.k.a. relation index, a.k.a. parent reference

The App Engine DB API does not support partial fetches, which can be an issue if you have very large list properties in your entities. It’s possible to achieve something similar by using parent keys. For example, if you have a large number of elements in a list property, you can make a keys-only query against that property and fetch only the keys, then take each key’s parent and fetch the entity you actually need.

class Message(db.Model):
  text = db.StringProperty()

class MsgIndex(db.Model):
  recipients = db.ListProperty(db.Key)

msg = Message(text="Hello World")
msg.put()

idx = MsgIndex(key_name='idx', parent=msg)
idx.recipients.append(user1.key())
idx.recipients.append(user2.key())
# ...
idx.put()

To query messages where userX is in the recipient list, first get the keys

keys = MsgIndex.all(keys_only=True).filter('recipients', userX.key()).fetch(100)

then query the actual message objects

msgs = db.get([k.parent() for k in keys])

This way you avoid fetching and deserializing the potentially large recipient list entirely.

See Brett Slatkin’s presentation for more details.

Transactions

The App Engine DB supports transactions, but it’s not possible to implement global consistency, because a transaction can only operate on objects in a single entity group. For example, if you have entities A and B that have no parent keys, you cannot operate on them both in a single transaction. An entity group is all entities with the same root parent key; an entity without a parent key is its own group.

A word of warning: when you use a transaction, all entities with the same parent key (the entity group) are locked for the transaction. In general there should not be more than 1-3 updates per second for a single entity group, or you’ll get lots of transaction collision retries that will eat your CPU and increase latency. Collision retries are logged as warnings in the App Engine console.
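A minimal sketch of running a function transactionally with the Python db API (the Counter model is hypothetical):

from google.appengine.ext import db

class Counter(db.Model):  # hypothetical model
  count = db.IntegerProperty(default=0)

counter_key = Counter(key_name='hits').put()

def increment(key):
  # runs inside one transaction; retried automatically on collision,
  # so the function should be idempotent
  counter = db.get(key)
  counter.count += 1
  counter.put()

db.run_in_transaction(increment, counter_key)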

Pre-fetch Referenceproperties

Prefetch reference properties before accessing them in sequence.
Bad, as this will trigger a separate DB query for the user property on every iteration

class Foo(db.Model):
  user = db.ReferenceProperty(User)

foos = Foo.all().fetch(100)
for f in foos:
  print f.user.name

Good; see the prefetch_refprop function implementation here.

foos = Foo.all().fetch(100)
prefetch_refprop(foos, Foo.user)
for f in foos:
  print f.user.name

This will decrease latency and API CPU time significantly.

Debugging and Profiling

The standard Python debugger does not work in the App Engine development server, but you can use the following wrapper and start dev_appserver.py from the command line to get into the debugger.

def appe_set_trace():
  import pdb, sys
  debugger = pdb.Pdb(stdin=sys.__stdin__,
                     stdout=sys.__stdout__)
  debugger.set_trace(sys._getframe().f_back)

API profiling: define appengine_config.py in your app (first snippet below) and an Appstats handler in app.yaml (second snippet).

def webapp_add_wsgi_middleware(app):
  from google.appengine.ext.appstats import recording
  app = recording.appstats_wsgi_middleware(app)
  return app

- url: /stats.*
  script: $PYTHON_LIB/google/appengine/ext/appstats/ui.py
  login: admin

CPU profiling: define a profiling wrapper that dumps the CPU times to the log.

import StringIO
import logging

from google.appengine.ext.webapp import util

# 'application' is your WSGI application object, defined elsewhere

def real_main():
  # Run the WSGI CGI handler with that application.
  util.run_wsgi_app(application)

def profile_main():
  # This is the main function for profiling
  # We've renamed our main() above to real_main()
  import cProfile, pstats
  prof = cProfile.Profile()
  prof = prof.runctx("real_main()", globals(), locals())
  stream = StringIO.StringIO()
  stats = pstats.Stats(prof, stream=stream)
  stats.sort_stats("cumulative")  # time or cumulative
  stats.print_stats(80)  # 80 = how many to print
  # The rest is optional.
  # stats.print_callees()
  # stats.print_callers()
  logging.info('Profile data:\n%s', stream.getvalue())

if __name__ == '__main__':
  main = profile_main
  #main = real_main
  main()

Task TransientErrors

Adding a task often fails with a TransientError; just retry it once more and failed task adds become rare. (The /worker URL in the snippet is only a placeholder.)

from google.appengine.api import taskqueue

try:
   taskqueue.add(url='/worker')  # placeholder task parameters
except taskqueue.TransientError:
   taskqueue.add(url='/worker')  # retry once more

Misc

Other things.

  • Static files are not served from the application environment; your application cannot access them programmatically.
  • The urlfetch service has a maximum timeout of 10 seconds and can do at most 10 parallel requests per instance. Requests fail occasionally with an application error that is usually caused by a server timeout. Requests are made from fairly random source IPs that are shared by all other App Engine apps. You cannot override some request headers.
  • Naked domains (like example.com) are not supported.
  • Memcache lifetime can be very short, mere minutes, but if your application is popular App Engine might give it more priority. Use multi-get and multi-set whenever possible; see the sketch after this list.
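A sketch of batched memcache access (the keys and values are made up):

from google.appengine.api import memcache

# one round trip instead of three
memcache.set_multi({'a': 1, 'b': 2, 'c': 3}, time=60)
values = memcache.get_multi(['a', 'b', 'c'])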

Interpreting Go Socket Errors

Go sockets return error variables when something goes wrong, and the different error codes are documented in the Go documentation. However, I was not able to find a coherent example that would show how the error variable is supposed to be used. The canonical way seems to be just checking it against nil and dumping it out in case it’s something else, like this:

n, err := conn.Read(buffer[:])
if err != nil {
    fmt.Printf("%v\n", err)
}

Real applications (especially system applications) need to branch based on the error to recover properly, so the error description alone is not enough. Here is an example of what it’s possible to deduce from the error variable.

conn, err := net.Dial("tcp", "", "example.com:80")
n, err := conn.Read(buffer[:])

if err != nil {

    // print error string e.g.
    // "read tcp example.com:80: resource temporarily unavailable"
    fmt.Printf("reader %v\n", err)

    // print type of the error, e.g. "*net.OpError"
    fmt.Printf("%T", err)

    if err == os.EINVAL {
        // socket is not valid or already closed
        fmt.Println("EINVAL")
    }
    if err == os.EOF {
        // remote peer closed socket
        fmt.Println("EOF")
    }

    // matching the rest of the codes needs a type cast; the errno is
    // wrapped in an OpError
    if e, ok := err.(*net.OpError); ok {
        // print wrapped error string e.g.
        // "os.Errno : resource temporarily unavailable"
        fmt.Printf("%T : %v\n", e.Error, e.Error)
        if e.Timeout() {
            // is this a timeout error?
            fmt.Println("TIMEOUT")
        }
        if e.Temporary() {
            // is this a temporary error? True on timeout,
            // socket interrupts or when the buffer is full
            fmt.Println("TEMPORARY")
        }

        // specific granular error codes in case we're interested
        switch e.Error {
        case os.EAGAIN:
            // timeout
            fmt.Println("EAGAIN")
        case os.EPIPE:
            // broken pipe (e.g. on connection reset)
            fmt.Println("EPIPE")
        default:
            // just print the raw errno code, which can be platform
            // specific (see syscall for definitions)
            fmt.Printf("%d\n", int64(e.Error.(os.Errno)))
        }
    }
}

For example, in case the read times out, the code would print the following

read tcp 192.0.32.10:80: resource temporarily unavailable
*net.OpError
os.Errno : resource temporarily unavailable
TIMEOUT
TEMPORARY
EAGAIN