gingeekrishna commented on code in PR #67030:
URL: https://github.com/apache/airflow/pull/67030#discussion_r3383765840


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client: 
client.CoreV1Api) -> None:
                 )
             )
 
+    # Operators whose pods this cleanup manages.  EksPodOperator and
+    # GKEStartPodOperator are intentionally excluded: their pods run in
+    # external clusters where this kube_client has no authority.
+    _KPO_OPERATORS: frozenset[str] = frozenset(
+        [
+            "KubernetesPodOperator",
+            "SparkKubernetesOperator",
+        ]
+    )
+
+    @provide_session
+    def _cleanup_zombie_kpo_pods(
+        self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+    ) -> None:
+        """
+        Force-delete KubernetesPodOperator pods whose TaskInstance is no 
longer active.
+
+        A pod is a zombie when either:
+
+        - No matching non-terminal TaskInstance exists in the DB (the TI 
already
+          finished or was never recorded), or
+        - The pod's ``try_number`` label is less than the active TI's current
+          ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+        Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+        containers that ignore SIGTERM cannot delay pod termination.
+        """
+        from sqlalchemy import or_
+
+        from airflow.models.taskinstance import TaskInstance
+        from airflow.providers.cncf.kubernetes.pod_generator import 
make_safe_label_value
+        from airflow.utils.state import State
+
+        pod_list = self._list_pods({"label_selector": 
"kubernetes_pod_operator=True"})
+        if not pod_list:
+            return
+
+        # Parse pod labels.  Values were already normalized by 
make_safe_label_value()
+        # at pod-creation time (operators/pod.py get_labels()), so they are 
ready to
+        # compare directly against normalized DB values.
+        pod_identities = []
+        for pod in pod_list:
+            labels = pod.metadata.labels or {}
+            dag_id = labels.get("dag_id")
+            task_id = labels.get("task_id")
+            run_id = labels.get("run_id")
+            if not (dag_id and task_id and run_id):
+                continue
+            map_index = int(labels.get("map_index", -1))
+            try_number = int(labels.get("try_number", 0))
+            pod_identities.append((pod, dag_id, task_id, run_id, map_index, 
try_number))
+
+        if not pod_identities:
+            return
+
+        # Single batch query: all non-terminal TIs for KPO-managed operators.
+        # We match on notin_(finished) rather than in_(unfinished) because SQL
+        # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit 
is_(None)
+        # arm ensures we also capture TIs with no state set yet.
+        terminal_state_values = [s.value for s in State.finished]
+        active_tis = session.execute(
+            select(
+                TaskInstance.dag_id,
+                TaskInstance.task_id,
+                TaskInstance.run_id,
+                TaskInstance.map_index,
+                TaskInstance.try_number,
+            ).where(
+                or_(
+                    TaskInstance.state.notin_(terminal_state_values),
+                    TaskInstance.state.is_(None),
+                ),
+                TaskInstance.operator.in_(self._KPO_OPERATORS),

Review Comment:
   This is a fair design question. The placement rationale is:
   
   1. **KPO and KubernetesExecutor do coexist**: KubernetesExecutor runs each 
Airflow task in its own worker pod. One of those tasks can be a 
`KubernetesPodOperator` task, which then spawns a *second* pod for its actual 
work. When that KPO task finishes, Airflow signals the KPO-managed pod to 
terminate, but sidecar containers ignoring `SIGTERM` (xcom sidecar, log agents) 
can keep it running indefinitely. This is the scenario from issue #58968.
   
   2. **`kube_client` is already available here**: `KubernetesExecutor`'s 
`sync()` loop already holds an authoritative `kube_client` for the cluster 
where both executor worker pods *and* KPO-managed pods run. It's the natural 
home for any periodic cleanup that needs Kubernetes API access.
   
   3. **The limitation you identified is real**: Users running KPO with 
`CeleryExecutor` or `LocalExecutor` won't benefit from this cleanup since this 
code path is inactive for them. That's a known limitation. An alternative would 
be to put the cleanup in KPO's `execute()` itself, but a task that has already 
reached a terminal state from Airflow's perspective cannot reliably re-enter 
cleanup logic.
   
   Happy to move or restructure it if there's a preferred pattern in the 
codebase — or if another maintainer with K8s executor experience has a stronger 
opinion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to