All of the Python code we have written so far has always been executing in the context of directly responding to an incoming web request. If we wanted to anything run anything that wasn't strictly necessary to render a page to user, (for example sending a confirmation email, or updating internal analytics), we would have to do it in a route and do it before we return a response.
If the code we are thinking of running could take a while to complete (like running a complicated query or talking to a slow API), our users would have to wait for that slow code to complete before being able to get a response.If the code might take longer than 60 seconds to run, we might not even be able to respond in time before the HTTP server in front of Flask cuts us off.
The goal of our web controller code should be focused on rendering a response to the end user, and any ancillary tasks like sending an confirmation email, would ideally be executed later in separate environment. To do that, we can setup a job queue and have a separate process act as "worker" to listen to the queue and execute the code.
To create the queue and workers, we'll use a Python library called
RQ (which stands for Redis Queue). If you have used other Python frameworks,
RQ is similar to
Celery task queue, but simpler to work with. As you can infer from the name, this will also mean we will need to install and serve
redis which is a fast in-memory data store that will store our queue.
Setting up Redis
To install redis on OSX run:
brew install redis and then run
brew services start redis
On Windows (using WSL) and Linux:
sudo apt-get install redis-server sudo service redis-server start
Then we will add
rq, a Flask Extension called
Flask-RQ2, and a tool called
rq-dashboard to our requirements.txt file.
Flask-RQ2 rq==1.4.3 rq-dashboard
(due to a testing incompatibility, we temporarily need to pin our
rq version to 1.4.3)
Make sure your virtual environment is activate, then run
pip install -r requirements.txt to install the two libraries.
yumroad/config.py, we need to tell RQ where to find redis, so we will add a configuration variable.
class BaseConfig: ... REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0') RQ_REDIS_URL = REDIS_URL RQ_DASHBOARD_REDIS_URL = RQ_REDIS_URL
In our test environment, we don't want to delay tests and we do not even need to connect to redis, so under the test configuration, we will disable asynchronous job processing by adding
False and change the type of connection.
class TestConfig: ... RQ_ASYNC = False RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
extensions.py we can import RQ from
flask-RQ2 and initialize it.
from flask_rq2 import RQ ... rq2 = RQ()
from yumroad.extensions import ( ... , rq2) def create_app(environment_name='dev'): ... rq2.init_app(app) ...
Configuring & Processing Jobs
Flask-RQ2 we can designate specific functions as jobs using a decorator.
@rq2.job def average(x, y): print("I am running") return (x + y)/2
Then to invoke the function, you can queue it up by using a
In this case it would be
To see this in action, we can try it out from the Flask shell. For the sake of example, lets add this function called
extensions.py. We can invoke it using RQ2 in by running
>>> from yumroad.extensions import average >>> average(1, 2) I am running 1.5 >>> job = average.queue(1, 2) >>> job FlaskJob('375200f3-d380-4822-94d4-c18b6d88e914', enqueued_at=datetime.datetime(2020, 7, 27, 1, 31, 22, 156183))
To launch a worker, in a separate terminal session, with your virtual activated, run
flask rq worker.
This will result in the following output.
$ flask rq worker 18:31:39 Worker rq:worker:a81e81d9f5104b91bd43ed58b4522aa7: started, version 1.5.0 18:31:39 *** Listening on default... 18:31:39 default: yumroad.extensions.average(1, 2) (375200f3-d380-4822-94d4-c18b6d88e914) I am running 18:31:39 default: Job OK (375200f3-d380-4822-94d4-c18b6d88e914) 18:31:39 Result is kept for 500 seconds
If we wanted to later lookup the result, we can do so by having RQ2 fetch the result by getting the job by the job id.
>>> from yumroad.extensions import rq2 >>> job = rq2.get_queue().fetch_job('375200f3-d380-4822-94d4-c18b6d88e914') >>> job.result 1.5
By default, jobs go into the default queue, but we can control what queues job go into to get more precision and control how many workers operate on which queue.
job = average.average.queue(3, 4, queue='important_math', timeout=60 * 5)
In this case, we would also need to tell our workers to listen to the
$ flask rq worker important_math default
To schedule jobs, you can use the built in scheduler in RQ.
average.schedule(timedelta(seconds=60), 1, 2) average.schedule(datetime(2020, 4, 25, 11, 59, 59), 1, 2) # UTC
In addition to running a worker listening on queues, you would want to run the
rq scheduler task so that RQ can keep an eye out for new jobs and queue them up when the time comes.
To run a scheduler, run the following command as well (in the background or in a different terminal window)
$ flask rq scheduler
Using a Job Queue for Emails
The process of sending an email is something that doesn't need to happen in order to render a page to users. In fact, if our mail server has downtime or is slow, that might result in the user seeing an error message when we should probably just retry sending the email. By putting the task of sending emails into a job queue, we will both be able to make our web responses faster and can also configure our job queue to retry failed jobs.