This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cb582b51f fix: Missing 'slots_occupied' in `CeleryKubernetesExecutor` and `LocalKubernetesExecutor` (#41602)
5cb582b51f is described below

commit 5cb582b51fb17a7724d22ed2eeca80b1a50610f3
Author: Niko Oliveira <oniko...@amazon.com>
AuthorDate: Mon Aug 19 23:11:58 2024 -0700

    fix: Missing 'slots_occupied' in `CeleryKubernetesExecutor` and `LocalKubernetesExecutor` (#41602)
    
    Each time the base executor changes, those changes often need to be copied
    into the old (soon to be deprecated) hybrid executors. This was missed for
    the new slots_occupied property.
---
 airflow/providers/celery/executors/celery_kubernetes_executor.py     | 5 +++++
 .../providers/cncf/kubernetes/executors/local_kubernetes_executor.py | 5 +++++
 2 files changed, 10 insertions(+)

diff --git a/airflow/providers/celery/executors/celery_kubernetes_executor.py b/airflow/providers/celery/executors/celery_kubernetes_executor.py
index e981e75fa1..bc2ed7904f 100644
--- a/airflow/providers/celery/executors/celery_kubernetes_executor.py
+++ b/airflow/providers/celery/executors/celery_kubernetes_executor.py
@@ -116,6 +116,11 @@ class CeleryKubernetesExecutor(LoggingMixin):
         """Number of new tasks this executor instance can accept."""
         return self.celery_executor.slots_available
 
+    @property
+    def slots_occupied(self):
+        """Number of tasks this executor instance is currently managing."""
+        return len(self.running) + len(self.queued_tasks)
+
     def queue_command(
         self,
         task_instance: TaskInstance,
diff --git a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
index 8c948b0d64..75de1101c5 100644
--- a/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py
@@ -103,6 +103,11 @@ class LocalKubernetesExecutor(LoggingMixin):
         """Number of new tasks this executor instance can accept."""
         return self.local_executor.slots_available
 
+    @property
+    def slots_occupied(self):
+        """Number of tasks this executor instance is currently managing."""
+        return len(self.running) + len(self.queued_tasks)
+
     def queue_command(
         self,
         task_instance: TaskInstance,

Reply via email to