Django Periodic Task with Celery Task Queue

Task Queue? Periodic Tasks? What are they?

A task queue is a way of organizing work asynchronously, outside the user’s request cycle, and distributing it across threads or machines. Whenever the program needs to run a background task (that is, a task whose result does not need to be shown to the user right away), it adds the task to the task queue. Worker services then execute these tasks, either immediately or later. Tasks that are set to run at a specific time are called scheduled tasks, and tasks that are set to run repeatedly at fixed intervals are called periodic tasks. A task queue matters because some work needs to run in the background so that it does not block the interface and make users wait. We can also use it to schedule periodic tasks, such as deleting or updating objects after some period of time, which is the focus of this article.
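To make the idea concrete, here is a minimal sketch of a task queue built only on Python’s standard library. It is not how Celery works internally, and the names task_queue, worker, and send_welcome_email are hypothetical, chosen purely for illustration. The client puts work on the queue and moves on; a separate worker thread picks the work up and runs it.

```python
import queue
import threading

task_queue = queue.Queue()   # holds (function, args) pairs waiting to run
results = []                 # collected here so the effect is visible


def send_welcome_email(username):
    # Stand-in for slow background work (hypothetical task)
    results.append("welcome email sent to " + username)


def worker():
    # Pull tasks off the queue until a None sentinel arrives
    while True:
        item = task_queue.get()
        if item is None:
            break
        func, args = item
        func(*args)


# The "client" side: enqueue the work and move on without waiting
task_queue.put((send_welcome_email, ("alice",)))
task_queue.put((send_welcome_email, ("bob",)))
task_queue.put(None)  # tell the worker to stop

thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

Celery plays the same roles at a larger scale: the queue lives in a broker such as Redis, and the workers can be separate processes or machines.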

What is Celery?

Celery is a Python-based distributed task queue that provides a simple, reliable, and flexible system for real-time processing and task scheduling. Celery uses a broker to link clients to workers. A broker is a message transport that acts as a mediator: it holds the messages in the task queue and delivers them to the workers. Every time a client has a new task to be done, Celery places a message on the task queue, and the broker delivers it to a worker. Celery therefore needs a message broker such as RabbitMQ or Redis. A Celery system can use multiple brokers and workers, which gives it high availability and horizontal scaling, and it can run on a single machine or across multiple machines. At the time of writing, Celery’s latest stable release is 4.2, but unfortunately it does not support Windows. For this article, we use Celery 3.1.25, which still supports Windows. This version needs Python 2.6 or later. Don’t forget to install RabbitMQ or Redis as well (we use version 2.10.6 of the redis Python package for this article).

How do I get started with Celery?

In this article, we will not cover how to start a new Django project or how to define models in Django, because we assume that you already know how to do that. If you do not, you could go through an introductory Django tutorial first before reading any further. After starting a new Django project, we need to create a Django application, which we will call sample_app. We also need to install Celery and Redis:

pip install celery==3.1.25
pip install redis==2.10.6

After installing Celery and Redis, don’t forget to set REDIS_HOST in your settings.py file. Assuming that Redis runs on the same machine as our Django application, the setting should be:

REDIS_HOST = 'localhost'

I’m ready! How do I set Celery up?

Create a file named celery.py inside your project’s main package (the same folder as your settings.py). At the top of the file, we need to import settings to get the REDIS_HOST that we set earlier:

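import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'root.settings')  # must be set before settings is read
from django.conf import settings
redis_host = settings.REDIS_HOST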

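On Python 2, add from __future__ import absolute_import as the very first line of the file; otherwise import celery resolves to this celery.py file rather than the library. We also need to import Celery and crontab at the top of our file so that we can create the app and schedule tasks: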

from celery import Celery
from celery.schedules import crontab

After that, we need to configure the broker and backend for our Celery app:

app = Celery(
    'project',
    broker='redis://' + settings.REDIS_HOST + ':6379',
    backend='redis://' + settings.REDIS_HOST + ':6379',
    include=['sample_app.tasks'],
)
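The broker and backend URLs above are assembled by string concatenation. As a small sketch, the same URL could be built by a helper; build_redis_url is a hypothetical function written for this article, not part of Celery or Django, and it assumes the default Redis port 6379 and database 0.

```python
def build_redis_url(host, port=6379, db=0):
    # Redis URLs have the form redis://host:port/db
    # (hypothetical helper, not provided by Celery or Django)
    return "redis://{0}:{1}/{2}".format(host, port, db)


broker_url = build_redis_url("localhost")
```

With such a helper, the same value could be passed as both broker and backend, and a different database number could be chosen for each if you prefer to keep task messages and results apart.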

Don’t worry about sample_app.tasks for now; we will create it later. After configuring the broker and backend, we also need some custom configuration before we can schedule our tasks. The following code sets our application’s task and result serializers to JSON and the lifespan of task results to one hour (3600 seconds); you can choose another duration.

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TASK_RESULT_EXPIRES=3600,
)
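One practical consequence of choosing the json serializer is that task arguments and results should be plain, JSON-friendly values. The sketch below uses only the standard library json module to illustrate this; the Record class is a hypothetical stand-in for a model instance, and the dictionary is a simplified stand-in, not Celery’s actual message format. Celery’s own encoder may accept a few more types than the standard one, so treat this as an illustration of the habit rather than of Celery’s exact behavior.

```python
import json


class Record(object):
    # Hypothetical stand-in for a model instance
    def __init__(self, pk):
        self.pk = pk


record = Record(pk=7)

# Passing the primary key keeps the payload JSON-friendly
payload = json.dumps({"args": [record.pk], "kwargs": {}})

# Passing the object itself fails with the standard library encoder
try:
    json.dumps({"args": [record]})
    object_ok = True
except TypeError:
    object_ok = False
```

A common pattern is therefore to pass an object’s primary key to a task and load the object inside the task.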

For our simple app, this base configuration is enough; if you want to explore more of Celery’s settings, see the configuration reference in the Celery documentation. To make a scheduler, we need to add the CELERYBEAT_SCHEDULE and CELERY_TIMEZONE properties to our app.conf.update call. With those properties, we can define a schedule for our tasks and set our timezone to UTC. For example, suppose we have two tasks called task1 and task2, which are located in our sample_app.tasks file. We want to run task1 at midnight, so we set its schedule to crontab(minute=0, hour=0). We want the second task to run every hour, so we set its schedule to crontab(minute=0, hour='*/1'). For more variations of crontab, refer to the periodic tasks section of the Celery documentation. After these changes, your app.conf.update call should look like this:

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TASK_RESULT_EXPIRES=3600,
    CELERY_TIMEZONE='UTC',
    CELERYBEAT_SCHEDULE={
        'task1': {
            'task': 'sample_app.tasks.task1',
            'schedule': crontab(minute=0, hour=0),
        },
        'task2': {
            'task': 'sample_app.tasks.task2',
            'schedule': crontab(minute=0, hour='*/1'),
        },
    },
)
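To see what the two schedules mean in practice, here is a small standard-library sketch of cron-style matching. It is not Celery’s implementation; field_matches and is_due are hypothetical helpers that only understand an integer, "*", and "*/n", which is enough to mirror the two crontab calls used in this article.

```python
from datetime import datetime


def field_matches(spec, value):
    # Supports an integer, "*" (every value), or "*/n" (every n-th value)
    if isinstance(spec, int):
        return value == spec
    if spec == "*":
        return True
    if spec.startswith("*/"):
        return value % int(spec[2:]) == 0
    raise ValueError("unsupported spec: %r" % (spec,))


def is_due(minute, hour, moment):
    # True when both the minute and the hour field match the given time
    return field_matches(minute, moment.minute) and field_matches(hour, moment.hour)


midnight = datetime(2018, 7, 1, 0, 0)
three_pm = datetime(2018, 7, 1, 15, 0)
half_past = datetime(2018, 7, 1, 15, 30)

# Mirrors crontab(minute=0, hour=0): only at 00:00
task1_runs = [is_due(0, 0, t) for t in (midnight, three_pm, half_past)]
# Mirrors crontab(minute=0, hour='*/1'): at minute 0 of every hour
task2_runs = [is_due(0, "*/1", t) for t in (midnight, three_pm, half_past)]
```

Because '*/1' matches every hour, it behaves like '*', which is crontab’s default for each field, so crontab(minute=0) on its own should describe the same hourly schedule.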

Last but not least, after configuring our Celery application, we add a __main__ guard, as in other Python applications, so that the app starts when the file is run directly.

if __name__ == '__main__':
    app.start()

So my Celery is good to go, how do I give it tasks?

In the previous section, we finished configuring our Celery application. Now, we create the tasks that Celery needs to execute. First, create a file named tasks.py in the sample_app directory. Then import the Celery application that we configured earlier:

from root.celery import app

Then, let’s make our first task. Suppose we already have a model called SimpleModel with an expired attribute, and we want a periodic delete task that runs every midnight and deletes the SimpleModel objects that have already expired. For this, we need to import our model, pytz, and datetime:

from sample_app.models import SimpleModel  # assumes SimpleModel lives in sample_app/models.py
from datetime import datetime
import pytz

After that, we build our task1 function, which gets all SimpleModel objects and checks whether each one has expired. If an object has expired, the task deletes it. Below is sample code for that task:

@app.task
def task1():
    objs = SimpleModel.objects.all()
    # Take the current time directly in UTC; localizing datetime.now() as UTC
    # would be wrong on a server whose local time is not UTC
    now_utc = datetime.now(pytz.utc)
    for obj in objs:
        if obj.expired < now_utc:
            obj.delete()
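The expiry comparison in task1 only works when both datetimes are timezone-aware. The sketch below isolates that check so it can be tried without Django or Celery. It is an illustration under assumptions: is_expired is a hypothetical helper, and it uses Python 3’s datetime.timezone.utc in place of pytz.

```python
from datetime import datetime, timedelta, timezone


def is_expired(expires_at, now=None):
    # Both values must be timezone-aware; comparing an aware datetime
    # with a naive one raises TypeError
    if now is None:
        now = datetime.now(timezone.utc)
    return expires_at < now


fixed_now = datetime(2018, 7, 1, 0, 0, tzinfo=timezone.utc)
yesterday = fixed_now - timedelta(days=1)
tomorrow = fixed_now + timedelta(days=1)

try:
    is_expired(datetime(2018, 6, 30), now=fixed_now)  # naive value
    naive_comparison_ok = True
except TypeError:
    naive_comparison_ok = False
```

Passing now explicitly, as done with fixed_now here, also makes the check easy to test, since the result no longer depends on when the code runs.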

That is our first task. For the second task, suppose SimpleModel has an age attribute that we want to update every hour. Assuming we don’t care when the object was created and just want to increment its age every hour, below is the sample code:

@app.task
def task2():
    objs = SimpleModel.objects.all()
    for obj in objs:
        # Model instances have no update() method; set the field and save
        obj.age = obj.age + 1
        obj.save()

Setup check, Tasks check. How do I get Celery to run?

To run our Celery application, we need two commands. The first starts our Celery worker:

celery -A <your-main-directory-name> worker

In this example, we use root as our main directory name (the main directory is where we placed our settings.py and celery.py files). After that command, run the second command in another terminal:

celery -A <your-main-directory-name> beat

This command runs Celery beat, which sends a message to our worker when the time comes to execute one of our tasks. If you are using Ubuntu or another Linux operating system, you can combine the two commands into one:

celery -A <your-main-directory-name> worker -B

Conclusion

Sometimes, when we develop a Python-based application, we need to execute tasks periodically in the background. This kind of task does not need to be shown to users, but it needs to be done regularly, such as deleting and updating objects in our models. Celery is a simple Python-based task queue system that helps us do that without unnecessary complexity.
