This is an automated email from the ASF dual-hosted git repository. eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 5cb582b51f fix: Missing 'slots_occupied' in `CeleryKubernetesExecutor` and `LocalKubernetesExecutor` (#41602) 5cb582b51f is described below commit 5cb582b51fb17a7724d22ed2eeca80b1a50610f3 Author: Niko Oliveira <oniko...@amazon.com> AuthorDate: Mon Aug 19 23:11:58 2024 -0700 fix: Missing 'slots_occupied' in `CeleryKubernetesExecutor` and `LocalKubernetesExecutor` (#41602) Each time the base executor changes, often those need to be copied into the old (soon to be deprecated) hybrid executors. This was missed for the new slots_occupied property. --- airflow/providers/celery/executors/celery_kubernetes_executor.py | 5 +++++ .../providers/cncf/kubernetes/executors/local_kubernetes_executor.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py b/airflow/providers/celery/executors/celery_kubernetes_executor.py index e981e75fa1..bc2ed7904f 100644 --- a/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -116,6 +116,11 @@ class CeleryKubernetesExecutor(LoggingMixin): """Number of new tasks this executor instance can accept.""" return self.celery_executor.slots_available + @property + def slots_occupied(self): + """Number of tasks this executor instance is currently managing.""" + return len(self.running) + len(self.queued_tasks) + def queue_command( self, task_instance: TaskInstance, diff --git a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 8c948b0d64..75de1101c5 100644 --- a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -103,6 +103,11 @@ class LocalKubernetesExecutor(LoggingMixin): """Number of new tasks this executor instance can accept.""" return self.local_executor.slots_available + @property + def slots_occupied(self): + """Number of tasks this executor instance is currently managing.""" + return len(self.running) + len(self.queued_tasks) + def queue_command( self, task_instance: TaskInstance,