Socket binary data stream parsing in Erlang

Erlang code has two different ways of reading from the socket, active and passive. In passive mode, your code calls recv to the socket to receive bytes. In active mode you install controlling process to the socket and receive data as Erlang messages.

Binary data parsing is more complicated on the latter, as application code doesn’t have any control over size of the packets (or flow control for that matter) that it receives. It could be 1 byte, few kilobytes or whatever. Naive pattern matching fails if you don’t get exactly the amount of bytes you want.

Lets assume simple binary protocol, where you need to read packets that are varying in length and formatted as following. Timestamp is defined in first 4 bytes, next 2 bytes define length of payload followed by the payload itself.

|    Timestamp   |  Len  |    Payload      |
|  0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... |

Incidentally, this is the data format used by Apple Push notification feedback service.

First lets define a function that accepts binary Data and process Pid where it sends parsed result. This tail recursive function simply matches as many packets from data as possible, and returns with remaining unparsed data.

match_data(Data, Parent) ->
   case Data of
      <<Timestamp:32/big, Size:16/big, PushToken:Size/binary-unit:8, Rest/binary>> ->
         Parent ! {Timestamp, PushToken}, % notify parent
         %% parse rest of the data
        match_data(Rest, Parent);
      Rest ->
         %% no match
        Rest
   end.

Then function that actually receives the data from the socket. It receives arbitrary pieces of data, concatenates it to existing unparsed data and calls the match_data handler to make actual packet matching. Then it loops again with unparsed portion of data.

loop(Bin, Parent) ->
   receive
      {_, _Sock, Data} ->
         loop(match_data(erlang:list_to_binary([Bin, Data]), Parent), Parent);
      {ssl_closed, _Sock} -> ok;
      {_event, _Event} ->
         error_logger:error_msg("Unexpected", [_event, _Event])
   end.

Install the loop as controlling process, with initially empty “seed” data.

Pid = self(),
ssl:setopts(Sock, [{active, true}, {mode, binary}]),
ssl:controlling_process(Sock, spawn(fun() -> loop(<<>>, Pid) end)).

This way data is parsed correctly, no matter what size of chunks data is returned from the socket.

Leave a comment