
A Stream Server

Recently, Tim Bray posed a problem he called The Wide Finder Project, which he thought well suited to concurrent languages that can take advantage of multicore machines. His Ruby program was about 10 lines of code. Several Erlang developers tried to improve on its performance and ended up writing 300 to 400 lines of code to compete.

A theme common to the Erlang solutions was the need to read a file larger than memory and split it into bite-sized chunks that could be scanned in parallel. Several approaches were tried, and many inefficiencies were rediscovered on the way to more efficient implementations. The lack of a good idiom for this task was the impetus for this proposal.

Our suggested solution is to add a module called gen_stream, which provides a standard mechanism for delivering binary data in chunks so that large binaries don't overwhelm a node's resources. It has been optimized for good performance in a variety of situations, and it offers a consistent API and a declarative set of options that help an application developer tune efficiency. The model is a sequential stream of successive chunks, which can optionally be made circular so that the stream never ends.
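A minimal sketch of the chunked model, using only the standard file module, may help make it concrete. The chunk_reader module and its fold/4 and take_circular/3 functions are hypothetical and are not part of the gen_stream API; the sketch simply keeps one chunk in memory at a time and, in circular mode, rewinds to the start of the file when it reaches the end.

```erlang
%% chunk_reader.erl: hypothetical sketch, not the gen_stream implementation.
-module(chunk_reader).
-export([fold/4, take_circular/3]).

%% Fold Fun over successive chunks of at most ChunkSize bytes. Only one
%% chunk is held in memory at a time, so the file may exceed memory.
fold(FileName, ChunkSize, Fun, Acc0) when is_integer(ChunkSize), ChunkSize > 0 ->
    {ok, Fd} = file:open(FileName, [read, binary, raw]),
    try
        fold_loop(Fd, ChunkSize, Fun, Acc0)
    after
        ok = file:close(Fd)
    end.

fold_loop(Fd, ChunkSize, Fun, Acc) ->
    %% Read errors are not handled here and crash with a case_clause.
    case file:read(Fd, ChunkSize) of
        {ok, Chunk} -> fold_loop(Fd, ChunkSize, Fun, Fun(Chunk, Acc));
        eof -> Acc
    end.

%% Return the first N chunks of a circular stream over FileName. At end
%% of file the position is reset to the beginning, so the stream never
%% ends. An empty (or missing) file is rejected to avoid looping forever.
take_circular(FileName, ChunkSize, N)
  when is_integer(ChunkSize), ChunkSize > 0, is_integer(N), N >= 0 ->
    case filelib:file_size(FileName) of
        0 ->
            erlang:error(empty_stream);
        _ ->
            {ok, Fd} = file:open(FileName, [read, binary, raw]),
            try
                circular_loop(Fd, ChunkSize, N, [])
            after
                ok = file:close(Fd)
            end
    end.

circular_loop(_Fd, _ChunkSize, 0, Acc) ->
    lists:reverse(Acc);
circular_loop(Fd, ChunkSize, N, Acc) ->
    case file:read(Fd, ChunkSize) of
        {ok, Chunk} ->
            circular_loop(Fd, ChunkSize, N - 1, [Chunk | Acc]);
        eof ->
            %% Wrap around: rewind to the beginning of the file.
            {ok, 0} = file:position(Fd, bof),
            circular_loop(Fd, ChunkSize, N, Acc)
    end.
```

For example, `chunk_reader:fold("big.log", 65536, fun(C, Sz) -> Sz + byte_size(C) end, 0)` would return the size of a (hypothetical) big.log. In this sketch the chunk read just before a wrap-around may be shorter than ChunkSize; another design could instead join the tail of the file to its head so that every chunk has the same size.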

Reference Implementation

The current implementation supports the following features:

 

A gen_stream is a gen_server with configurable options. It supports the following calls:
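Since a gen_stream is a gen_server, the basic shape of a chunk server can be sketched with OTP's gen_server behaviour. The mini_stream module here is a hypothetical, stripped-down illustration rather than the reference implementation: it reads a file sequentially and answers with the reply tuples that appear in the usage example on this page ({next_chunk, ...}, {stream_size, ...}, {stream_pos, ...}), but it has no options, buffering, or circular mode.

```erlang
%% mini_stream.erl: hypothetical sketch of a stream server on gen_server.
-module(mini_stream).
-behaviour(gen_server).

-export([start_link/2, next_chunk/1, stream_size/1, stream_pos/1, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, terminate/2]).

-record(state, {fd, chunk_size, size, pos = 0}).

start_link(FileName, ChunkSize) ->
    gen_server:start_link(?MODULE, {FileName, ChunkSize}, []).

next_chunk(S)  -> gen_server:call(S, next_chunk).
stream_size(S) -> gen_server:call(S, stream_size).
stream_pos(S)  -> gen_server:call(S, stream_pos).
stop(S)        -> gen_server:stop(S).

init({FileName, ChunkSize}) ->
    %% The server process opens the file, so it owns the raw handle.
    {ok, Fd} = file:open(FileName, [read, binary, raw]),
    {ok, #state{fd = Fd, chunk_size = ChunkSize,
                size = filelib:file_size(FileName)}}.

%% Requests are handled one at a time by this single process, which is
%% what makes the server a serialization point for concurrent callers.
handle_call(next_chunk, _From, #state{fd = Fd, chunk_size = N, pos = P} = St) ->
    case file:read(Fd, N) of
        {ok, Chunk} ->
            {reply, {next_chunk, Chunk}, St#state{pos = P + byte_size(Chunk)}};
        eof ->
            {reply, {next_chunk, end_of_stream}, St}
    end;
handle_call(stream_size, _From, St) ->
    {reply, {stream_size, St#state.size}, St};
handle_call(stream_pos, _From, St) ->
    {reply, {stream_pos, St#state.pos}, St}.

handle_cast(_Msg, St) ->
    {noreply, St}.

%% Called by gen_server:stop/1; release the file handle.
terminate(_Reason, #state{fd = Fd}) ->
    file:close(Fd).
```

Keeping the file handle inside one server process is what lets callers treat the stream as a simple sequence of calls, at the cost of funnelling every request through that process's mailbox.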

 

A working implementation with unit tests is available for download so that others can evaluate the usefulness of a gen_stream module. Unpacking the tar file creates a new directory called gen_stream that contains all of the files:

 

Implementation Details

Below is an example of how to use the gen_stream module. Calling stream_pos and pct_complete is not required; they are included only to illustrate usage patterns:

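      %% Open FileName as a stream, pass each chunk to Fun, then stop the server.
      process_file(FileName, Fun, Opts) ->
        {ok, S} = gen_stream:start_link([{stream, {file, FileName}}] ++ Opts),
        %% Query the stream size (not used further in this example).
        {stream_size, _Size} = gen_stream:stream_size(S),
        consume(S, Fun),
        gen_stream:stop(S).

      %% Fetch chunks one at a time until the stream is exhausted.
      consume(Stream, ProcessFn) ->
        case gen_stream:next_chunk(Stream) of
          {next_chunk, end_of_stream} -> ok;
          {next_chunk, Binary} ->
            %% Optional progress information, passed along to ProcessFn.
            {pct_complete, Pct} = gen_stream:pct_complete(Stream),
            {stream_pos, Pos} = gen_stream:stream_pos(Stream),
            ProcessFn(Binary, Pct, Pos),
            consume(Stream, ProcessFn)
        end.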
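process_file/3 expects a fun of the form ProcessFn(Binary, Pct, Pos), and consume/2 discards that fun's return value, so any result has to be accumulated as a side effect. As one hypothetical example, the line_counter module below counts newlines with an atomic counter from OTP's counters module (available since OTP 21.2); the module and function names are illustrative only.

```erlang
%% line_counter.erl: hypothetical ProcessFn for process_file/3.
-module(line_counter).
-export([new/0, process_fn/1, total/1]).

%% A single atomic counter that any process may update.
new() ->
    counters:new(1, [atomics]).

%% Build a fun with the ProcessFn(Binary, Pct, Pos) shape. It adds the
%% number of newline bytes in the chunk to the counter; the progress
%% arguments are ignored.
process_fn(Ref) ->
    fun(Binary, _Pct, _Pos) ->
        counters:add(Ref, 1, length(binary:matches(Binary, <<"\n">>)))
    end.

total(Ref) ->
    counters:get(Ref, 1).

%% Usage with process_file/3 (the file name is hypothetical):
%%   Ref = line_counter:new(),
%%   process_file("access.log", line_counter:process_fn(Ref), []),
%%   Lines = line_counter:total(Ref).
```

Counting newline bytes per chunk gives the correct total even when a line straddles two chunks, because each newline byte falls in exactly one chunk. Matching longer patterns, as the Wide Finder task requires, needs extra care at chunk boundaries.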
    

 

Because a gen_server handles requests one at a time, the gen_stream becomes a narrow pipe when chunks are processed concurrently. In exchange, it gives the cleanest interface and the most control over the flow of binary chunks into an application. Performance may not match a hand-tuned implementation, but the flexibility of a standardized approach is valuable, and the overhead it imposes is small. By changing the options declaratively, architectures with different numbers of processes or buffers can be tried to find the best configuration for a particular system.
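The trade-off can be illustrated with several workers pulling from one shared source. In the hypothetical parallel_consume module below, NextChunk is a zero-argument fun standing in for something like `fun() -> gen_stream:next_chunk(S) end`; every worker goes through that single call, so the stream is the serialization point, while ProcessFn runs on different chunks in parallel. Here ProcessFn takes only the binary and returns an integer to be summed, and the worker count is a plain argument rather than one of gen_stream's declarative options.

```erlang
%% parallel_consume.erl: hypothetical sketch of concurrent consumers.
-module(parallel_consume).
-export([run/3]).

%% Start NumWorkers processes that share NextChunk, wait until every one
%% has seen end_of_stream, and return the sum of ProcessFn's results.
run(NextChunk, ProcessFn, NumWorkers) when NumWorkers > 0 ->
    Parent = self(),
    Pids = [spawn_link(fun() ->
                           Parent ! {done, self(), worker(NextChunk, ProcessFn, 0)}
                       end)
            || _ <- lists:seq(1, NumWorkers)],
    %% Collect one result per worker; Pid is bound, so each receive
    %% matches only that worker's message.
    lists:sum([receive {done, Pid, Sum} -> Sum end || Pid <- Pids]).

%% Each worker pulls chunks until the shared source is exhausted.
worker(NextChunk, ProcessFn, Acc) ->
    case NextChunk() of
        {next_chunk, end_of_stream} -> Acc;
        {next_chunk, Binary} -> worker(NextChunk, ProcessFn, Acc + ProcessFn(Binary))
    end.
```

Adding workers only helps while ProcessFn dominates the cost; once workers spend most of their time waiting on NextChunk, the single stream process is the bottleneck, which is the narrow pipe described above.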

Join the Discussion

The merits of this proposal are being discussed on the erlang-questions mailing list.

DuoMark International, Inc.