amoghrajesh opened a new pull request, #67118:
URL: https://github.com/apache/airflow/pull/67118

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   
   ### Background
   Long-running Spark jobs (and similar remote execution patterns) have 
historically had two options in Airflow:
   
   1. Deferrable operators — the task defers, hands control to the Triggerer, 
and the worker slot is freed. In theory this handles the "worker dies" case 
because the Triggerer owns continuity. In practice, deferrable adoption has 
been low: the async programming model is a different mental model from standard 
data engineering, requires a separate Triggerer component to operate and scale, 
and the `defer()` boundary means a killed task during `deferral` (e.g. user 
clearing while deferred) still loses the job handle. Zero custom deferrable 
operators have been written by teams that do write custom regular operators, 
for these reasons.
   
   2. Custom operators with manual application ID tracking — some teams build 
their own operators that capture the YARN application ID or Spark driver ID 
immediately after submission, persist it to some storage, and on retry look it 
up before deciding whether to resubmit. It works but requires significant 
per-team investment and the storage layer is bespoke — no standard interface, 
no UI visibility, no composability with Airflow's retry policies.
   
   **This PR takes a third path: synchronous execution with checkpointing. The operator runs in a worker slot (same mental model as today, no Triggerer needed), persists the job handle to the AIP-103 `task_state` before polling begins, and reads it back on retry. From the data engineer's perspective the operator looks and behaves exactly as before; it just survives worker disruptions without losing its place.**
   
   ### What problem are we solving?
   
   This PR demonstrates a side benefit of AIP-103, using `SparkSubmitOperator` as the test subject.
   
   When an Airflow worker running a `SparkSubmitOperator` task dies mid-execution, the Spark driver keeps running independently on the Spark cluster. Airflow has no record of the previous submission: on retry, it creates a fresh operator instance and submits a new Spark job regardless of what happened to the previously submitted one. The original driver's work is wasted, and both jobs may run concurrently, consuming duplicate compute.
   
   This was reproduced empirically: triggered a `SparkPi` job in Spark cluster mode, killed the Airflow worker while the driver was running, waited for the driver to complete on the Spark cluster, then watched the retry submit a second driver with a different ID.
   
   
   ### Current behaviour
   
   `SparkSubmitOperator.execute()` calls `self._hook.submit()`, a single 
blocking call that submits the job, extracts the driver ID, and polls — all in 
one function. The driver ID is stored on `self._hook._driver_id`, in memory on 
the worker side. When the worker dies, that memory is gone. Retry submits a new 
job with no knowledge of the previous one.
   
   
   ### Proposed change
   `ResumableJobMixin`: a new mixin for operators that submit one long-running 
job to an external system and poll for completion. It provides a single method, 
`execute_resumable(context)`, that operators call from their own `execute()`:
   
   * First run: calls `submit_job()`, persists the returned external ID to 
`task_state` before polling starts, then calls `poll_until_complete()`
   * Retry with active job: reads the ID from `task_state`, checks its status, 
skips resubmission and reconnects directly to the running job
   * Retry with already-finished job: returns the result immediately — no 
resubmission, no polling
   * Retry with failed job: falls through and resubmits fresh
   
   The mixin is generic: it knows nothing about Spark, and that is intentional. Six abstract methods (`submit_job`, `get_job_status`, `is_job_active`, `is_job_succeeded`, `poll_until_complete`, `get_job_result`) are implemented by each operator for its external system.
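
The resume-or-submit flow described above can be sketched in a few lines of Python. This is a minimal, non-authoritative sketch, not the PR's actual `ResumableJobMixin`: `task_state` is modelled as a plain dict, the method signatures and the `external_job_id` key are hypothetical, and `FakeJobOperator` is a toy stand-in for an external system that has no Spark knowledge at all.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any


class ResumableJobSketch(ABC):
    """Sketch of the resume-or-submit flow. NOT the PR's real mixin.

    Assumptions: task_state is modelled as a plain dict, and the six
    method signatures below are hypothetical.
    """

    external_id_key = "external_job_id"  # hypothetical key name

    def __init__(self, task_state: dict[str, Any]) -> None:
        self.task_state = task_state

    @abstractmethod
    def submit_job(self, context: dict) -> str: ...
    @abstractmethod
    def get_job_status(self, job_id: str) -> str: ...
    @abstractmethod
    def is_job_active(self, status: str) -> bool: ...
    @abstractmethod
    def is_job_succeeded(self, status: str) -> bool: ...
    @abstractmethod
    def poll_until_complete(self, job_id: str, context: dict) -> None: ...
    @abstractmethod
    def get_job_result(self, job_id: str, context: dict) -> Any: ...

    def execute_resumable(self, context: dict) -> Any:
        job_id = self.task_state.get(self.external_id_key)
        if job_id is not None:
            status = self.get_job_status(job_id)
            if self.is_job_succeeded(status):
                # Finished during an earlier attempt: no resubmission, no polling.
                return self.get_job_result(job_id, context)
            if self.is_job_active(status):
                # Still running: reconnect to the existing job.
                self.poll_until_complete(job_id, context)
                return self.get_job_result(job_id, context)
            # Otherwise (failed/unknown) fall through and submit fresh.
        job_id = self.submit_job(context)
        # Persist the handle BEFORE polling so a retry can find it.
        self.task_state[self.external_id_key] = job_id
        self.poll_until_complete(job_id, context)
        return self.get_job_result(job_id, context)


class FakeJobOperator(ResumableJobSketch):
    """Toy external system: jobs is a shared dict of job_id -> status."""

    def __init__(self, task_state: dict[str, Any], jobs: dict[str, str]) -> None:
        super().__init__(task_state)
        self.jobs = jobs

    def submit_job(self, context: dict) -> str:
        job_id = f"job-{len(self.jobs) + 1}"
        self.jobs[job_id] = "RUNNING"
        return job_id

    def get_job_status(self, job_id: str) -> str:
        return self.jobs.get(job_id, "UNKNOWN")

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:
        return status == "FINISHED"

    def poll_until_complete(self, job_id: str, context: dict) -> None:
        self.jobs[job_id] = "FINISHED"  # pretend the job completes while we poll

    def get_job_result(self, job_id: str, context: dict) -> Any:
        return job_id


# Simulate a retry after the worker died while job-1 was still running.
state = {"external_job_id": "job-1"}
jobs = {"job-1": "RUNNING"}
print(FakeJobOperator(state, jobs).execute_resumable({}))  # prints "job-1" (reconnected)
print(len(jobs))  # prints 1: nothing was resubmitted
```

Because the ID is written to state only after `submit_job()` returns, this ordering still has the small window in which a crash loses the handle; persisting before polling is what makes the reconnect path possible at all.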
   
   `SparkSubmitOperator` is wired to the mixin for Spark Standalone cluster mode and operates as follows:
   
   - `execute()` routes to `execute_resumable()` when 
`hook._should_track_driver_status` is True (standalone cluster mode only — the 
existing flag that YARN and K8s set to False). Adding YARN/K8s resumable 
support in a follow-up only requires updating that flag; `execute()` routing is 
stable.
   - `get_job_status()` queries the Spark REST API, replacing the existing approach of running the `spark-submit --status` binary as a blocking, synchronous subprocess.
   - `external_id_key = "spark_job_id"` is intentionally generic: it works for standalone (`driver-xxx`), YARN (`application_xxx`), and K8s (pod name).
   - `hook.submit()` now returns the driver ID instead of `None`, and the internal `_start_driver_status_tracking()` call has been removed: polling is now owned by `poll_until_complete()` in the operator.
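
For standalone cluster mode, a REST-based `get_job_status()` and a matching polling loop might look roughly like the following. It assumes the standalone master's REST submission server (conventionally on port 6066) and its `/v1/submissions/status/<driverId>` endpoint, whose JSON response carries a `driverState` field. The helper names, the 10-second interval, and the choice to treat `UNKNOWN` as terminal are illustrative assumptions, not the PR's code. The status fetcher and `sleep` are injected so the loop can be exercised without a cluster.

```python
from __future__ import annotations

import json
import time
import urllib.request
from typing import Callable

# Assumption: these driver states mean "still going"; anything else
# (FINISHED, FAILED, KILLED, ERROR, UNKNOWN) is treated as terminal.
ACTIVE_STATES = frozenset({"SUBMITTED", "RUNNING", "RELAUNCHING"})
SUCCESS_STATE = "FINISHED"


def status_url(master_rest_url: str, driver_id: str) -> str:
    """Build e.g. http://spark-master:6066/v1/submissions/status/<driver_id>."""
    return f"{master_rest_url.rstrip('/')}/v1/submissions/status/{driver_id}"


def parse_driver_state(body: str) -> str:
    """Pull driverState out of the JSON status response."""
    return json.loads(body).get("driverState", "UNKNOWN")


def fetch_driver_state(master_rest_url: str, driver_id: str, timeout: float = 30.0) -> str:
    """One HTTP GET per status check instead of a spark-submit subprocess."""
    url = status_url(master_rest_url, driver_id)
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return parse_driver_state(resp.read().decode("utf-8"))


def poll_until_complete(
    driver_id: str,
    get_state: Callable[[str], str],
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the driver leaves ACTIVE_STATES; raise unless it FINISHED."""
    state = get_state(driver_id)
    while state in ACTIVE_STATES:
        sleep(interval)
        state = get_state(driver_id)
    if state != SUCCESS_STATE:
        raise RuntimeError(f"Spark driver {driver_id} ended in state {state}")
    return state
```

In an operator, `get_state` could be `functools.partial(fetch_driver_state, master_url)`, where `master_url` is the (hypothetical) REST address of the standalone master.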
   
   The diagrams below compare the flow before and after this change:
   
   
   Before:
   
   <img width="602" height="952" alt="image" src="https://github.com/user-attachments/assets/cf0a178c-e395-445f-b0e1-3b6befb856fe" />
   
   After:
   
   <img width="602" height="1032" alt="image" src="https://github.com/user-attachments/assets/3b69d4c9-9530-4ff2-8d02-2578aeb04e84" />
   
   
   
   ### User implications / backcompat
   
   - Non-cluster modes (YARN, K8s, local/client): behaviour is unchanged.
   - Standalone cluster mode: on first run, the driver ID is now persisted to 
`task_state` before polling. On retry, Airflow reconnects instead of 
resubmitting. No DAG changes required.
   - Airflow 2: the operator falls back to a no-op stub via `try/except` and always submits fresh, identical to today's behaviour.
   - `hook.submit()` return type changed from `None` to `str | None` — not a 
breaking change; no existing caller used the return value.
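
The fallback code itself is not shown here; one way a `try/except` import fallback could look is sketched below. The module path `airflow.sdk.bases.resumable` is hypothetical, and the stub's behaviour (submit, poll, return, with no `task_state` reads or writes) is an assumption based on the Airflow 2 bullet above. `ToyOperator` is a hypothetical stand-in for `SparkSubmitOperator`.

```python
from __future__ import annotations

from typing import Any

try:
    # Hypothetical module path; the real location of the mixin may differ.
    from airflow.sdk.bases.resumable import ResumableJobMixin  # type: ignore[import-not-found]
except ImportError:
    # Airflow 2 (no task_state): a stub that always submits fresh.

    class ResumableJobMixin:  # type: ignore[no-redef]
        def execute_resumable(self, context: Any) -> Any:
            job_id = self.submit_job(context)
            self.poll_until_complete(job_id, context)
            return self.get_job_result(job_id, context)


class ToyOperator(ResumableJobMixin):
    """Hypothetical operator that records every submission."""

    def __init__(self) -> None:
        self.submitted: list[str] = []

    def submit_job(self, context: Any) -> str:
        job_id = f"driver-{len(self.submitted)}"
        self.submitted.append(job_id)
        return job_id

    def poll_until_complete(self, job_id: str, context: Any) -> None:
        pass  # a real operator would block here until the job finishes

    def get_job_result(self, job_id: str, context: Any) -> Any:
        return job_id
```

With the stub in place, every call to `execute_resumable()` submits a new job, which is exactly the pre-PR behaviour.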
   
   ### Notes worth knowing
   
   - There is a very small crash window between `submit_job()` returning and `task_state.set()` completing. If the worker dies in this window, the retry submits a duplicate.
   - The mixin is designed as a trial: once Spark (this PR) and EMR (follow-up) both fit the 6-method interface cleanly, the mixin ships permanently. If either resists the interface (e.g. multi-job fan-out, Flink savepoint semantics), the fallback is a documentation-only `execute()` template.
   
   
   
   ### Testing
   
   TO BE ADDED
   
   ### What is next
   
   - YARN cluster mode: implement `get_job_status()` via the YARN ResourceManager REST API and implement the `poll_until_complete()` polling loop.
   - K8s cluster mode: same pattern with K8s pod phase API.
   - Other potential providers: `ResumableJobMixin` is ready for Flink, EMR, Databricks, and KPO (`KubernetesPodOperator`).
   - Progress tracking: `task_state` can also store `spark_progress` (percentage complete), obtained by querying the Spark master stage API.
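
For the YARN follow-up, the status checks could map onto the ResourceManager REST API's application resource (`GET /ws/v1/cluster/apps/{appid}`), which reports both a `state` and a `finalStatus`. A minimal sketch with hypothetical helper names, assuming the job status is carried as a `(state, finalStatus)` pair:

```python
from __future__ import annotations

import json

# Application states that mean the YARN app has not finished yet.
YARN_ACTIVE_STATES = frozenset({"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"})


def yarn_app_url(rm_url: str, app_id: str) -> str:
    """Hypothetical helper: ResourceManager application resource URL."""
    return f"{rm_url.rstrip('/')}/ws/v1/cluster/apps/{app_id}"


def parse_yarn_app(body: str) -> tuple[str, str]:
    """Return (state, finalStatus) from the RM's JSON application response."""
    app = json.loads(body)["app"]
    return app["state"], app["finalStatus"]


def is_job_active(status: tuple[str, str]) -> bool:
    state, _final_status = status
    return state in YARN_ACTIVE_STATES


def is_job_succeeded(status: tuple[str, str]) -> bool:
    # FINISHED only means the app stopped; finalStatus says whether it succeeded.
    state, final_status = status
    return state == "FINISHED" and final_status == "SUCCEEDED"
```

Checking `finalStatus` matters because a YARN application can reach state `FINISHED` with `finalStatus` `FAILED`.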
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [ ] Yes (please specify the tool below)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
