I always love learning about new programming paradigms to broaden what I can express in code. One such paradigm that I've read about lately is programming in pipelines. The idea itself is nothing new (it's quite old and much discussed in the literature), but the way you can do this in Python with generators and coroutines is really empowering.

The idea is that your code consists of a series of steps for consuming, transforming, and possibly storing data or sending signals to other systems/generators/coroutines, and each step hands its result to the next one, like a pipeline of commands. The real advantages of using generators are that 1) your program processes elements one at a time (or a batch at a time if you collect some before sending them to the next step) and 2) you can easily plug a processing step into the middle of the pipeline to reuse code, as long as the generator or coroutine you plug in respects the expected input and output formats.
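
To make that concrete, here's a minimal sketch of my own (not taken from the presentations). The stage names read_numbers, square, only_even and total are made up for illustration, and I'm using Python 3 syntax. It shows a two-stage pipeline and then the same pipeline with an extra filter plugged into the middle:

```python
def read_numbers(items):
    """Source stage: hand out items one at a time."""
    for item in items:
        yield item

def square(numbers):
    """Transform stage: square each number as it passes through."""
    for n in numbers:
        yield n * n

def only_even(numbers):
    """Filter stage: let only even numbers through."""
    for n in numbers:
        if n % 2 == 0:
            yield n

def total(numbers):
    """Final stage: consume the pipeline and add everything up."""
    return sum(numbers)

# Source -> square -> total
result = total(square(read_numbers(range(10))))

# Same pipeline with a filter plugged into the middle. The other stages
# are untouched because only_even takes and yields plain numbers.
filtered = total(square(only_even(read_numbers(range(10)))))

print(result, filtered)
```

Nothing is computed until total() starts pulling, and no intermediate list is ever built.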

The pipeline paradigm becomes very interesting when you can view the solution to your problem as a series of identical or very similar operations on a long, and possibly infinite, list of items. One perfect example where that paradigm applies directly is an HTTP server:

  • The server waits indefinitely until a connection is made to it. The number of connections handled over time is potentially unbounded (theoretically... everyone needs to restart the server to change config, or reboot to apply kernel security fixes).
  • Each connection request goes through a first selection step that identifies which kind of request it is (e.g. GET vs. POST vs. HEAD).
  • The request is chopped into interesting bits, placed into variables, and then passed on to a backend.
  • The backend opens files, runs code, does whatever it takes to get the right output, and finally returns a properly formatted response.

Both of the following presentations, whose slides are extremely interesting (and available in PDF), were written by David M. Beazley.

Generators

Generator Tricks For Systems Programmers

This first presentation deals with generators in Python, and how you can use them to pipeline processing on some data, via iteration in your code. The principle here is that you pull data through the generators to make it traverse every step of the processing line.

So for example, you have this generator which mimics tail -f thefile (stolen from the presentation):

import time
def follow(thefile):
    thefile.seek(0,2)
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)
            continue
        yield line

You can use this generator to process lines one at a time, as soon as they are appended to a file. For example, let's prefix lines with line numbers:

for i, line in enumerate(follow(open("file", "r")), 1):
    # line keeps its trailing newline, so strip it before printing
    print("%d: %s" % (i, line.rstrip("\n")))

You can chain an arbitrary number of iteration levels to implement different "steps" of the computation, thus the principle of pipelining the program.
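
Here's a small sketch of such a chain, written by me in Python 3 for illustration. read_lines is a finite stand-in for follow() so that the example terminates, and grep and numbered are hypothetical stage names; the in-memory "log" is made-up sample data:

```python
import io

def read_lines(thefile):
    """Finite stand-in for follow(): yield lines until EOF, then stop."""
    for line in thefile:
        yield line

def grep(pattern, lines):
    """Keep only the lines that contain pattern."""
    for line in lines:
        if pattern in line:
            yield line

def numbered(lines):
    """Prefix each line with a running line number."""
    for i, line in enumerate(lines, 1):
        yield "%d: %s" % (i, line.rstrip("\n"))

# Made-up sample data standing in for a real log file.
log = io.StringIO("GET /index.html\nPOST /login\nGET /about.html\n")

# Three chained steps: read -> filter -> number.
output = list(numbered(grep("GET", read_lines(log))))
for line in output:
    print(line)
```

Swapping read_lines for follow() should turn this into a live filter over a growing file, since each stage only ever asks the previous one for the next line.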

I've implemented the Apache log parser found in this first presentation and modified it a bit to send processed data (in this case the number of bytes transferred) to Carbon, Graphite's daemon, so that I can create graphs with data from the logs and build correlations between events that happened on the server for which I have other metrics.

Not only is this programming method very elegant and easy to grasp (and to read!), the resulting log parser is also very powerful. With thefile.seek(0,2) removed, the follow() generator processes the entire file before waiting for new lines to be appended. That way I was able to parse a 1.3 GB Apache log file and send all of its metrics to a Carbon instance on the same local network in about 7s, after which the parser kept listening for new lines at the end of the log file and sent them to Carbon as they were written.
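
To give a rough idea of the shape of such a pipeline, here is a simplified sketch. It is not my actual parser. It assumes log lines in the common log format (bytes sent is the last field, "-" meaning none) and Carbon's plaintext protocol ("metric value timestamp" lines, by default on TCP port 2003). The metric name apache.bytes, the function names and the sample lines are all made up, and for simplicity the timestamp is the processing time rather than the one in the log line:

```python
import socket
import time

def bytes_sent(lines):
    """Extract the last field (bytes sent) from each log line."""
    for line in lines:
        field = line.rsplit(None, 1)[-1]
        yield 0 if field == "-" else int(field)

def carbon_messages(values, metric, clock=time.time):
    """Format values as Carbon plaintext lines: 'metric value timestamp'."""
    for value in values:
        yield "%s %d %d\n" % (metric, value, int(clock()))

def send_to_carbon(messages, host, port=2003):
    """Final stage: push each message to a Carbon instance over TCP."""
    sock = socket.create_connection((host, port))
    try:
        for message in messages:
            sock.sendall(message.encode("ascii"))
    finally:
        sock.close()

# Made-up sample lines; a real run would feed follow(open(...)) instead.
sample = [
    '127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /a.gif HTTP/1.0" 200 2326',
    '127.0.0.1 - - [10/Oct/2000:13:55:37 -0700] "GET /b.gif HTTP/1.0" 304 -',
]

# A fixed clock keeps the output reproducible. send_to_carbon() is not
# called here because it needs a reachable Carbon host.
messages = list(carbon_messages(bytes_sent(sample), "apache.bytes",
                                clock=lambda: 1000))
print("".join(messages), end="")
```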

Check out what the log parser looks like (two short libraries are missing; I may include them later if I update this gist. One parses strings according to an Apache log format of the same form found in the LogFormat directive, and the other parses command line arguments):

Coroutines

A Curious Course on Coroutines and Concurrency

Coroutines are generators flipped 180 degrees: coroutines also work on only one item at a time, but data is pushed into them (and they can push data into other coroutines), whereas generators pull data themselves. Coroutines inherit generators' property of being fast.

The interesting difference here is that the coroutine can decide where the data is being sent to after its business with the data is done. Thus, you can implement some crazy cases of message routing.
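
To see the flip side by side, here's a small Python 3 sketch of my own that does the same transformation once as a generator (pull) and once as a coroutine (push). The names upper_gen, upper_co and collect are made up. Note that a coroutine has to be advanced to its first yield with next() before it can accept a send():

```python
# Pull: the consumer drives, asking the generator for the next item.
def upper_gen(lines):
    for line in lines:
        yield line.upper()

pulled = list(upper_gen(["a", "b"]))

# Push: the producer drives, sending each item into the coroutine,
# which then decides where to send it next.
def upper_co(target):
    while True:
        line = yield
        target.send(line.upper())

def collect(into):
    """Sink: store whatever is received in the list 'into'."""
    while True:
        into.append((yield))

pushed = []
sink = collect(pushed)
next(sink)            # advance to the first yield so send() works
stage = upper_co(sink)
next(stage)
for line in ["a", "b"]:
    stage.send(line)

print(pulled, pushed)
```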

Let's implement the tail -f thefile with a coroutine instead of a generator (also stolen from the presentation):

import time
def follow(thefile, target):
    thefile.seek(0,2)      # Go to the end of the file
    while True:
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    # Sleep briefly
            continue
        target.send(line)

The main difference is that this follow() decides where to send its data. It sends each line to the arbitrary destination "target" that is passed as its last argument, so the choice of destination is not hardcoded. Strictly speaking, follow() contains no yield, so it's a plain function acting as the source that drives the pipeline; the target argument is assumed to be a coroutine (i.e. it has a send() method and assigns the value of yield to a variable somewhere).

Let's build a router. Our router will send messages to different coroutines based on the first word of each line (one detail is left out: the @coroutine decorator is a means of preparing the coroutine so that it's ready for a first call to its send() method; see the presentation for the code):

@coroutine
def route(map):
    while True:
        line = (yield)
        # Split off the first word; rest is '' if the line has no space
        first_word, _, rest = line.partition(' ')
        target = map.get(first_word)
        if target is not None:
            target.send(rest)

The above code lets us build arbitrary message routing based on a simple match on the first word of each line. It sends lines to the target without the first word, but if you want to send the whole line, it can be changed to that effect quite trivially. The route() function expects a dictionary in the map argument in which keys are the words we'd like to match against, and values are coroutines (i.e. the different targets).
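
If you want to try the router without a log file, here's a self-contained Python 3 sketch. The coroutine decorator below is my own version of the usual "priming" decorator, so it may differ in detail from the one in the presentation. collect is a made-up sink that stores what it receives in a list, and the sample lines are invented:

```python
import functools

def coroutine(func):
    """Prime a generator-based coroutine so it is ready for send()."""
    @functools.wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)  # advance to the first yield
        return cr
    return start

@coroutine
def route(map):
    while True:
        line = (yield)
        first_word, _, rest = line.partition(' ')
        target = map.get(first_word)
        if target is not None:  # lines with an unknown first word are dropped
            target.send(rest)

@coroutine
def collect(into):
    """Sink: append every received item to the list 'into'."""
    while True:
        into.append((yield))

accepted, dropped = [], []
router = route({'ACCEPT': collect(accepted), 'DROP': collect(dropped)})
for line in ["ACCEPT tcp 22", "DROP udp 53", "LOG ignored", "ACCEPT tcp 80"]:
    router.send(line)

print(accepted, dropped)
```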

With coroutines you always need a termination coroutine (called a "sink" in the presentation). Since you're pushing messages along, the termination coroutine simply receives data without pushing it anywhere, thus ending the computation on the item that's passed along.

Let's build two simple termination coroutines and glue all of that together with the router coroutine:

@coroutine
def print_line():
    while True:
        line = (yield)
        print(line)

@coroutine
def delay(interval=1):
    while True:
        # The item itself is unused: receiving it discards it and lets
        # the next item move through the pipeline.
        (yield)
        time.sleep(interval)  # "time" was imported earlier, with follow()

f = open("packet_flow", "r")
follow(f,
  route({
    'ACCEPT': print_line(),
    'DROP': delay(),
  })
)

Combining both tools into an... Operating System!

Well, we've been playing around with code that can be paused and later resumed. Does that ring a bell? Yes! The author of the presentations mixes coroutines and generators to build a multitasking operating system: a piece of code that runs from time to time and, while it runs, starts different processes so that each of them gets small amounts of CPU time here and there, producing the illusion of a computer doing multiple things simultaneously.

Actually, having processes yield execution to the OS is the oldest method by which the OS regains control. So using the yield operation to mark the points where our tasks give control back to the OS is just plainly natural.

Also, if you abstract away most of a modern operating system's jobs, the process of managing tasks actually fits the pipelining paradigm perfectly:

  • The OS reads in code and starts it.
  • The code gives control back to the OS, and the OS stores information about the current work so it can return to it later.
  • Go back to square one and read in some more code, etc.
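
The loop above can be sketched as a tiny round-robin scheduler. This is my own stripped-down Python 3 illustration, far simpler than the operating system built in the presentation; task, run and the log list are made-up names:

```python
from collections import deque

def task(name, steps, log):
    """A 'process': do a bit of work, then yield control back."""
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        yield  # hand control back to the scheduler

def run(tasks):
    """The 'OS': give each ready task one turn, round-robin."""
    ready = deque(tasks)
    while ready:
        current = ready.popleft()
        try:
            next(current)          # resume the task until its next yield
        except StopIteration:
            continue               # the task finished, so drop it
        ready.append(current)      # otherwise queue it for another turn

log = []
run([task("A", 2, log), task("B", 3, log)])
for entry in log:
    print(entry)
```

The generator object itself is the "info about current work": it remembers where the task stopped, so the scheduler only has to keep it in a queue.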

Conclusions

Generators and coroutines are a simple means of expressing small iterative steps of computation on individual items. They are very fast when used in this paradigm (see the benchmarks in both presentations).

Remember, though, that with this paradigm each item goes through the whole pipeline before the next one is passed along. So if you branch into too much code while processing an item, or if your processing code is too expensive in terms of processing time, you might slow down all of your work. If you reach a point where the computation on each item is slow and expensive, you could start looking into ways of working on items in parallel. Watch out, though: you can't send() to a coroutine that's already running (Python raises ValueError: generator already executing).
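
Here's a tiny Python 3 sketch of that last pitfall. It's a contrived example of my own (echo_into_self is a made-up name) in which a coroutine tries to send an item back into itself while it is still busy handling one:

```python
def echo_into_self():
    while True:
        item = yield
        # Re-entrant send: this coroutine is still running at this point.
        me.send(item)

me = echo_into_self()
next(me)  # prime the coroutine

try:
    me.send("hello")
    error = None
except ValueError as exc:
    error = str(exc)

print(error)
```

The same error can show up in less obvious ways, for example when a routing loop sends an item back to a coroutine further up the same pipeline, or when two threads send into one coroutine at the same time.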