This video is available to students only

Delayed Jobs

All of the Python code we have written so far has always been executing in the context of directly responding to an incoming web request. If we wanted to anything run anything that wasn't strictly necessary to render a page to user, (for example sending a confirmation email, or updating internal analytics), we would have to do it in a route and do it before we return a response.

If the code we are thinking of running could take a while to complete (like running a complicated query or talking to a slow API), our users would have to wait for that slow code to complete before being able to get a response.If the code might take longer than 60 seconds to run, we might not even be able to respond in time before the HTTP server in front of Flask cuts us off.

The goal of our web controller code should be focused on rendering a response to the end user, and any ancillary tasks like sending an confirmation email, would ideally be executed later in separate environment. To do that, we can setup a job queue and have a separate process act as "worker" to listen to the queue and execute the code.

To create the queue and workers, we'll use a Python library called RQ (which stands for Redis Queue). If you have used other Python frameworks, RQ is similar to Celery task queue, but simpler to work with. As you can infer from the name, this will also mean we will need to install and serve redis which is a fast in-memory data store that will store our queue.

Setting up Redis

Installing Redis

To install redis on OSX run: brew install redis and then run brew services start redis

On Windows (using WSL) and Linux:

sudo apt-get install redis-server
sudo service redis-server start

Installing RQ

Then we will add rq, a Flask Extension called Flask-RQ2, and a tool called rq-dashboard to our requirements.txt file.

Flask-RQ2
rq==1.4.3
rq-dashboard

(due to a testing incompatibility, we temporarily need to pin our rq version to 1.4.3)

Make sure your virtual environment is activate, then run pip install -r requirements.txt to install the two libraries.

Within yumroad/config.py, we need to tell RQ where to find redis, so we will add a configuration variable.

class BaseConfig:
    ...
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    RQ_REDIS_URL = REDIS_URL
    RQ_DASHBOARD_REDIS_URL =  RQ_REDIS_URL

In our test environment, we don't want to delay tests and we do not even need to connect to redis, so under the test configuration, we will disable asynchronous job processing by adding RQ_ASYNC to False and change the type of connection.

class TestConfig:
    ...
    RQ_ASYNC = False
    RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'

Within extensions.py we can import RQ from flask-RQ2 and initialize it.

from flask_rq2 import RQ
...
rq2 = RQ()

Then within yumroad/__init__.py, import rq2

from yumroad.extensions import ( ... , rq2)

def create_app(environment_name='dev'):
    ...
    rq2.init_app(app)
    ...

Configuring & Processing Jobs

Using Flask-RQ2 we can designate specific functions as jobs using a decorator.

@rq2.job
def average(x, y):
   print("I am running")
   return (x + y)/2

Then to invoke the function, you can queue it up by using a queue method. In this case it would be average.queue(1, 2).

To see this in action, we can try it out from the Flask shell. For the sake of example, lets add this function called average to extensions.py. We can invoke it using RQ2 in by running flask shell.

>>> from yumroad.extensions import average
>>> average(1, 2)
I am running
1.5
>>> job = average.queue(1, 2)
>>> job
FlaskJob('375200f3-d380-4822-94d4-c18b6d88e914', enqueued_at=datetime.datetime(2020, 7, 27, 1, 31, 22, 156183))

To launch a worker, in a separate terminal session, with your virtual activated, run flask rq worker.

This will result in the following output.

$ flask rq worker
18:31:39 Worker rq:worker:a81e81d9f5104b91bd43ed58b4522aa7: started, version 1.5.0
18:31:39 *** Listening on default...
18:31:39 default: yumroad.extensions.average(1, 2) (375200f3-d380-4822-94d4-c18b6d88e914)
I am running
18:31:39 default: Job OK (375200f3-d380-4822-94d4-c18b6d88e914)
18:31:39 Result is kept for 500 seconds

If we wanted to later lookup the result, we can do so by having RQ2 fetch the result by getting the job by the job id.

>>> from yumroad.extensions import rq2
>>> job = rq2.get_queue().fetch_job('375200f3-d380-4822-94d4-c18b6d88e914')
>>> job.result
1.5

By default, jobs go into the default queue, but we can control what queues job go into to get more precision and control how many workers operate on which queue.

job = average.average.queue(3, 4, queue='important_math', timeout=60 * 5)

In this case, we would also need to tell our workers to listen to the important_math queue.

$ flask rq worker important_math default

Scheduling Jobs

To schedule jobs, you can use the built in scheduler in RQ.

average.schedule(timedelta(seconds=60), 1, 2)
average.schedule(datetime(2020, 4, 25, 11, 59, 59), 1, 2) # UTC

In addition to running a worker listening on queues, you would want to run the rq scheduler task so that RQ can keep an eye out for new jobs and queue them up when the time comes.

To run a scheduler, run the following command as well (in the background or in a different terminal window)

$ flask rq scheduler

Using a Job Queue for Emails

The process of sending an email is something that doesn't need to happen in order to render a page to users. In fact, if our mail server has downtime or is slow, that might result in the user seeing an error message when we should probably just retry sending the email. By putting the task of sending emails into a job queue, we will both be able to make our web responses faster and can also configure our job queue to retry failed jobs.

Start a new discussion. All notification go to the author.