mshober commented on issue #35490:
URL: https://github.com/apache/airflow/issues/35490#issuecomment-1806100741

   Thanks for starting this @o-nikolas! 
   
   It is important to note that you can use `propagateTags=TASK_DEFINITION` as 
a way of setting default tags to all the tasks that Airflow launches. I haven't 
implemented tagging for my own environment yet, but that would be my approach 
to global tagging. That would render settings tags via `run_task_kwargs` 
obsolete and then tags set with `executor_config` could always be the complete 
set of tags that are passed to the RunTask API.
   
   I'm also personally not a huge fan of the `RUN_TASK_KWARGS` config option.  
We already have explicit config options for several attributes (cluster, 
networkConfiguration, taskDefinition, platformVersion, launchType). I'd rather 
have more of the same. If we dictate what attributes can be set on a config 
level then it makes handling the `executor_config` much simpler. For example, I 
don't see the need of having `overrides` supported as a global config option 
when all of those properties can be set in the task definition.
   
   > As well as from @mshober who has an implementation of this, so I'd be 
interested to hear what approach they took and how it's working out for them:
   
   My implementation is very specific and abstracted to my use case:
   ```python
   def _run_task_kwargs(self, task_id: TaskInstanceKeyType, cmd: CommandType, 
queue: str, exec_config: ExecutorConfigType) -> dict:
       capacity_provider = exec_config.get("capacity_provider")
       memory_reservation_mib: int | None = 
exec_config.get("memory_reservation_mib")
       memory_limit_mib: int | None = exec_config.get("memory_limit_mib")
       cpu_reservation: int | None = exec_config.get("cpu_reservation")
   
       run_task_api = deepcopy(self.run_task_kwargs)
       container_override = self.get_container(run_task_api["overrides"])
       container_override["command"] = cmd
       if cpu_reservation is not None:
           container_override["cpu"] = int(cpu_reservation)
       if memory_reservation_mib is not None:
           container_override["memoryReservation"] = int(memory_reservation_mib)
       if memory_limit_mib is not None:
           container_override["memory"] = int(memory_limit_mib)
       if capacity_provider:
           run_task_api["capacityProviderStrategy"] = [{"capacityProvider": 
capacity_provider}]
       return run_task_api
   ```
   so my code is likely not too helpful. I'll spend some time thinking about 
how it can be improved to fit all use cases.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to