Asynchronous tasks and message queues with Django + Celery + RabbitMQ

Celery is an asynchronous task queue based on distributed message passing. Task queues are used as a strategy to distribute workload across worker processes and machines. In this tutorial, I will explain how to install and configure Celery + RabbitMQ to execute asynchronous tasks in a Django application.

To work with Celery, we also need to install RabbitMQ because Celery requires an external solution for sending and receiving messages. These solutions are called message brokers. Currently, Celery supports RabbitMQ, Redis, and Amazon SQS as message broker solutions.
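For reference, the broker Celery talks to is selected purely by the broker URL in the configuration. For example (the hosts and credentials below are placeholders, not the values we will use later in this article):

# RabbitMQ over AMQP
CELERY_BROKER_URL = 'amqp://user:password@localhost:5672/myvhost'
# Redis (alternative broker)
# CELERY_BROKER_URL = 'redis://localhost:6379/0'
# Amazon SQS (credentials are usually taken from the environment)
# CELERY_BROKER_URL = 'sqs://'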

RabbitMQ

Sometimes we want our system to communicate with other internal or external systems, and we want to do it in an orderly, fast, and reliable manner. This is where message queue systems come into play. They arise from the need to place an intermediary in the communication between two systems.

RabbitMQ is that intermediary. It is open-source, cross-platform software that implements the Advanced Message Queuing Protocol (AMQP) standard. It manages message queues and their negotiation, so it can be classified as messaging middleware, more commonly known as a message broker.

The messages in RabbitMQ follow this flow:

  • The producer system generates the message and sends it to our RabbitMQ server.
  • RabbitMQ receives the message and routes it to its corresponding queue.
  • The message stays in the queue until the consumer receives it and confirms the receipt.
  • The consumer system processes the message.

Configuring our RabbitMQ Server

I recommend dockerizing RabbitMQ for installation. However, if you cannot or do not want to use Docker, the easiest way to install RabbitMQ on Ubuntu is as follows:


# Install RabbitMQ server
sudo apt-get install rabbitmq-server

# Create a user with its password
sudo rabbitmqctl add_user celery_user celery_password

# Tag the user as administrator (optional, useful for the management UI)
sudo rabbitmqctl set_user_tags celery_user administrator

# Create a virtual host and give the user full permissions on it
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_permissions -p myvhost celery_user ".*" ".*" ".*"

Configuring Celery

Before anything else, we need to include the following packages in our requirements.txt:

  • celery
  • amqp
  • django-celery-beat (needed for the database scheduler referenced in the settings below)

We need to add the following configuration to the settings.py file of our project. Because the Celery app we create below loads its configuration with the CELERY namespace, every setting must carry the CELERY_ prefix. Also note that, for security reasons, the password for our celery_user is read from an environment variable rather than hardcoded, so don't forget to export it beforehand:

import os

# The password comes from an environment variable (CELERY_PASSWORD is a name of our choosing)
CELERY_BROKER_URL = f"amqp://celery_user:{os.environ['CELERY_PASSWORD']}@localhost:5672/myvhost"
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
# Celery data format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'MyQueue'
CELERY_TIMEZONE = 'Europe/Madrid'
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

Now, create a celery.py file in the root package of our project (the same folder that contains settings.py) with the following content:

from __future__ import absolute_import, unicode_literals
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project_name_celery')

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

This code does the following:

  • Creates an instance of a Celery application with a given name (project_name_celery).
  • Loads the configuration we included earlier in our settings.py.
  • Activates automatic discovery of tasks defined in the tasks.py files of our installed apps.
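For orientation, this is the layout assumed by the import paths used in the rest of this article (project and app are placeholder names, not requirements):

project/
├── __init__.py      ← imports the Celery app (see below)
├── settings.py
├── celery.py        ← the file we just created
└── app/
    ├── __init__.py
    └── tasks.py     ← where our asynchronous tasks will live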

To ensure our Django application loads Celery on startup, we also need to include the following in the __init__.py file of that same project package:

from .celery import app as celery_app

__all__ = ['celery_app']

Creating our first asynchronous task

To create our first task, we can create a tasks.py file in the desired app and include the following code. Note the decorator used when declaring the task:

import logging
import time

from project.celery import app as celery_app

logger = logging.getLogger(__name__)

@celery_app.task
def hello_world(nombre):
    time.sleep(30)
    print(f"Hello {nombre}")

Now, we can call it from another part of our code like this:

from project.app.tasks import hello_world

def some_function(instance=None, created=False, **kwargs):
    hello_world.s(nombre="manolo").apply_async()
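As a quick usage note, the signature/apply_async combination above also accepts scheduling options. A short sketch with the same task (delay, countdown, and queue are standard Celery call options):

hello_world.delay(nombre="manolo")                                      # shorthand: run as soon as a worker is free
hello_world.apply_async(kwargs={"nombre": "manolo"}, countdown=10)      # wait at least 10 seconds before running
hello_world.apply_async(kwargs={"nombre": "manolo"}, queue="MyQueue")   # route explicitly to a named queue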

Scheduling Tasks (Cron)

We could schedule tasks within our Django application using the system's cron. However, it is much more flexible to do it with Celery Beat.

Celery Beat is Celery's scheduler, and it allows us to schedule asynchronous tasks in a similar way to cron.

But why not use cron? Let's see some advantages of using Celery over cron:

  • Celery + Celery Beat has a finer granularity than cron. Cron cannot run a job more than once per minute, while Celery can.
  • With a cron line, you have to call a script or a single command with an absolute path and user information. Celery calls Python functions and can use all available Django libraries. You just need to write code integrated into our application.
  • Celery is more suitable when you need to coordinate jobs across multiple machines, ensuring that jobs are executed even when machines are added to or removed from a working group.
  • Celery can set timeouts for jobs and define multi-step workflows as a graph of tasks (chains, groups, chords) instead of a flat dependency chain.
  • Celery allows for a single repository of programming logic that operates in the same way on multiple operating systems and versions.
  • Scheduling an existing Celery task is incredibly simple. You just need to apply the @periodic_task decorator to the task, like this:
import logging

from celery.schedules import crontab
from celery.task import periodic_task

@periodic_task(run_every=(crontab(minute='*/1')), name="hello_how_are_you")
def hello_how_are_you():
    logger = logging.getLogger("django")
    logger.setLevel(logging.INFO)
    logger.info('TASK EXECUTED')
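Note that the @periodic_task decorator only exists in older Celery releases (it was removed in Celery 5). On newer versions, a rough equivalent is to declare the task with the normal @celery_app.task decorator and register the schedule in settings.py; with the CELERY namespace used above, that looks roughly like this (the task path assumes the app/tasks.py layout from earlier):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'hello_how_are_you': {
        'task': 'project.app.tasks.hello_how_are_you',  # a plain @celery_app.task
        'schedule': crontab(minute='*/1'),              # every minute
    },
}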

Running All of This

We need to run two processes: a Celery worker to execute the tasks, and Celery Beat to dispatch the scheduled ones. In development, we would start them separately with these two commands:


celery --app=MyAPP worker --loglevel=info -Q MyQueue

celery -A MyAPP beat -l info

However, in production it is recommended to "daemonize" these processes so they keep running and restart automatically. For that, you can use a Supervisor configuration for the Celery worker in /etc/supervisor/conf.d/ called celeryworker.conf:

[program:celeryworker]

; Set full path to celery program if using virtualenv
command=/bin/celery worker -A mycelery --loglevel=INFO

; The directory to your Django project
directory=

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/log/celeryworker-supervisor.log

; Put process stderr output in this file
stderr_logfile=/log/celeryworker-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; process' configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds that the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long-running tasks.
stopwaitsecs=600

; When resorting to send SIGKILL to the program to terminate it,
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; If your broker is supervised, set its priority higher
; so it starts first
priority=998

Similarly, create another one for the Celery Beat scheduler called celerybeat.conf:

[program:celerybeat]

; Set full path to celery program if using virtualenv
command=/bin/celery beat -A mycelery --loglevel=INFO

; The directory to your Django project
directory=

; If supervisord is run as the root user, switch users to this UNIX user account
; before doing any processing.
user=

; Supervisor will start as many instances of this program as named by numprocs
numprocs=1

; Put process stdout output in this file
stdout_logfile=/log/celerybeat-supervisor.log

; Put process stderr output in this file
stderr_logfile=/log/celerybeat-supervisor.log

; If true, this program will start automatically when supervisord is started
autostart=true

; May be one of false, unexpected, or true. If false, the process will never
; be autorestarted. If unexpected, the process will restart when the program
; exits with an exit code that is not one of the exit codes associated with this
; process' configuration (see exitcodes). If true, the process will be
; unconditionally restarted when it exits, without regard to its exit code.
autorestart=true

; The total number of seconds that the program needs to stay running after
; a startup to consider the start successful.
startsecs=10

; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long-running tasks.
stopwaitsecs=600

; When resorting to send SIGKILL to the program to terminate it,
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true

; If your broker is supervised, set its priority higher
; so it starts first
priority=999

Now, reload Supervisor so it picks up the new configuration files:

sudo supervisorctl reread
sudo supervisorctl update

With the following commands, you can stop, start, and check the status of each daemon:

sudo supervisorctl stop celeryworker
sudo supervisorctl start celeryworker
sudo supervisorctl status celeryworker

Bonus Track:

Once all of this is working, we can monitor all our tasks with Flower.

To install it, we only need to do this:


pip install flower
celery -A MyAPP flower --port=5555

And we can access it in the browser at http://localhost:5555.