19 Jan 2021

Django community aggregator: Community blog posts

Scheduling Celery Tasks in the (far) future

We used to make use of the fact that a Celery task can be scheduled at some time in the future to auto-punch-out staff members who failed to punch out 24 hours after their shift started. This was as simple as scheduling a task with `countdown=86400`.

However, as Adam points out [here](https://adamj.eu/tech/2020/02/03/common-celery-issues-on-django-projects/) (see number 5), this is not a great idea. For one, it will bog down your Celery workers (and make them use a bunch more memory). Secondly, something Adam doesn't mention is that if your queue is corrupted, then none of those future tasks will be executed.

Discussing this in IRC today, I thought of a simple mechanism for scheduling these tasks and processing them at some point after they are due.

We will start with a model to store our task:

{% highlight python %}
import datetime
import importlib

from django.db import models


class ScheduledTask(models.Model):
    task_name = models.TextField()
    task_args = models.JSONField(default=list)
    task_kwargs = models.JSONField(default=dict)
    due_date = models.DateTimeField()

    objects = ScheduledTaskQuerySet.as_manager()

    @property
    def task(self):
        # task_name is a dotted path, e.g. 'foo.tasks.bar'.
        module, task = self.task_name.rsplit('.', 1)
        return getattr(importlib.import_module(module), task)

    def execute(self):
        self.task.apply_async(args=self.task_args, kwargs=self.task_kwargs)
{% endhighlight %}

We have a custom queryset (which needs to be defined above the model in the module) that allows us to see which tasks are due using a nice queryset method:

{% highlight python %}
class ScheduledTaskQuerySet(models.query.QuerySet):
    def due(self):
        # Naive UTC, to match the naive datetimes used in the rest of this post.
        return self.filter(due_date__lt=datetime.datetime.utcnow())

    def schedule(self, task_name, when, *args, **kwargs):
        return self.create(
            task_name=task_name,
            task_args=list(args),
            task_kwargs=kwargs,
            due_date=when,
        )
{% endhighlight %}

Finally, we need a task that will enqueue the due tasks:

{% highlight python %}
@app.task
def enqueue_due_tasks():
    for task in ScheduledTask.objects.due():
        task.execute()
        task.delete()
{% endhighlight %}

As it stands, with this code, to schedule a task, you need to create a model instance:

{% highlight python %}
ScheduledTask.objects.schedule(
    'foo.tasks.bar',
    datetime.datetime(2525, 11, 11, 9, 30),
    'x',
    y='z'
)
{% endhighlight %}

But it would be nice if we could use the task to schedule itself:

{% highlight python %}
foo.tasks.bar.schedule(args=[], kwargs={}, eta=X, countdown=Y)
{% endhighlight %}

Or, even better:

{% highlight python %}
foo.tasks.bar.s(arg, kwarg=value).schedule(datetime.timedelta(days=365))
{% endhighlight %}

The first one we should be able to do by using custom tasks (and implementing a `schedule` method):

{% highlight python %}
class Scheduled(celery.Task):
    def schedule(self, *, args=None, kwargs=None, eta=None, countdown=None):
        if not eta and not countdown:
            raise ValueError('One of eta and countdown must be supplied')
        if eta and countdown:
            raise ValueError('Only one of eta and countdown must be supplied')
        # Allow args/kwargs to be omitted without breaking the unpacking below.
        args = args or ()
        kwargs = kwargs or {}
        if eta:
            ScheduledTask.objects.schedule(self.name, eta, *args, **kwargs)
        else:
            # countdown is in seconds, as it is for apply_async.
            ScheduledTask.objects.schedule(
                self.name,
                datetime.datetime.utcnow() + datetime.timedelta(seconds=countdown),
                *args,
                **kwargs
            )
{% endhighlight %}

Then, as long as a task is defined as using the base class, we can schedule it:

{% highlight python %}
@app.task(base=Scheduled)
def task_name(x, y=None):
    pass
{% endhighlight %}

But what about mucking with the `.s()` or `.signature()` calls? Now we are getting a bit experimental, but it still might be fun:

{% highlight python %}
from celery.canvas import Signature


def schedule(self, when=None):
    if when:
        if isinstance(when, datetime.timedelta):
            when = datetime.datetime.utcnow() + when
    else:
        # Signature.options is a plain dict, so use key lookups.
        if self.options.get('countdown'):
            when = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.options['countdown'])
        elif self.options.get('eta'):
            when = self.options['eta']
    ScheduledTask.objects.create(
        task_name=self.task,
        task_args=list(self.args),
        task_kwargs=self.kwargs,
        due_date=when,
    )


Signature.schedule = schedule
{% endhighlight %}

This is totally mostly untested, and I'm not sure it's a really good idea.
A better idea might be to have a backend that doesn't even tell the workers about the tasks until after they are due. That way you would not have to duck-punch Celery at all.
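To make the "don't tell the workers until it's due" idea a little more concrete, here is a minimal in-memory sketch using only the standard library. It is not a Celery backend, and nothing in it is Celery's API: `HoldingStore`, `release_due` and the `foo.tasks.baz` task name are hypothetical names made up for illustration. It only shows the shape of the mechanism: entries sit in a store ordered by due date, and only the ones that have come due are ever handed on.

```python
import datetime
import heapq
import itertools


class HoldingStore:
    """Hypothetical stand-in for a store that withholds tasks until they are due."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so entries with equal due dates never compare payloads.
        self._counter = itertools.count()

    def schedule(self, task_name, due, *args, **kwargs):
        entry = (due, next(self._counter), task_name, args, kwargs)
        heapq.heappush(self._heap, entry)

    def release_due(self, now):
        """Pop and return every entry whose due date is at or before `now`."""
        released = []
        while self._heap and self._heap[0][0] <= now:
            _due, _seq, task_name, args, kwargs = heapq.heappop(self._heap)
            released.append((task_name, args, kwargs))
        return released


store = HoldingStore()
start = datetime.datetime(2021, 1, 19, 9, 0)
store.schedule('foo.tasks.bar', start + datetime.timedelta(days=1), 'x', y='z')
store.schedule('foo.tasks.baz', start + datetime.timedelta(hours=1))

# One hour in, only the second task would be passed to the workers.
early = store.release_due(start + datetime.timedelta(hours=1))
# A day in, the first task is released as well.
late = store.release_due(start + datetime.timedelta(days=1))
```

In a real setup the released entries would be handed to the workers (for example via `apply_async`) rather than returned, and the store would need to be persistent, which is what the `ScheduledTask` model above provides.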

19 Jan 2021 3:29pm GMT
