kaxil commented on code in PR #67715:
URL: https://github.com/apache/airflow/pull/67715#discussion_r3325823484


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]
+        app_id = self._kubernetes_application_id or pod_name
+
+        if not pod_name:
+            raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+        client = kube_client.get_kube_client(in_cluster=False)

Review Comment:
   Hardcoding `in_cluster=False` forces `config.load_kube_config()`, which 
needs a kubeconfig file (by default `~/.kube/config`). When Airflow workers run 
inside the cluster (KubernetesExecutor, Airflow-on-K8s) that file typically 
doesn't exist, so this poll loop can't authenticate. `get_kube_client()` with 
no arg resolves `in_cluster` from `[kubernetes_executor] in_cluster`. Drop the 
arg and let it resolve from config, or read it from the Spark connection extra, 
so both in-cluster and out-of-cluster workers are covered.
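
A minimal sketch of the connection-extra option. The `in_cluster` extra key and the `resolve_in_cluster` helper are hypothetical names for illustration, not existing Airflow API:

```python
from __future__ import annotations


def resolve_in_cluster(connection_extra: dict) -> bool | None:
    """Return an in_cluster override from the connection extra, or None.

    None means "no override", so the caller can defer to whatever
    get_kube_client() resolves from [kubernetes_executor] in_cluster.
    """
    # "in_cluster" as an extra key is an assumption made for this sketch.
    value = connection_extra.get("in_cluster")
    if value is None:
        return None
    if isinstance(value, bool):
        return value
    # Extras are often stored as JSON strings, so accept common truthy spellings.
    return str(value).strip().lower() in {"1", "true", "yes"}
```

The hook could then call `get_kube_client(in_cluster=resolve_in_cluster(extra))`, assuming that passing `None` behaves the same as omitting the argument.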



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -865,7 +968,7 @@ def on_kill(self) -> None:
                 try:
                     import kubernetes
 
-                    client = kube_client.get_kube_client()
+                    client = kube_client.get_kube_client(in_cluster=False)

Review Comment:
   Same issue here, and this one is a regression: `on_kill` previously called 
`get_kube_client()` (resolving `in_cluster` from config). Forcing `False` 
breaks driver-pod cleanup on kill for in-cluster deployments. Suggest reverting 
to `get_kube_client()`.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]
+        app_id = self._kubernetes_application_id or pod_name
+
+        if not pod_name:
+            raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+        client = kube_client.get_kube_client(in_cluster=False)
+        poll_interval = max(self._status_poll_interval, 20)
+        # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+        # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+        consecutive_unknown = 0
+        max_consecutive_unknown = 3
+
+        while True:
+            pod = client.read_namespaced_pod(pod_name, namespace)

Review Comment:
   `read_namespaced_pod` is called bare inside `while True`. For the 
long-running jobs this feature targets, a transient `ApiException` (network 
blip, API token refresh, a brief 404 right after submit) crashes the loop and 
fails the task while the Spark job is still healthy. 
`_start_driver_status_tracking` handles exactly this with its 
`missed_job_status_reports` / `max_missed_job_status_reports` budget. Wrap the 
read in the same retry tolerance.
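
A rough sketch of what that tolerance could look like. `TransientApiError` stands in for the kubernetes client's `ApiException`, and `read_with_budget` and its parameters are hypothetical names, not code from this PR:

```python
import time


class TransientApiError(Exception):
    """Stand-in for the kubernetes client's ApiException in this sketch."""


def read_with_budget(read_pod, max_missed=10, poll_interval=1.0, sleep=time.sleep):
    """Call read_pod() until it succeeds, tolerating up to max_missed consecutive errors."""
    missed = 0
    while True:
        try:
            return read_pod()
        except TransientApiError:
            missed += 1
            if missed > max_missed:
                # Budget exhausted: surface the last error to the caller.
                raise
            sleep(poll_interval)
```

Because the counter starts at zero on every call, the budget applies to consecutive failures only, which matches the intent of `missed_job_status_reports`.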



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]
+        app_id = self._kubernetes_application_id or pod_name
+
+        if not pod_name:
+            raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+        client = kube_client.get_kube_client(in_cluster=False)
+        poll_interval = max(self._status_poll_interval, 20)
+        # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+        # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+        consecutive_unknown = 0
+        max_consecutive_unknown = 3
+
+        while True:

Review Comment:
   This loop only exits on `Succeeded`, `Failed`, or repeated `Unknown`. A pod 
stuck in `Pending` (unschedulable, or in an ImagePullBackOff that never 
resolves) polls forever unless `execution_timeout` is set. Consider a max 
wall-clock bound or surfacing prolonged `Pending`.
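
One possible shape for such a bound, as a sketch only. `wait_for_terminal_phase` and `max_pending_seconds` are hypothetical names, and the clock and sleep functions are injectable so the logic can be tested without waiting:

```python
import time


def wait_for_terminal_phase(get_phase, max_pending_seconds, poll_interval,
                            clock=time.monotonic, sleep=time.sleep):
    """Poll get_phase() until Succeeded/Failed; raise if Pending lasts too long."""
    pending_since = None
    while True:
        phase = get_phase()
        if phase in ("Succeeded", "Failed"):
            return phase
        if phase == "Pending":
            now = clock()
            if pending_since is None:
                # First Pending observation starts the timer.
                pending_since = now
            elif now - pending_since > max_pending_seconds:
                raise TimeoutError(
                    f"driver pod has been Pending for more than {max_pending_seconds}s"
                )
        else:
            # Any other phase (for example Running) resets the Pending timer.
            pending_since = None
        sleep(poll_interval)
```

Using a monotonic clock keeps the bound unaffected by wall-clock adjustments on the worker.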



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]
+        app_id = self._kubernetes_application_id or pod_name
+
+        if not pod_name:
+            raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+        client = kube_client.get_kube_client(in_cluster=False)
+        poll_interval = max(self._status_poll_interval, 20)
+        # similar to `missed_job_status_reports` tolerance in `_start_driver_status_tracking`:
+        # tolerate transient `Unknown` phases (node temporarily unreachable) before giving up.
+        consecutive_unknown = 0
+        max_consecutive_unknown = 3
+
+        while True:
+            pod = client.read_namespaced_pod(pod_name, namespace)
+            phase = pod.status.phase or "Initializing"
+            self.log.info("Application status for %s (phase: %s)", app_id, phase)
+            if phase == "Succeeded":
+                break
+            if phase == "Failed":
+                container_state = ""
+                if pod.status.container_statuses:
+                    cs = pod.status.container_statuses[0]
+                    if cs.state and cs.state.terminated:
+                        container_state = (
+                            f" exit_code={cs.state.terminated.exit_code} reason={cs.state.terminated.reason}"
+                        )
+                raise RuntimeError(f"Spark application {app_id} failed (phase=Failed{container_state})")

Review Comment:
   `_run_post_submit_commands()` only runs on the success path below; the 
`Failed` and `Unknown` branches raise before reaching it. The existing contract 
(asserted by the standalone test "must be called even on driver failure") is 
that post-submit cleanup runs on failure too, so Istio-style sidecars leak when 
the K8s job fails. Run the post-submit commands on all terminal exits, e.g. via 
`finally`.
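
A minimal sketch of the `finally` shape. `poll_then_cleanup` is a hypothetical wrapper; in the hook the two callables would be the poll method and `_run_post_submit_commands`:

```python
def poll_then_cleanup(poll, run_post_submit_commands):
    """Run poll(); always run post-submit cleanup, whether poll() returns or raises."""
    try:
        poll()
    finally:
        # Runs on success, on the Failed/Unknown raises, and on any other exception.
        run_post_submit_commands()
```

The original exception still propagates after the cleanup runs, so the task fails with the same error as before.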



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -234,6 +242,13 @@ def execute(self, context: Context) -> None:
             driver_id = self.submit_job(context)
             self.poll_until_complete(driver_id, context)
             return self.get_job_result(driver_id, context)
+        if hook._should_track_driver_via_k8s_api():
+            hook._validate_track_driver_via_k8s_api_config()

Review Comment:
   `_should_track_driver_via_k8s_api()` already requires k8s + cluster, so by 
the time `_validate...` runs its not-k8s and not-cluster checks can never fire. 
A user who sets `track_driver_via_k8s_api=True` on a YARN/standalone connection 
gets a silent no-op (falls through to plain `submit`), not the `ValueError` the 
description promises -- only the `waitAppCompletion=true` check is reachable. 
Call the validation unconditionally when the flag is set, before the 
`_should_track...` guard, so misconfiguration is rejected loudly.
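
Roughly, the ordering could look like the sketch below. The function name, its parameters, and the error messages are hypothetical; the existing `waitAppCompletion` check is left out and would follow the two checks shown:

```python
def validate_track_driver_config(track_flag: bool, master: str, deploy_mode: str) -> None:
    """Reject misconfiguration as soon as the flag is set, before any tracking guard."""
    if not track_flag:
        return
    # Spark uses master URLs of the form k8s://... for Kubernetes.
    if not master.startswith("k8s://"):
        raise ValueError("track_driver_via_k8s_api requires a Kubernetes master")
    if deploy_mode != "cluster":
        raise ValueError("track_driver_via_k8s_api requires deploy mode 'cluster'")
    # The existing waitAppCompletion check would go here.
```

With this ordering, a YARN or standalone connection with the flag set raises instead of silently falling through to plain `submit`.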



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -41,6 +41,9 @@
 DEFAULT_SPARK_BINARY = "spark-submit"
 ALLOWED_SPARK_BINARIES = [DEFAULT_SPARK_BINARY, "spark2-submit", "spark3-submit"]
 
+_K8S_WAIT_APP_COMPLETION_CONF = "spark.kubernetes.submission.waitAppCompletion"
+_K8S_DELETE_ON_TERMINATION_CONF = "spark.kubernetes.driver.deleteOnTermination"

Review Comment:
   `_K8S_DELETE_ON_TERMINATION_CONF` isn't referenced anywhere. Either wire it 
in (the pod-deletion behavior reads like it was meant to be configurable 
through this) or drop it.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]

Review Comment:
   `namespace` defaults to `None` when neither the connection extra nor 
`spark.kubernetes.namespace` is set, so `read_namespaced_pod(pod, None)` fails 
confusingly. The validation (once it's made reachable) should also require a 
namespace when the flag is on.
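
For illustration, the namespace requirement could be as small as this. `require_namespace` and the message text are hypothetical, and the connection is modelled as a plain dict:

```python
def require_namespace(connection: dict) -> str:
    """Return the configured namespace, or raise a clear error when it is missing."""
    namespace = connection.get("namespace")
    if not namespace:
        raise ValueError(
            "track_driver_via_k8s_api requires a namespace: set it in the connection "
            "extra or via spark.kubernetes.namespace"
        )
    return namespace
```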



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -802,6 +857,54 @@ def _start_driver_status_tracking(self) -> None:
                         f"returncode = {returncode}"
                     )
 
+    def _poll_k8s_driver_via_api(self) -> None:
+        """Poll the K8s driver pod phase until it reaches a terminal state."""
+        pod_name = self._kubernetes_driver_pod
+        namespace = self._connection["namespace"]
+        app_id = self._kubernetes_application_id or pod_name
+
+        if not pod_name:
+            raise ValueError("K8s driver pod name not set; cannot poll status.")
+
+        client = kube_client.get_kube_client(in_cluster=False)
+        poll_interval = max(self._status_poll_interval, 20)

Review Comment:
   `max(self._status_poll_interval, 20)` silently bumps a user-set 
`status_poll_interval=5` to 20. It's documented in the docstring, but a 
one-time `log.info` when the floor is applied would avoid surprise.
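
Something along these lines would do, as a sketch. `effective_poll_interval` and `MIN_POLL_INTERVAL` are hypothetical names; the value 20 is the floor from the diff:

```python
import logging

log = logging.getLogger(__name__)

# Floor taken from the diff above; the constant name is illustrative.
MIN_POLL_INTERVAL = 20


def effective_poll_interval(requested: int, floor: int = MIN_POLL_INTERVAL) -> int:
    """Apply the polling floor and log when it overrides the user's value."""
    if requested < floor:
        log.info(
            "status_poll_interval=%s is below the minimum of %s seconds; using %s",
            requested, floor, floor,
        )
        return floor
    return requested
```

Calling it once before the `while True` loop keeps the message to a single log line per task.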


