Fork me on GitHub

Tasks - Задачи в Celery

источник: https://docs.celeryproject.org/en/stable/userguide/tasks.html

Basics

Создание задачи используя декоратор task()

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

Важно, если используются различные декораторы в комбинации с декоратором task, то он должен быть применен последним, т. е. первая в списке.

@app.task
@decorator2
@decorator1
def add(x, y):
    return x + y

Наследование Task

import celery

class MyTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask)
def add(x, y):
    raise KeyError()

Логгирование

Воркеры автоматически логгируют, но можно вручную определить логгирование.

Хорошей практикой создание основного логгера для всех своих задач в модуле:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Повторение задачи

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Повторение задачи используя задержку

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        something_raising()
    except Exception as exc:
        # overrides the default delay to retry after 1 minute
        raise self.retry(exc=exc, countdown=60)

Статусы задач

Встроенные статусы

PENDING

Task is waiting for execution or unknown. Any task id that’s not known is implied to be in the pending state.

STARTED

Task has been started. Not reported by default, to enable please see app.Task.track_started.

  • meta-data

pid and hostname of the worker process executing the task.

SUCCESS

Task has been successfully executed.

  • meta-data

result contains the return value of the task.

  • propagates

Yes

  • ready

Yes

FAILURE

Task execution resulted in failure.

  • meta-data

result contains the exception occurred, and traceback contains the backtrace of the stack at the point when the exception was raised.

  • propagates

Yes

RETRY

Task is being retried.

  • meta-data

result contains the exception that caused the retry, and traceback contains the backtrace of the stack at the point when the exceptions was raised.

  • propagates

No

REVOKED

Task has been revoked.

  • propagates

Yes

Task classes

@app.task
def add(x, y):
    return x + y

=>

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]

Создание экземпляра

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

сохраняем состояние между процессами.

Также можно использовать кеш ресурсов, например, класс задачи кеширует соединение с БД:

from celery import Task

class DatabaseTask(Task):
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

можно добавлять для каждой задачи:

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        process_row(row)

Вызов задачи

источник: https://docs.celeryproject.org/en/stable/userguide/calling.html

Quick Cheat Sheet

  • T.delay(arg, kwarg=value) Star arguments shortcut to .apply_async. (.delay(args, *kwargs) calls .apply_async(args, kwargs)).
  • T.apply_async((arg,), {'kwarg': value})
  • T.apply_async(countdown=10) executes in 10 seconds from now.
  • T.apply_async(eta=now + timedelta(seconds=10)) executes in 10 seconds from now, specified using eta
  • T.apply_async(countdown=60, expires=120) executes in one minute from now, but expires after 2 minutes.
  • T.apply_async(expires=now + timedelta(days=2)) expires in 2 days, set using datetime.

Linking (callbacks/errbacks)

@app.task
def add(x, y):
    return x + y
@app.task
def error_handler(request, exc, traceback):
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          request.id, exc, traceback))
add.apply_async((2, 2), link=add.s(16))     
add.apply_async((2, 2), link_error=error_handler.s())
add.apply_async((2, 2), link=[add.s(16), other_task.s()])

Проектирование потоков worker'ов

источник: https://docs.celeryproject.org/en/stable/userguide/canvas.html

social