jscheffl commented on code in PR #67030:
URL: https://github.com/apache/airflow/pull/67030#discussion_r3383332322
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client:
client.CoreV1Api) -> None:
)
)
+ # Operators whose pods this cleanup manages. EksPodOperator and
+ # GKEStartPodOperator are intentionally excluded: their pods run in
+ # external clusters where this kube_client has no authority.
+ _KPO_OPERATORS: frozenset[str] = frozenset(
+ [
+ "KubernetesPodOperator",
+ "SparkKubernetesOperator",
+ ]
+ )
+
+ @provide_session
+ def _cleanup_zombie_kpo_pods(
+ self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+ ) -> None:
+ """
+ Force-delete KubernetesPodOperator pods whose TaskInstance is no
longer active.
+
+ A pod is a zombie when either:
+
+ - No matching non-terminal TaskInstance exists in the DB (the TI
already
+ finished or was never recorded), or
+ - The pod's ``try_number`` label is less than the active TI's current
+ ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+ Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+ containers that ignore SIGTERM cannot delay pod termination.
+ """
+ from sqlalchemy import or_
+
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.providers.cncf.kubernetes.pod_generator import
make_safe_label_value
+ from airflow.utils.state import State
+
+ pod_list = self._list_pods({"label_selector":
"kubernetes_pod_operator=True"})
+ if not pod_list:
+ return
+
+ # Parse pod labels. Values were already normalized by
make_safe_label_value()
+ # at pod-creation time (operators/pod.py get_labels()), so they are
ready to
+ # compare directly against normalized DB values.
+ pod_identities = []
+ for pod in pod_list:
+ labels = pod.metadata.labels or {}
+ dag_id = labels.get("dag_id")
+ task_id = labels.get("task_id")
+ run_id = labels.get("run_id")
+ if not (dag_id and task_id and run_id):
+ continue
+ map_index = int(labels.get("map_index", -1))
+ try_number = int(labels.get("try_number", 0))
+ pod_identities.append((pod, dag_id, task_id, run_id, map_index,
try_number))
+
+ if not pod_identities:
+ return
+
+ # Single batch query: all non-terminal TIs for KPO-managed operators.
+ # We match on notin_(finished) rather than in_(unfinished) because SQL
+ # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit
is_(None)
+ # arm ensures we also capture TIs with no state set yet.
+ terminal_state_values = [s.value for s in State.finished]
+ active_tis = session.execute(
+ select(
+ TaskInstance.dag_id,
+ TaskInstance.task_id,
+ TaskInstance.run_id,
+ TaskInstance.map_index,
+ TaskInstance.try_number,
+ ).where(
+ or_(
+ TaskInstance.state.notin_(terminal_state_values),
+ TaskInstance.state.is_(None),
+ ),
+ TaskInstance.operator.in_(self._KPO_OPERATORS),
+ )
+ ).all()
Review Comment:
This is a very expensive operation: the select might spool out thousands of
task instances, and the source table might contain millions of rows. In my
view a better selection strategy should be used. The approach might be
feasible for small installations only.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -824,6 +831,133 @@ def _adopt_completed_pods(self, kube_client:
client.CoreV1Api) -> None:
)
)
+ # Operators whose pods this cleanup manages. EksPodOperator and
+ # GKEStartPodOperator are intentionally excluded: their pods run in
+ # external clusters where this kube_client has no authority.
+ _KPO_OPERATORS: frozenset[str] = frozenset(
+ [
+ "KubernetesPodOperator",
+ "SparkKubernetesOperator",
+ ]
+ )
+
+ @provide_session
+ def _cleanup_zombie_kpo_pods(
+ self, kube_client: client.CoreV1Api, *, session: Session = NEW_SESSION
+ ) -> None:
+ """
+ Force-delete KubernetesPodOperator pods whose TaskInstance is no
longer active.
+
+ A pod is a zombie when either:
+
+ - No matching non-terminal TaskInstance exists in the DB (the TI
already
+ finished or was never recorded), or
+ - The pod's ``try_number`` label is less than the active TI's current
+ ``try_number`` (the pod is a leftover from a previous retry attempt).
+
+ Force-deletion (``grace_period_seconds=0``) is used so that sidecar
+ containers that ignore SIGTERM cannot delay pod termination.
+ """
+ from sqlalchemy import or_
+
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.providers.cncf.kubernetes.pod_generator import
make_safe_label_value
+ from airflow.utils.state import State
+
+ pod_list = self._list_pods({"label_selector":
"kubernetes_pod_operator=True"})
+ if not pod_list:
+ return
+
+ # Parse pod labels. Values were already normalized by
make_safe_label_value()
+ # at pod-creation time (operators/pod.py get_labels()), so they are
ready to
+ # compare directly against normalized DB values.
+ pod_identities = []
+ for pod in pod_list:
+ labels = pod.metadata.labels or {}
+ dag_id = labels.get("dag_id")
+ task_id = labels.get("task_id")
+ run_id = labels.get("run_id")
+ if not (dag_id and task_id and run_id):
+ continue
+ map_index = int(labels.get("map_index", -1))
+ try_number = int(labels.get("try_number", 0))
+ pod_identities.append((pod, dag_id, task_id, run_id, map_index,
try_number))
+
+ if not pod_identities:
+ return
+
+ # Single batch query: all non-terminal TIs for KPO-managed operators.
+ # We match on notin_(finished) rather than in_(unfinished) because SQL
+ # evaluates "NULL NOT IN (...)" as NULL, not TRUE — the explicit
is_(None)
+ # arm ensures we also capture TIs with no state set yet.
+ terminal_state_values = [s.value for s in State.finished]
+ active_tis = session.execute(
+ select(
+ TaskInstance.dag_id,
+ TaskInstance.task_id,
+ TaskInstance.run_id,
+ TaskInstance.map_index,
+ TaskInstance.try_number,
+ ).where(
+ or_(
+ TaskInstance.state.notin_(terminal_state_values),
+ TaskInstance.state.is_(None),
+ ),
+ TaskInstance.operator.in_(self._KPO_OPERATORS),
Review Comment:
This is somehow misplaced. If you want to clean up KPO pods, then doing it
from the KubernetesExecutor side is not correct. If you use the K8sExecutor you
typically do not use KPO, and if you use another executor then this code path
is not active.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]