This is what I (tentatively) suggested in AIP-72:

```
def queue_activity(
    self,
    *,  # Kwargs only to allow this to change and evolve easier over time without breaking changes
    kind: ActivityType,  # For now just "ExecuteTask", "RunCallback", but allows for "ParseDAGFile" etc. in future
    # Contains TI for "ExecuteTask", so executor can get ti.queue out of this.
    context: dict[str, Any],
): ...
```

So yeah, if this is all we're talking about, it makes sense to me for 3.0 - not sure we can do anything for 2.x though.

-ash

On 17 November 2024 17:48:16 GMT, Jens Scheffler <j_scheff...@gmx.de.INVALID> wrote:
>Hi Ash,
>
>exactly this is the "minor" gap. In...
>
>https://github.com/apache/airflow/blob/main/airflow/executors/base_executor.py#L522
>
>... the Scheduler passes only the "executor_config" field and not the
>full TaskInstance. So an executor does not have access to "pool_slots"
>of the task. PR https://github.com/apache/airflow/pull/44016 is
>proposing to pass down the TaskInstance that the scheduler obviously has.
>
>But as it is public API it would "Break" something... but not very
>technically complex.
>
>Jens
>
>On 17.11.24 18:36, Ash Berlin-Taylor wrote:
>> I'm very wary of adding anything extra to core executor interface at this
>> stage/for 3.0, both in terms of added complexity with already a lot of
>> interdependent AIPs and in terms of complexity for 3.0.
>>
>> That said: doesn't the executor already get passed the full TI so can access
>> any property of the ti which includes number of slots? I.e. I'm not sure
>> what core changes we need.
>>
>> Or if it doesn't do this yet it will next week as part of my AIP 72 changes.
>>
>> On 17 November 2024 17:25:35 GMT, Jens Scheffler
>> <j_scheff...@gmx.de.INVALID> wrote:
>>>> Should the Edge Executor just come up with its own one-off/bespoke
>>> solution? Should we update the base-executor interface itself to support
>>> this as a first class feature across all executors?
>>>
>>> I think it should NOT be Edge Executor as being a "unicorn". It would
>>> make sense to open a "Pool Slot Aware" execution for all cases where no
>>> dedicated back-end resource allocation can be configured. So we might
>>> need to distinguish two executor types:
>>>
>>> Category A) Not able (today) to respect task size (maybe a good name
>>> needs to be found) - these can "crash" if running OOM or out of disk space.
>>>
>>> In these types of executor, support for handling "Pool Slots" can be
>>> beneficial as a small extension.
>>>
>>> - LocalExecutor
>>>
>>> - CeleryExecutor (I am not sure how easy it can be added to Celery though)
>>>
>>> - EdgeExecutor
>>>
>>> Category B) Have a very powerful resource management alongside
>>>
>>> In Category (B) in my view the current "executor_config" is best suited,
>>> as today it controls memory, CPU, ephemeral storage, potential GPU
>>> reservations. In these executor types I *think* an additional Pool Slot
>>> handling would be too much overhead.
>>>
>>> - KubernetesExecutor
>>>
>>> - Aws*Executor
>>>
>>> - Future: Yunicorm :-D
>>>
>>> - Nice: SlurmExecutor
>>>
>>> We had a similar discussion also in
>>> https://lists.apache.org/thread/2qsgmr7czsth43kssmv6wtr90l1491lf - where
>>> most responses said that a full resource management should not be the
>>> scope of Airflow. I'd stay with this... But a respect of Pool Slots
>>> would be beneficial for Category (A) Executors.
>>>
>>> Jens
>>>
>>> On 15.11.24 00:23, Oliveira, Niko wrote:
>>>> I think passing the full TI to executors following a backwards compatible
>>>> path is perfectly fine and shouldn't get much push-back.
>>>>
>>>> What I think we should really discuss is whether (and if yes, how) we want
>>>> to introduce task resourcing to executors. Should the Edge Executor just
>>>> come up with its own one-off/bespoke solution? Should we update the
>>>> base-executor interface itself to support this as a first class feature
>>>> across all executors? If we do that, how should we integrate it with the
>>>> existing idea of slots that executors _already_ have (and should that be
>>>> connected to Airflow pools?). These are the bigger questions in my eyes!
>>>>
>>>> Cheers,
>>>> Niko
>>>>
>>>> ________________________________
>>>> From: Jens Scheffler <j_scheff...@gmx.de.INVALID>
>>>> Sent: Thursday, November 14, 2024 12:35:11 PM
>>>> To: dev@airflow.apache.org
>>>> Subject: RE: [EXT] [DISCUSSION] How to handle task_instance properties
>>>> which are required in Executor
>>>>
>>>> CAUTION: This email originated from outside of the organization. Do not
>>>> click links or open attachments unless you can confirm the sender and know
>>>> the content is safe.
>>>>
>>>> Hi,
>>>>
>>>> not really surprising, as I was talking with Marco about this at work to
>>>> further work on the EdgeWorker, I am also in favor of option 2. It would
>>>> be a small breaking change in the API but would be well suited in
>>>> Airflow 3. But looking at the PR we could also keep the existing
>>>> signature and allow existing executors to stay as they are -
>>>> compatibility code is really small.
>>>>
>>>> This could encourage that Executors take some Task meta data into
>>>> consideration for internal scheduling, for example passing the priority
>>>> down in K8s Executor for a priority queue or with the Pool_Slots also
>>>> take care that a LocalExecutor does not overwhelm the resources of a node.
>>>>
>>>> We could introduce the API change in 2.10.4 non_breaking (or also in
>>>> 2.11 - but earlier is better) and could drop the old execute_async in
>>>> 3.0... or decide to keep it. And as we are moving towards 3.0 if we want
>>>> to change the API...
>>>> now is the time :-D
>>>>
>>>> Looking forward to more opinions :-D
>>>>
>>>> Jens
>>>>
>>>> On 14.11.24 12:23, Kuettelwesch Marco (XC-DX/ETV5) wrote:
>>>>> Hi all,
>>>>>
>>>>> I'm currently working on a PR to enable the EdgeWorker with
>>>>> slot/resource handling. PR: https://github.com/apache/airflow/pull/43737.
>>>>>
>>>>> In the PR we decided to start a devlist discussion about how to get
>>>>> additional task_instance data into the executor. This can be managed in
>>>>> different ways, and this is the idea of this discussion.
>>>>>
>>>>> What I'm talking about:
>>>>> Main idea is that the EdgeWorker supports a slot/resource handling.
>>>>> E.g.: A worker that has 3 slots available and executes one task which needs 2
>>>>> slots cannot fetch a parallel task which needs 2 slots but can fetch a
>>>>> task which needs 1 slot.
>>>>> This gives the EdgeWorker resource handling, as a task which
>>>>> consumes more resources can block other tasks from running on the worker. The
>>>>> handling follows the same logic as the pools feature of Airflow.
>>>>> But for this the executor needs the information about how many slots are
>>>>> required to execute the task.
>>>>>
>>>>> My first idea was to add the number of slots which is needed by the task
>>>>> into the executor_config, as the execute_async function of the
>>>>> BaseExecutor only uses the TaskInstanceKey to get task details.
>>>>> The KubernetesExecutor uses the executor_config parameter to allow some
>>>>> pod-overriding.
>>>>> (See: https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/kubernetes_executor.html#pod-override)
>>>>>
>>>>> But it feels like a misuse of the executor_config parameter to add the needed
>>>>> slots of a task into the EdgeExecutor.
>>>>> The discussion went into the direction of changing the interface of the
>>>>> executor execute_async function to get more information about the
>>>>> task_instance into the executor.
>>>>> Currently we have the following options:
>>>>>
>>>>> 1) Add Executor specific task metadata into the executor like
>>>>> KubernetesExecutor does.
>>>>> 2) Enable the executor to access the task_instance properties.
>>>>>
>>>>> Regarding option 2:
>>>>> I prepared an example PR (https://github.com/apache/airflow/pull/44016)
>>>>> which adds a new execute function into the BaseExecutor.
>>>>>
>>>>> What is your opinion about this? Looking forward to your feedback.
>>>>>
>>>>> Marco
>>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
>>>> For additional commands, e-mail: dev-h...@airflow.apache.org
>>>>
>>>>
>
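
For illustration only: a minimal sketch of the slot bookkeeping from Marco's example in the thread (a worker with 3 slots that runs a 2-slot task cannot fetch another 2-slot task but can fetch a 1-slot task). The class `SlotTracker` and its method names are hypothetical and are not taken from the EdgeWorker PR or from Airflow; the sketch only assumes that the executor or worker learns each task's needed slots (e.g. pool_slots) before starting it.

```python
from dataclasses import dataclass, field


@dataclass
class SlotTracker:
    """Hypothetical helper (not an Airflow API): tracks free slots on one worker."""

    total_slots: int
    # Maps a task identifier to the number of slots it currently holds.
    used_by_task: dict[str, int] = field(default_factory=dict)

    @property
    def free_slots(self) -> int:
        return self.total_slots - sum(self.used_by_task.values())

    def can_fetch(self, needed_slots: int) -> bool:
        """True if a task needing `needed_slots` fits into the free capacity."""
        return 0 < needed_slots <= self.free_slots

    def start(self, task_key: str, needed_slots: int) -> None:
        """Reserve slots for a task; refuse if it does not fit."""
        if not self.can_fetch(needed_slots):
            raise RuntimeError(f"not enough free slots for {task_key}")
        self.used_by_task[task_key] = needed_slots

    def finish(self, task_key: str) -> None:
        """Release the slots held by a finished task."""
        self.used_by_task.pop(task_key, None)


# The scenario from the thread: 3 slots, one running task that needs 2.
worker = SlotTracker(total_slots=3)
worker.start("task_a", needed_slots=2)
print(worker.can_fetch(2))  # False: only 1 slot is free
print(worker.can_fetch(1))  # True
```

Keeping the per-task reservation in a dict (rather than a single counter) means slots are released by task identity, so a duplicate "finished" signal for the same task does not free capacity twice.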