mjpieters commented on issue #12519:
URL: https://github.com/apache/airflow/issues/12519#issuecomment-733118659


   My problem was a little different, but the same approach could work. 
   
   You'd need to plug into Celery and direct jobs to workers with capacity 
using *per-worker queues*. To make this work across schedulers and workers, 
I've so far used Redis to share bookkeeping information and to ensure 
consistency when multiple clients update that information.
   
   I'd set each worker's capacity to its concurrency level and give tasks a default cost of 1; your expensive tasks can then use larger values to reserve a certain 'chunk' of that worker's resources.
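
   As an illustration (names are hypothetical), a hardcoded cost map could look like the snippet below; a worker with concurrency 8 then fits two of the size-4 tasks at once, or eight default-cost ones:

   ```python
   # hypothetical sizing: capacity == the worker's Celery concurrency;
   # costs are keyed on (dag_id, task_id), anything not listed costs 1
   WORKER_CAPACITY = 8
   TASK_SIZES = {("my_dag", "expensive_task"): 4}
   ```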
   
   You can re-route tasks in Celery by hooking into the [`task_routes` 
configuration](https://docs.celeryproject.org/en/stable/userguide/configuration.html#task-routes).
 If you set this to a *function*, that function is called for every routing 
decision.  In Airflow, you can set this hook by supplying a custom 
`CELERY_CONFIG` dictionary:
   
   ```python
   from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
   
   CELERY_CONFIG = {
       **DEFAULT_CELERY_CONFIG,
       "task_routes": "[module with route_task].route_tasks",
   }
   ```
   
   and in a separate module (Celery should not try to import it until after configuration is complete and the task router is actually needed):
   
   ```python
   def route_tasks(name, args, kwargs, options, task=None, **kw):
       """Custom task router.

       name: the Celery task name. Airflow has just one Celery task name.
       args: positional arguments for the task. Airflow: args[0] is the args
           list for the 'airflow task run' command, so it is set to
           ['airflow', 'task', 'run', dag_id, task_id, execution_date, ...]
       kwargs: keyword arguments for the task. Airflow: always empty.
       options: the Celery task options dictionary. Contains existing routing
           information (such as 'queue'). Returned information is merged with
           this dictionary, with this dictionary *taking precedence*.
       task: the Celery task object. Can be used to access the Celery app via
           task.app, etc.

       If the return value is not falsy (None, {}, ''), it must be either a
       string (the name of a queue) or a dictionary with routing options.

       *Note*: Airflow sets a default queue in options. Delete it from that
       dictionary if you want to redirect a task.
       """
   ```
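
   For a concrete picture, a body for this router might look roughly like the sketch below. It reuses the hypothetical `TASK_SIZES` map from above and the `best_worker()` Redis helper shown further down, gets a Redis client through the broker connection pool trick described at the end of this comment (so a Redis broker is assumed), and the argument positions simply follow the command layout described in the docstring:

   ```python
   from celery import current_app

   def route_tasks(name, args, kwargs, options, task=None, **kw):
       # args[0] is the 'airflow task run' argument list:
       # ['airflow', 'task', 'run', dag_id, task_id, execution_date, ...]
       command = args[0]
       dag_id, task_id = command[3], command[4]
       size = TASK_SIZES.get((dag_id, task_id), 1)

       app = task.app if task is not None else current_app
       # reuse the broker connection pool; assumes Redis is the broker
       with app.pool.acquire(block=True) as conn:
           worker_queue = best_worker(conn.default_channel.client, size)

       if not worker_queue:
           return None  # no capacity anywhere; fall back to the default queue

       # Airflow already put a default 'queue' in options, and options takes
       # precedence over the returned dictionary, so remove it first.
       options.pop("queue", None)
       return {"queue": worker_queue}
   ```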
   
   and in `airflow.cfg`, set `celery_config_options`:
   
   ```ini
   [celery]
   # ...
   celery_config_options = [module with the celery config].CELERY_CONFIG
   ```
   
   You can then use [Celery signal handlers](https://docs.celeryproject.org/en/stable/userguide/signals.html) to maintain worker capacity. Which hooks you need depends on how you get your task 'size' data, but assuming a hardcoded map, you'd use the following (a sketch of these handlers follows the list):
   
   * [`celeryd_after_setup`](https://docs.celeryproject.org/en/stable/userguide/signals.html#celeryd-after-setup) to generate a worker queue name to listen to.
   * [`worker_ready`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-ready) to add the worker queue to Redis with the total worker capacity.
   * [`worker_shutting_down`](https://docs.celeryproject.org/en/stable/userguide/signals.html#worker-shutting-down) to remove the worker from Redis altogether.
   * [`task_postrun`](https://docs.celeryproject.org/en/stable/userguide/signals.html#task-postrun) to return the task size back to the worker capacity level.
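
   A minimal sketch of those handlers, again assuming the hypothetical `TASK_SIZES` map, a Redis broker (so we can reuse the broker connection pool, see the end of this comment), and a simple per-worker queue naming scheme; `WORKER_CAPACITY_KEY` names the sorted set used below:

   ```python
   from celery import current_app
   from celery.signals import (
       celeryd_after_setup, task_postrun, worker_ready, worker_shutting_down,
   )

   WORKER_CAPACITY_KEY = "worker-capacity"

   def per_worker_queue(nodename):
       # one dedicated queue per worker; any unique, stable naming scheme works
       return f"{nodename}.dq"

   @celeryd_after_setup.connect
   def add_per_worker_queue(sender, instance, **kw):
       # sender is the worker nodename; subscribe this worker to its own queue
       instance.app.amqp.queues.select_add(per_worker_queue(sender))

   @worker_ready.connect
   def register_capacity(sender, **kw):
       # sender is the worker's consumer, which has .app and .hostname
       app = sender.app
       capacity = app.conf.worker_concurrency or 16  # assumed fallback value
       with app.pool.acquire(block=True) as conn:
           conn.default_channel.client.zadd(
               WORKER_CAPACITY_KEY,
               {per_worker_queue(sender.hostname): capacity},
           )

   @worker_shutting_down.connect
   def deregister_worker(sender, **kw):
       # sender is the worker nodename; stop routing anything new to it
       with current_app.pool.acquire(block=True) as conn:
           conn.default_channel.client.zrem(
               WORKER_CAPACITY_KEY, per_worker_queue(sender))

   @task_postrun.connect
   def release_capacity(sender=None, task=None, args=None, **kw):
       # hand the task's size back to the worker that just ran it
       command = args[0]  # the 'airflow task run' argument list
       size = TASK_SIZES.get((command[3], command[4]), 1)
       with task.app.pool.acquire(block=True) as conn:
           conn.default_channel.client.zincrby(
               WORKER_CAPACITY_KEY,
               size,
               per_worker_queue(task.request.hostname),
           )
   ```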
   
   The task router is responsible for reducing the available worker capacity; you want to do this as soon as you make a routing decision, so that no further tasks are sent to a worker that is already committed to a workload.
   
   In Redis, store the capacity in a [sorted set](https://redis.io/commands/zadd) (`ZADD worker-capacity [capacity] [worker-queue]`) so you can quickly access the worker with the most capacity. Use Redis 5.0 or newer so you can use [`ZPOPMAX`](https://redis.io/commands/zpopmax) to get the least-loaded worker available. Unfortunately there is no way to both get the worker with the most capacity *and* decrement its capacity in one command, so use a [pipeline with `WATCH`](https://github.com/andymccurdy/redis-py#pipelines):
   
   ```python
   from redis.exceptions import WatchError

   def best_worker(redis, task_size):
       with redis.pipeline() as pipe:
           while True:
               worker_queue, capacity = None, 0
               try:
                   pipe.watch(WORKER_CAPACITY_KEY)
                   # while WATCHing, commands execute immediately; ZPOPMAX
                   # removes and returns the (queue, capacity) pair with the
                   # highest remaining capacity.
                   best_worker_cap = pipe.zpopmax(WORKER_CAPACITY_KEY)
                   if best_worker_cap:
                       worker_queue, capacity = best_worker_cap[0]

                   if capacity < task_size:
                       # no workers with capacity available for this task.
                       # decide what you need to do in this case. This returns None
                       # to use the default queue. We can't wait forever here as
                       # Airflow will time out the celery executor job eventually.
                       if worker_queue is not None:
                           # put the popped worker back unchanged so it isn't lost
                           pipe.zadd(WORKER_CAPACITY_KEY, {worker_queue: capacity})
                       return None

                   pipe.multi()
                   # put the worker back into the sorted set, with adjusted capacity.
                   pipe.zadd(WORKER_CAPACITY_KEY, {worker_queue: capacity - task_size})
                   pipe.execute()
                   return worker_queue.decode()
               except WatchError:
                   # something else altered the WORKER_CAPACITY_KEY sorted set,
                   # so retry.
                   continue
   ```
   
   Finally, if you are already using Redis as your Celery broker, you can reuse the Celery broker connection pool for these operations. This differs if you are using RabbitMQ: you'll then have to maintain your own Redis connection pool, or use a different method of sharing this information between the different components.
   
   To reuse the connection pool, you need access to the Celery app. How you get it depends on the specific signal handler, or on whether you are inside the router function. In the router, the `task` object has an `app` attribute, for example, while the worker signal hooks are passed a worker instance object, which again has an `app` attribute. Given the Celery app in `app`, you can then use:
   
   ```python
   with app.pool.acquire().channel() as channel:
       redis = channel.client
   ```
   
   You *may* have to increase the Celery `broker_pool_limit` configuration 
however, depending on how busy your cluster gets.
   
   One of these days I may actually write the blog post I've been planning on this subject, but the above will have to do for now.

