Sunday, April 26, 2015

A Distributed Task Queue Using Celery, RabbitMQ, and Redis

Celery is a distributed task queue for processing messages asynchronously. It focuses on real-time processing, supports task scheduling, and allows out-of-band work that can be started periodically or triggered by web requests. Coupled with a message broker, it's reasonably good at distributed computing and high availability.

With the right components in place, you have what is essentially a buffer for when your systems fail, plus the ability to operate on that buffer with multiple, concurrent workers. Today, we're going to demonstrate some of these capabilities by making requests and watching tasks get queued and executed in real time.

Prerequisites: You'll want to first get familiar with Vagrant. This guide was written for Mac OS X users.

Provisioning a VM with Vagrant

Copy the following content to a file named Vagrantfile and run vagrant up followed by vagrant ssh from the command line.

# -*- mode: ruby -*-
# vi: set ft=ruby :

# Vagrantfile API/syntax version. Don't touch unless you know what you're doing!
VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.box = "ubuntu/trusty64"
  config.vm.provider "virtualbox" do |v|
    v.memory = 1024
    v.cpus = 2
  end
end

Install Dependencies

Now that we've SSH'd into our VM, let's set it up with a few things.

sudo apt-get update
sudo apt-get install python-virtualenv rabbitmq-server redis-server git

Start Services

At this point you'll either want to open up a shell window for each of the following services, or start a tmux session and have each running in a separate pane.

# start the redis-server
redis-server

# start the rabbitmq-server
sudo rabbitmq-server

Note: If rabbitmq is already running in the background, stop it with sudo service rabbitmq-server stop, then execute sudo rabbitmq-server again. You'll want both of these services running in the foreground for this tutorial.

Prepare Our Tasks

cd ~
git clone
cd celery-example
virtualenv --no-site-packages venv
source venv/bin/activate

# install dependencies
pip install celery
pip install redis

Our Files

These are all the constants we will initialize Celery with. In this configuration, we're overriding the default amqp result backend with redis.



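The config file itself didn't survive here; a minimal celeryconfig.py along these lines would match the setup described above. The exact values are an assumption based on Celery 3.x option names and the services' default ports:

```python
# celeryconfig.py -- hypothetical reconstruction, Celery 3.x-style settings
BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # RabbitMQ as the message broker
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis replaces the default amqp result backend
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```

For a module like this to take effect, the app would load it explicitly, e.g. app.config_from_object('celeryconfig').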
Here, we initialize Celery, define our functions and decorate them with @app.task, which lets Celery know to run them asynchronously and across workers.

from __future__ import absolute_import
from celery import Celery

# initialize with defaults
app = Celery('tasks')

@app.task
def fiblist(n):
  return list(fib(n))

def fib(n):
  a,b = 1,1
  for i in xrange(n-1):
    a,b = b, a+b
    yield a
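As a sanity check before wiring this into Celery, the task body can be exercised directly; it's just the Fibonacci sequence starting from the second term (shown here with Python 3's range in place of xrange):

```python
def fib(n):
    # generate the first n-1 Fibonacci numbers after the initial 1, 1
    a, b = 1, 1
    for i in range(n - 1):
        a, b = b, a + b
        yield a

def fiblist(n):
    return list(fib(n))

print(fiblist(10))  # → [1, 2, 3, 5, 8, 13, 21, 34, 55]
```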

Finally, we call our function 10000 times, with the delay method, which sends them to the queue for Celery. Tasks not called through this method execute directly in the current process.

from tasks import fiblist

for i in xrange(10000):
  fiblist.delay(i)

We're enqueueing a function as a task with delay, rather than calling it inline (synchronously). This effectively pushes that task into RabbitMQ. Celery will then pull that task out and work on it.
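To make the producer/worker split concrete, here's a toy stand-in (plain Python, not Celery's API) where a deque plays the role of the broker: the caller enqueues (function, args) pairs instead of executing them, and a worker loop drains the queue later:

```python
from collections import deque

queue = deque()  # stand-in for RabbitMQ

def delay(func, *args):
    # producer side: enqueue the call instead of running it
    queue.append((func, args))

def work():
    # worker side: pull tasks off the queue and execute them
    results = []
    while queue:
        func, args = queue.popleft()
        results.append(func(*args))
    return results

def square(n):
    return n * n

for i in range(5):
    delay(square, i)  # nothing runs yet; five tasks sit in the queue

print(work())  # → [0, 1, 4, 9, 16]
```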

Executing Our Tasks

At this point you should have RabbitMQ and Redis running in the foreground. Now you're going to want to start a Celery worker.

cd ~/celery-example

# don't forget to be in the virtualenv
source venv/bin/activate

# start worker with verbose logging
celery worker -A tasks -l INFO

Finally, you'll want to make the requests. Open up one more shell window and run the following commands:

cd ~/celery-example

# don't forget to be in the virtualenv
source venv/bin/activate

# run the script containing the fiblist.delay loop above
python run.py  # assuming you saved that script as run.py