David Cramer's Blog

Distributing Work in Python Without Celery

We’ve been migrating a lot of data to various places lately at DISQUS. These generally have been things like running consistancy checks on our PostgreSQL shards, or creating a new system which requires a certain form of denormalized data. It usually involves iterating through the results of an entire table (and sometimes even more), and performing some action based on that row. We never care about results, we just want to be able to finish as quickly as possible.

Generally, we’d just create a simple do_something.py that would look something like this:

1
2
for comment in RangeQuerySetWrapper(Post.objects.all()):
    do_something(comment)

Note: RangeQuerySetWrapper is a wrapper around Django’s ORM that efficiently iterates a table.

Eventually we came up with an internal tool to make this a bit more bearable. Mostly to handle resuming processes based on the last primary key, and to track status. It evolved into a slightly more complex, but still simple utility we called Taskmaster:

1
2
3
4
5
6
7
def callback(obj):
    do_something(obj)

def main(**options):
    qs = Post.objects.all()
    tm = Taskmaster(callback, qs, **options)
    tm.start()

This used to never be much of a problem. We’d just spin up some utility server and max the CPUs on that single machine to get data processed in a day or less. Lately however, we’ve grown beyond the bounds of what is reasonable for a single machine to take care of, and we’ve had to look towards other solutions.

Why Not Celery?

As with most people, we rely on Celery and RabbitMQ for distributing asyncrhonous tasks in our application. Unfortunately that’s not quite the ideal fit out of the box for us in these situations. The root of the problem stems from the fact that we may need to run through a billion objects, and without some effort, that would mean every single task would need to fit into a RabbitMQ instance.

Given that we can’t simply queue every task and then distribute them to some Celery workers, and even more so that we simply dont want to bring up Celery machines/write throwaway Celery code for a simple script, we chose to take a different route. That route ended up with a simple distributed buffer queue, built on the Python multiprocessing module.

Introducing Taskmaster

Taskmaster takes advantage of the remote management capabilities built into the multiprocessing module. This makes it very simple to just throw in a capped Queue and have workers connect, get and execute jobs, and control state via that single master process. In the end, we came up with an API looking something like this:

1
2
3
4
5
# spawn the master process
$ tm-master taskmaster.example --reset --key=foo --host=0.0.0.0:5050

# run a slave
$ tm-slave do_something:handle_job --host=192.168.0.1:5050

You’ll see the status on the master as things process, and if you cancel the process and start it again, it will automatically resume:

1
2
3
$ tm-master taskmaster.example --reset --key=foo --host=0.0.0.0:5050
Taskmaster server running on '0.0.0.0:5050'
Current Job: 30421 | Rate:  991.06/s | Elapsed Time: 0:00:40

Implementing the iterator and the callback are just as simple as they used to be:

1
2
3
4
5
6
7
def get_jobs(last=0):
    # ``last`` will only be passed if previous state was available
    for obj in RangeQuerySetWrapper(Post.objects.all(), min_id=last):
        yield obj

def handle_job(obj):
    print "Got %r!" % obj

Now under the hood Taskmaster will continue to iterate on get_jobs whenever the size of the queue is under the threshold (which defaults to 10,000 items). This means we have a constant memory footprint and can just spin slaves to process the data.

Taskmaster is still new, but if you’re in need of these kinds of one-off migration scripts, we encourage you to try it out and see if it fits.

Comments

Comments