amoghrajesh opened a new pull request, #68067:
URL: https://github.com/apache/airflow/pull/68067

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   <!--
   Thank you for contributing!
   
   Please provide above a brief description of the changes made in this pull 
request.
   Write a good git commit message following this guide: 
http://chris.beams.io/posts/git-commit/
   
   Please make sure that your code changes are covered with tests.
   And in case of new features or big changes remember to adjust the 
documentation.
   
   Feel free to ping (in general) for the review if you do not see reaction for 
a few days
   (72 Hours is the minimum reaction time you can expect from volunteers) - we 
sometimes miss notifications.
   
   In case of an existing issue, reference it using one of the following:
   
   * closes: #ISSUE
   * related: #ISSUE
   -->
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [x] Yes: claude sonnet
   
   <!--
   Generated-by: [Tool Name] following [the 
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   -->
   
   ### What is being solved?
   
   This is part of the resumability story for spark with various modes like: 
#67118 (for standalone spark) and #67473 (in flight for yarn). Right now, in 
this mode if the Airflow worker dies while the Spark job is running, Airflow 
loses track of the driver pod entirely and the retry submits a fresh job, 
wasting work already done or causing conflicts when the Spark job is not 
idempotent.
   
   ### Current behaviour
   
   With `track_driver_via_k8s_api=True` (added in #67715), the `spark-submit` 
JVM is released after pod creation and the operator polls via the Python K8s 
client. However, the driver pod name is never persisted, so a worker crash 
still causes the retry to submit a fresh job. That PR was just a building block 
to attempt resumability.
   
   ### Proposed change
   
   Wires the `SparkSubmitOperator` K8s path into `ResumableJobMixin` to benefit 
from resumability.
   
   Flow:
   
   1. `execute()` detects `_should_track_driver_via_k8s_api()` and routes to 
`execute_resumable` (when `reconnect_on_retry=True`, the default) or a plain 
submit-and-poll (when `reconnect_on_retry=False`).
   2. `submit_job()` injects 
`spark.kubernetes.submission.waitAppCompletion=false`, calls `hook.submit()`, 
captures the driver pod name from the submit log, encodes it as 
`"{namespace}:{pod_name}"`, and returns it. The mixin writes this to 
`task_store` before polling begins, this acts as the crash recovery anchor.
   3. `get_job_status()` checks a `k8s_driver_status` cache key in `task_store` 
first (to handle the pod garbage collected after success case without a live 
K8s API call), then queries the live pod phase via 
`kube_client.read_namespaced_pod`.
   4. `is_job_active()` / `is_job_succeeded()` map raw pod phases to the mixin 
semantics.
   5. `poll_until_complete()` sets `hook._kubernetes_driver_pod` from the 
external ID, delegates to the existing `_poll_k8s_driver_via_api()` loop (which 
handles transient API errors, consecutive unknown phases, and pod cleanup on 
success), then writes `"Succeeded"` to `task_store` under `k8s_driver_status` 
before the pod is deleted.
   
   Diagram attached for reference: 
   <img width="2817" height="8192" alt="K8s Job Submission 
and-2026-06-05-084419" 
src="https://github.com/user-attachments/assets/58ba3eca-9513-4e6e-85ec-c019542b23c8";
 />
   
   
   On retry, the mixin reads the saved pod ID, calls `get_job_status`, and 
either reconnects to the running pod or resubmits fresh based on the pod phase:
   
   | Pod phase on retry | Active? | Succeeded? | Mixin action |
   |---|---|---|---|
   | `Running` | Yes | — | Reconnect and continue polling |
   | `Pending` | Yes | — | Reconnect and continue polling |
   | `Succeeded` | No | Yes | Return result, skip resubmit |
   | `Failed` | No | No | Resubmit fresh |
   | `Unknown` | No | No | Resubmit fresh |
   | `NotFound` (pod GC'd) | No | No | Resubmit fresh |
   
   **Why only `"Succeeded"` is cached**
   
   The operator does not delete failed pods — they remain queryable. So 
`Failed` never needs caching. If a failed pod is also GC'd before the retry, 
`NotFound` → resubmit is the correct behaviour anyway. `"Succeeded"` is cached 
specifically because the operator deletes the driver pod on success; without 
the cache, a succeeded-then-GC'd pod would be indistinguishable from a 
failed-then-GC'd one and would trigger a spurious resubmit.
   
   **Why `get_job_status` checks `task_store` before the K8s API**
   
   On a retry, `execute_resumable` calls `get_job_status` to decide whether to 
reconnect or
   resubmit. For K8s, this means querying the live pod phase  but pods are 
ephemeral. The K8s
   API has no record of completed pods: once a pod is deleted (by the operator 
on
   success, by the K8s TTL controller, or by cluster admins), that phase 
information is gone
   permanently. There is no equivalent of YARN's application history server.
   
   This means a live K8s API query for a completed pod always returns `404 
NotFound`, which the
   mixin would otherwise treat as a terminal failure and resubmit. That would 
be wrong if the job
   already succeeded.
   
   The `k8s_driver_status` key written at the end of `poll_until_complete` is 
what bridges the
   gap. By checking `task_store` first and only falling through to the live API 
when no cached
   status is present, `get_job_status` correctly reports the job's terminal 
outcome regardless of
   whether the pod still exists.
   
   **Things of note**
   
   - `spark.kubernetes.driver.deleteOnTermination=false` is **not** injected in 
this PR. K8s will GC the driver pod after it exits normally. The 
`k8s_driver_status` cache covers the succeeded+GC'd case. For crash recovery, 
there is a small window: if the worker crashes after `_poll_k8s_driver_via_api` 
completes but before `task_store.set(k8s_driver_status, "Succeeded")` is 
written, the next retry will see `NotFound` and resubmit fresh rather than 
recognising the job already succeeded. This is a known limitation shared with 
the standalone crash recovery path.
   - If the driver pod name is not captured from the submit log (e.g. submit 
output is suppressed), `submit_job` returns `None` and the mixin falls back to 
a fresh submit on retry. A warning is logged.
   - If your driver namespace has sidecar injection enabled (e.g. Istio), the 
pod phase may not advance to `Succeeded` until all sidecars exit. Set 
`execution_timeout` as a hard bound, this is being tracked in 
https://github.com/apache/airflow/issues/67934
   
   
   ### Testing
   
   - Running Airflow on breeze against a `kind` cluster
   - Breeze is configured to talk to the kind cluster
   
   - Airflow connection defined:
   ```shell
   airflow connections add spark_default \
       --conn-type spark \
       --conn-host "k8s://${K8S_SERVER}" \
       --conn-extra '{"deploy-mode": "cluster", "namespace": "spark"}'
   ```
   
   - Airflow workers have access to the k8s cluster
   
   DAG:
   ```python
   from airflow.sdk import DAG
   from airflow.providers.apache.spark.operators.spark_submit import 
SparkSubmitOperator
   
   with DAG(
       dag_id="spark_k8s_crash_recovery_repro",
       start_date=datetime.datetime(2025, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       SparkSubmitOperator(
           task_id="submit_long_running_job",
           conn_id="spark_default",
           
application="local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar",
           java_class="org.apache.spark.examples.SparkPi",
           application_args=["100000"],
           conf={
               "spark.kubernetes.container.image": "apache/spark:3.5.3",
               "spark.kubernetes.authenticate.driver.serviceAccountName": 
"spark",
               "spark.driver.extraJavaOptions": "-Djavax.net.ssl.trustAll=true",
               "spark.executor.extraJavaOptions": 
"-Djavax.net.ssl.trustAll=true",
           },
           retries=1,
           retry_delay=datetime.timedelta(seconds=5),
           track_driver_via_k8s_api=True,
       )
   ```
   
   
   #### Test 1: Success State
   
   <img width="2498" height="1150" alt="image" 
src="https://github.com/user-attachments/assets/3ae8d7a0-334a-45c5-8ee9-c9c6ef5a2fe4";
 />
   
   
   Logs before these changes: 
   
[successlogs.txt](https://github.com/user-attachments/files/28631471/successlogs.txt)
   
   
   Logs after changes: 
   
[success_state_logs.txt](https://github.com/user-attachments/files/28631492/success_state_logs.txt)
   
   
   Pod deleted:
   <img width="2551" height="732" alt="image" 
src="https://github.com/user-attachments/assets/2f0721bd-84f6-4509-93e7-65f54a994069";
 />
   
   
   
   #### Test 2: Crash Recovery (kill worker mid run)
   
   ##### Test 2a: Kill worker first and wait for driver pod to complete 
(success state, no reconnect but avoids duplicate submission)
   
   Pod is up
   
   <img width="2551" height="732" alt="image" 
src="https://github.com/user-attachments/assets/a6395862-609f-46d6-b870-76eac18badd3";
 />
   
   Worker down:
   <img width="798" height="683" alt="image" 
src="https://github.com/user-attachments/assets/76cb33f1-c674-4412-bf62-d88c125184ba";
 />
   
   
   <img width="2498" height="903" alt="image" 
src="https://github.com/user-attachments/assets/409b1823-6b6d-4f66-8627-835f9be167a0";
 />
   
   
   Spark Driver completed
   <img width="2555" height="903" alt="image" 
src="https://github.com/user-attachments/assets/fc408823-9df3-41ee-92c3-207b5dca06e3";
 />
   
   Fired the worker back up and this is what we get
   
   <img width="2555" height="903" alt="image" 
src="https://github.com/user-attachments/assets/8c2866aa-0657-4f2d-ae78-6d634ae867b1";
 />
   
   
   ##### Test 2a: Kill worker first and resume worker mid way (reconnect 
abilities)
   
   Same steps as above
   
   But resume worker when spark driver is still running
   
   <img width="2555" height="903" alt="image" 
src="https://github.com/user-attachments/assets/877205ab-c48f-4ae5-85f3-5936101787d6";
 />
   
   <img width="2555" height="903" alt="image" 
src="https://github.com/user-attachments/assets/7edd459f-b23e-4cfc-a0e1-4af8f777bfab";
 />
   
   
   Poll continues till completion:
   
   <img width="2555" height="903" alt="image" 
src="https://github.com/user-attachments/assets/693dfce7-499e-48b0-9b8d-484a504c1447";
 />
   
   
   
   ---
   
   * Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#pull-request-guidelines)**
 for more information. Note: commit author/co-author name and email in commits 
become permanently public when merged.
   * For fundamental code changes, an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals))
 is needed.
   * When adding dependency, check compliance with the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   * For significant user-facing changes create newsfragment: 
`{pr_number}.significant.rst`, in 
[airflow-core/newsfragments](https://github.com/apache/airflow/tree/main/airflow-core/newsfragments).
 You can add this file in a follow-up commit after the PR is created so you 
know the PR number.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to