前回の記事でdjangoとredisを繋げることに成功したので今回はそのredisと、新しくceleryというモジュールを組み合わせて非同期処理を組み込んでいきたいと思います。
環境
Django: 2.2.4
Python: 3.6.9
Celery: 4.3.0
docker-compose.yml
僕は開発環境はdocker-composeで作っています。現在は Django、MySQL、Redisという3つのコンテナを動かしています。
version: '3.7'
services:
django:
restart: always
build: .
volumes:
- ./:/opt/apps
depends_on:
- db
- redis
command: /bin/sh -c "cd /opt/apps; pip install -r requirements.txt; python manage.py migrate; python manage.py runserver 0:8000"
ports:
- 8000:8000
db:
image: mysql:5.7
environment:
MYSQL_DATABASE: app
MYSQL_USER: root
MYSQL_ROOT_PASSWORD: パスワード
tty: true
ports:
- 3306:3306
command: mysqld --character-set-server=utf8mb4 --explicit_defaults_for_timestamp=true
redis:
image: redis:latest
ports:
- 6379:6379
tty: true
Dockerfile
FROM python:3.6.9-alpine3.10
ENV APP_PATH /opt/apps
COPY requirements.txt $APP_PATH/
RUN pip install --no-cache-dir -r $APP_PATH/requirements.txt
WORKDIR $APP_PATH
Celery導入
では、この環境にCeleryを入れていきましょう。
requirements.txt
requirements.txtに下記を追記する。
celery==4.3.0
celery.py
まずはメインアプリケーションのディレクトリ(settings.pyがあるところ)配下に、 celery.py
を作ります。
# celery.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'アプリケーションの名前(ディレクトリ名)')
app = Celery('アプリケーションの名前(ディレクトリ名)')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
tasks.py
任意のアプリケーションのディレクトリ配下に tasks.py
を作成します。このファイルに書かれた処理が非同期で実行されます。ここでは `app`というディレクトリ配下に作りたいと思います。
# app/tasks.py
from celery.task import task
@task
def say_hello():
print("start say_hello")
print("hello")
print("end say_hello")
本来であれば非同期にするという判断した処理は大きくなるかと思いますが、今回はサンプルということで文字列を出力するだけにします。
@task
メソッドの前に `@task`というのがあるのにきづきましたでしょうか。これはこのメソッドは celeryタスクですよ、というのを明示的に示しているものになります。
settings.py
settings.pyにceleryを使うための設定を書いていきます。
# settings.py
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZZER = 'json'
BROKER_URL = 'redis://redis:6379'
カスタムコマンド
ではceleryに非同期処理のキューを積むカスタムコマンドを作ります。
# app/management/commands/set_say_hello_queue.py
from django.core.management.base import BaseCommand
from app.tasks import say_hello
class Command(BaseCommand):
def handle(self, *args, **options):
print("====== START =================")
say_hello.apply_async(args=(), queue='say_hello')
print("====== END =================")
さて、これで準備が整いました。
実行
celery実行
celery -A プロジェクト名 worker -Q say_hello -c 2
このコマンドの説明をします。
Aオプション
プロジェクト名を入れます
Qオプション
さばきたいキューの名前を入れます
Cオプション
Cは concurrencyの略で、最大並列数を指定します。ここに1を入れると並列数は1、つまり直列処理になります。
ここに10を入れると最大10個のスレッドで並列処理が実行されます。
このコマンドを実行して、下記のようになれば準備OKです。(今は一旦warningは無視する)
# celery -A プロジェクト名 worker -Q say_hello -c 2
/usr/local/lib/python3.6/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@54880b09a5b0 v4.3.0 (rhubarb)
---- **** -----
--- * *** * -- Linux-4.9.93-linuxkit-aufs-x86_64-with 2019-09-23 03:33:39
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: プロジェクト名:0x7ff3f70f8a20
- ** ---------- .> transport: redis://redis:6379//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> say_hello exchange=say_hello(direct) key=say_hello
[2019-09-23 03:33:40,227: WARNING/MainProcess] /usr/local/lib/python3.6/site-packages/celery/fixups/django.py:202: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
カスタムコマンド実行
それではキューを積むコマンドを打ってみましょう。
python manage.py set_say_hello_queue
すると、celeryでは下記のように処理がされることを確認してください。
[2019-09-23 03:35:05,562: WARNING/ForkPoolWorker-2] start say_hello
[2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] hello
[2019-09-23 03:35:05,564: WARNING/ForkPoolWorker-2] end say_hello
積んだキューがceleryによって拾い上げられ、処理されることを確認できました!
今後はcelery実装で詰まった点や、celeryのキューを可視化するツールについてまとめていこうと思います。最後までお読みいただきありがとうございます。