amoghrajesh opened a new pull request, #67118:
URL: https://github.com/apache/airflow/pull/67118
<!-- SPDX-License-Identifier: Apache-2.0
https://www.apache.org/licenses/LICENSE-2.0 -->
### Background
Long-running Spark jobs (and similar remote execution patterns) have
historically had two options in Airflow:
1. Deferrable operators — the task defers, hands control to the Triggerer,
and the worker slot is freed. In theory this handles the "worker dies" case
because the Triggerer owns continuity. In practice, deferrable adoption has
been low: the async programming model is a different mental model from standard
data engineering, requires a separate Triggerer component to operate and scale,
and the `defer()` boundary means a task killed during deferral (e.g. a user
clearing it while deferred) still loses the job handle. For these reasons,
teams that do write custom regular operators have written zero custom
deferrable operators.
2. Custom operators with manual application ID tracking — some teams build
their own operators that capture the YARN application ID or Spark driver ID
immediately after submission, persist it to some storage, and on retry look it
up before deciding whether to resubmit. It works but requires significant
per-team investment and the storage layer is bespoke — no standard interface,
no UI visibility, no composability with Airflow's retry policies.
**This PR takes a third path: synchronous execution with checkpointing. The
operator runs in a worker slot (same mental model as today, no Triggerer
needed), persists the job handle to the AIP-103 `task_state` before polling
begins, and on retry reads it back. From the data engineer's perspective the
operator looks and behaves identically to before; it just survives worker
disruptions without losing its place.**
### What problem are we solving?
This is a side benefit of AIP-103, demonstrated here on a test subject.
When an Airflow worker running a `SparkSubmitOperator` task dies
mid-execution, the Spark driver keeps running on the Spark cluster
independently. Airflow has no record of the previous submission, i.e. on
retry it creates a fresh operator instance and submits a new Spark job
irrespective of what happened to the earlier submission. The original
driver's work is wasted, and both jobs may run concurrently, consuming
duplicate compute.
This was reproduced empirically: triggered a `SparkPi` job in Spark cluster
mode, killed the Airflow worker while the driver was running, waited for the
driver to complete on the Spark cluster, then watched the retry submit a
second driver with a different ID.
### Current behaviour
`SparkSubmitOperator.execute()` calls `self._hook.submit()`, a single
blocking call that submits the job, extracts the driver ID, and polls — all in
one function. The driver ID is stored on `self._hook._driver_id`, in memory on
the worker side. When the worker dies, that memory is gone. Retry submits a new
job with no knowledge of the previous one.
### Proposed change
`ResumableJobMixin`: a new mixin for operators that submit one long-running
job to an external system and poll for completion. It provides a single method,
`execute_resumable(context)`, that operators call from their own `execute()`:
* First run: calls `submit_job()`, persists the returned external ID to
`task_state` before polling starts, then calls `poll_until_complete()`
* Retry with active job: reads the ID from `task_state`, checks its status,
skips resubmission and reconnects directly to the running job
* Retry with already-finished job: returns the result immediately — no
resubmission, no polling
* Retry with failed job: falls through and resubmits fresh
The mixin is generic: it knows nothing about Spark, and that is intentional.
Six abstract methods (`submit_job`, `get_job_status`, `is_job_active`,
`is_job_succeeded`, `poll_until_complete`, `get_job_result`) are implemented by
each operator for its external system.
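The control flow described above can be sketched roughly as follows. This is an illustrative reconstruction, not the code in this PR: the method signatures, the way `task_state` is reached through `context`, its `get`/`set` interface, and the `InMemoryTaskState` and `FakeJobOperator` helpers are all assumptions made for the example.

```python
from abc import ABC, abstractmethod
from typing import Any


class ResumableJobMixin(ABC):
    """Illustrative sketch only; signatures are assumed, not taken from the PR."""

    external_id_key = "external_job_id"

    @abstractmethod
    def submit_job(self, context: dict) -> str: ...
    @abstractmethod
    def get_job_status(self, external_id: str) -> str: ...
    @abstractmethod
    def is_job_active(self, status: str) -> bool: ...
    @abstractmethod
    def is_job_succeeded(self, status: str) -> bool: ...
    @abstractmethod
    def poll_until_complete(self, external_id: str) -> None: ...
    @abstractmethod
    def get_job_result(self, external_id: str) -> Any: ...

    def execute_resumable(self, context: dict) -> Any:
        task_state = context["task_state"]  # assumed access path
        external_id = task_state.get(self.external_id_key)
        if external_id is not None:
            status = self.get_job_status(external_id)
            if self.is_job_succeeded(status):
                # Retry with already-finished job: no resubmission, no polling.
                return self.get_job_result(external_id)
            if self.is_job_active(status):
                # Retry with active job: reconnect to the running job.
                self.poll_until_complete(external_id)
                return self.get_job_result(external_id)
            # Retry with failed job: fall through and resubmit.
        external_id = self.submit_job(context)
        task_state.set(self.external_id_key, external_id)  # persist before polling
        self.poll_until_complete(external_id)
        return self.get_job_result(external_id)


class InMemoryTaskState:
    """Hypothetical stand-in for the AIP-103 task_state store."""

    def __init__(self) -> None:
        self._data: dict = {}

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value


class FakeJobOperator(ResumableJobMixin):
    """Hypothetical operator backed by a dict that plays the external cluster."""

    def __init__(self, cluster: dict) -> None:
        self.cluster = cluster  # external_id -> status
        self.submissions = 0

    def submit_job(self, context: dict) -> str:
        self.submissions += 1
        job_id = f"job-{len(self.cluster) + 1}"
        self.cluster[job_id] = "RUNNING"
        return job_id

    def get_job_status(self, external_id: str) -> str:
        return self.cluster[external_id]

    def is_job_active(self, status: str) -> bool:
        return status == "RUNNING"

    def is_job_succeeded(self, status: str) -> bool:
        return status == "FINISHED"

    def poll_until_complete(self, external_id: str) -> None:
        self.cluster[external_id] = "FINISHED"  # pretend the job finishes

    def get_job_result(self, external_id: str) -> Any:
        return external_id
```

Because the external ID lives in `task_state` rather than on the operator instance, a fresh `FakeJobOperator` created for a retry can find the earlier job without any in-memory state carried over from the first attempt.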
`SparkSubmitOperator` is wired to the mixin for Spark Standalone cluster mode,
and this is how it is going to operate:
- `execute()` routes to `execute_resumable()` when
`hook._should_track_driver_status` is True (standalone cluster mode only — the
existing flag that YARN and K8s set to False). Adding YARN/K8s resumable
support in a follow-up only requires updating that flag; `execute()` routing is
stable.
- `get_job_status()` queries the Spark REST API, replacing the existing
`spark-submit --status` approach, which ran the binary in a blocking,
synchronous subprocess call
- `external_id_key = "spark_job_id"` is intentionally generic — works for
standalone (driver-xxx), YARN (application-xxx), and K8s (pod name)
- `hook.submit()` now returns the driver ID instead of `None`, with the
internal `_start_driver_status_tracking()` call removed; polling is now owned
by `poll_until_complete()` in the operator.
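As a rough illustration of a REST-based status check, the sketch below builds a status URL for the Spark standalone REST submission server and extracts the driver state from a response body. The helper names are hypothetical, the endpoint chosen here may differ from the one this PR uses, and which states count as "active" is an assumption. No network call is made; the HTTP request itself is left out.

```python
import json

# Assumed classification of Spark standalone driver states.
ACTIVE_STATES = {"SUBMITTED", "RUNNING", "RELAUNCHING"}
SUCCESS_STATES = {"FINISHED"}


def status_url(master_rest_url: str, driver_id: str) -> str:
    """Build the status URL for a driver (hypothetical helper)."""
    return f"{master_rest_url.rstrip('/')}/v1/submissions/status/{driver_id}"


def parse_driver_state(body: str) -> str:
    """Return the driverState field of a status response, or UNKNOWN if absent."""
    payload = json.loads(body)
    return payload.get("driverState", "UNKNOWN")


def is_active(state: str) -> bool:
    return state in ACTIVE_STATES


def is_succeeded(state: str) -> bool:
    return state in SUCCESS_STATES
```

For example, `parse_driver_state('{"driverState": "RUNNING", "success": true}')` yields `"RUNNING"`, which `is_active` treats as a job to reconnect to rather than resubmit.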
The diagrams below show the flow before and after the change:
Before:
<img width="602" height="952" alt="image"
src="https://github.com/user-attachments/assets/cf0a178c-e395-445f-b0e1-3b6befb856fe"
/>
After:
<img width="602" height="1032" alt="image"
src="https://github.com/user-attachments/assets/3b69d4c9-9530-4ff2-8d02-2578aeb04e84"
/>
### User implications / backcompat
- Modes other than standalone cluster (YARN, K8s, local/client): behaviour is unchanged.
- Standalone cluster mode: on first run, the driver ID is now persisted to
`task_state` before polling. On retry, Airflow reconnects instead of
resubmitting. No DAG changes required.
- Airflow 2: the operator falls back to a no-op stub via `try/except` and
always submits fresh, identical to today's behaviour.
- `hook.submit()` return type changed from `None` to `str | None` — not a
breaking change; no existing caller used the return value.
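The `try/except` fallback mentioned above could look something like the following. This is a sketch under stated assumptions: the import path is deliberately hypothetical (it does not exist), and the stub's body and the `DemoOperator` class are invented for illustration rather than taken from the PR.

```python
try:
    # Hypothetical import path, used only to illustrate the pattern.
    from airflow.sdk.hypothetical_resumable_mixin import ResumableJobMixin

    RESUMABLE_AVAILABLE = True
except ImportError:
    RESUMABLE_AVAILABLE = False

    class ResumableJobMixin:
        """Stand-in used when the real mixin is unavailable: no checkpointing."""

        def execute_resumable(self, context):
            # Nothing is read from or written to task_state, so every
            # attempt submits a fresh job, as operators do today.
            external_id = self.submit_job(context)
            self.poll_until_complete(external_id)
            return self.get_job_result(external_id)


class DemoOperator(ResumableJobMixin):
    """Hypothetical operator that records each submission it makes."""

    def __init__(self):
        self.submitted = []

    def submit_job(self, context):
        job_id = f"driver-{len(self.submitted) + 1}"
        self.submitted.append(job_id)
        return job_id

    def poll_until_complete(self, external_id):
        pass  # a real operator would block here until the job ends

    def get_job_result(self, external_id):
        return external_id

    def execute(self, context):
        return self.execute_resumable(context)
```

With the stub in place, calling `execute()` twice on the same `DemoOperator` produces two submissions, which mirrors the "always submits fresh" behaviour described for Airflow 2.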
### Notes worth knowing
- There is a very small crash window between `submit_job()` returning and
`task_state.set()` completing. If the worker dies in this window, the retry
submits a duplicate.
- The mixin is designed as a trial: after Spark (this PR) and EMR
(follow-up) both fit the 6-method interface cleanly, the mixin ships
permanently. If either fights back (multi-job fan-out, Flink savepoint
semantics), the fallback is a doc-only `execute()` template.
### Testing
TO BE ADDED
### What is next
- YARN cluster mode: implement `get_job_status` via the YARN RM REST API and
implement the `poll_until_complete` polling loop.
- K8s cluster mode: same pattern with K8s pod phase API.
- Other potential providers: `ResumableJobMixin` is ready for Flink, EMR,
Databricks, KPO
- Progress tracking: `task_state` can store `spark_progress` (percentage
complete) by querying the Spark master stage API.
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)