gingeekrishna commented on code in PR #67030:
URL: https://github.com/apache/airflow/pull/67030#discussion_r3383764618
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client:
client.CoreV1Api) -> None:
)
)
+ # Operators whose pods this cleanup manages. EksPodOperator and
+ # GKEStartPodOperator are intentionally excluded: their pods run in
+ # external clusters where this kube_client has no authority.
+ _KPO_OPERATORS: frozenset[str] = frozenset(
+ [
+ "KubernetesPodOperator",
+ "SparkKubernetesOperator",
+ ]
+ )
+
+ @provide_session
+ def _cleanup_zombie_kpo_pods(
+ self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+ ) -> None:
+ """
+ Force-delete KubernetesPodOperator pods whose TaskInstance is no
longer active.
+
+ A pod is a zombie when either:
+
+ - No matching non-terminal TaskInstance exists in the DB (the TI
already
+ finished or was never recorded), or
+ - The pod's ``try_number`` label is less than the active TI's current
+ ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+ Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+ containers that ignore SIGTERM cannot delay pod termination.
+ """
+ from sqlalchemy import or_
+
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.providers.cncf.kubernetes.pod_generator import
make_safe_label_value
+ from airflow.utils.state import State
+
+ pod_list = self._list_pods({"label_selector":
"kubernetes_pod_operator=True"})
+ if not pod_list:
+ return
+
+ # Parse pod labels. Values were already normalized by
make_safe_label_value()
+ # at pod-creation time (operators/pod.py get_labels()), so they are
ready to
+ # compare directly against normalized DB values.
+ pod_identities = []
+ for pod in pod_list:
+ labels = pod.metadata.labels or {}
+ dag_id = labels.get("dag_id")
+ task_id = labels.get("task_id")
+ run_id = labels.get("run_id")
+ if not (dag_id and task_id and run_id):
+ continue
+ map_index = int(labels.get("map_index", -1))
+ try_number = int(labels.get("try_number", 0))
+ pod_identities.append((pod, dag_id, task_id, run_id, map_index,
try_number))
+
+ if not pod_identities:
+ return
+
+ # Single batch query: all non-terminal TIs for KPO-managed operators.
+ # We match on notin_(finished) rather than in_(unfinished) because SQL
+ # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit
is_(None)
+ # arm ensures we also capture TIs with no state set yet.
+ terminal_state_values = [s.value for s in State.finished]
+ active_tis = session.execute(
+ select(
+ TaskInstance.dag_id,
+ TaskInstance.task_id,
+ TaskInstance.run_id,
+ TaskInstance.map_index,
+ TaskInstance.try_number,
+ ).where(
+ or_(
+ TaskInstance.state.notin_(terminal_state_values),
+ TaskInstance.state.is_(None),
+ ),
+ TaskInstance.operator.in_(self._KPO_OPERATORS),
+ )
+ ).all()
Review Comment:
Good catch — fixed in the latest commit. The query is now scoped to
`TaskInstance.dag_id.in_(pod_dag_ids)`, where `pod_dag_ids` is the set of DAG
IDs extracted from the live KPO pod labels. This bounds the result set to TIs
from DAGs that actually have running KPO pods, rather than scanning all
non-terminal KPO TIs across the entire table.
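A note on correctness: pod label values are normalized by
`make_safe_label_value()` at pod-creation time. For `dag_id` specifically, the
normalization is almost always a no-op (DAG IDs are typically short strings
made up of label-safe characters), so the filter correctly scopes the query in
practice. The caveat is a `dag_id` that normalization does alter (for example,
one longer than the 63-character Kubernetes label-value limit): its label value
no longer equals the raw `TaskInstance.dag_id`, so the `in_()` filter will not
return that DAG's TIs. The remaining comparison against normalized values (for
`task_id`, `run_id`, `map_index`) still happens in Python on the reduced result
set.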
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]