1fanwang opened a new pull request, #68956:
URL: https://github.com/apache/airflow/pull/68956
## Why
When `LivyOperator` waits synchronously (`deferrable=False` with `polling_interval > 0`), the worker is the only thing tracking the Spark batch. If the worker is lost mid-poll, the retry posts a **brand-new** Livy batch: the original Spark application keeps running and the work is duplicated.
`ResumableJobMixin` (Airflow 3.3, AIP-103) exists to make exactly this synchronous-wait path crash-safe: persist the external job id before polling, and on retry reconnect to the running job instead of resubmitting. Livy is a clean fit: a synchronous submit-then-poll operator, the same shape the mixin was built for and that `SparkSubmitOperator` already uses.
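The persist-before-poll idea can be illustrated without Airflow at all. The sketch below is not the mixin's code: `FakeBatchService`, `run_attempt`, and the plain `dict` used as a state store are hypothetical stand-ins chosen only to show why saving the id before the wait starts is what makes a retry safe.

```python
class FakeBatchService:
    """Hypothetical stand-in for an external job service such as Livy."""

    def __init__(self) -> None:
        self.submit_count = 0

    def submit(self) -> int:
        # Each submit creates a new batch and returns its id.
        self.submit_count += 1
        return self.submit_count

    def wait(self, batch_id: int, crash: bool = False) -> str:
        # `crash=True` simulates the worker dying in the middle of the poll loop.
        if crash:
            raise RuntimeError("worker lost mid-poll")
        return "success"


def run_attempt(service: FakeBatchService, store: dict, crash: bool = False) -> str:
    """One task attempt: reuse a persisted batch id if present, else submit."""
    batch_id = store.get("batch_id")
    if batch_id is None:
        batch_id = service.submit()
        # Persist BEFORE polling, so the id survives a crash during the wait.
        store["batch_id"] = batch_id
    # On a retry this polls the already-running batch instead of a new one.
    return service.wait(batch_id, crash=crash)
```

Running `run_attempt` once with `crash=True` and then again against the same `store` leaves `service.submit_count` at 1. If the id were written only after the wait returned, the second attempt would find an empty store and submit again.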
## What
`LivyOperator` now subclasses `ResumableJobMixin` and routes its synchronous-poll path through `execute_resumable`:
- `submit_job` posts the batch and returns its id; `get_job_status`, `is_job_active`, and `is_job_succeeded` classify the Livy `BatchState`; `poll_until_complete` reuses the existing `poll_for_termination`; `get_job_result` pushes the `app_id` XCom.
- The batch id is persisted to `task_state_store` before polling, so a retry reads it back and reconnects to the running batch.
- The **deferrable** (Triggerer owns the wait) and **fire-and-forget** (`polling_interval=0`, nothing to reconnect to) paths are untouched. An Airflow-2 stub keeps the provider importable on 2.x.
Crash-safety is controlled by the mixin's `durable` flag, which defaults to on; set `durable=False` to opt out and keep the always-resubmit behaviour.
## Tests
A new `TestLivyOperatorResumable` suite (gated on Airflow 3.3+) covers fresh-submit-persists-before-poll, the three retry decisions (reconnect / return / resubmit) across real `BatchState` values, graceful degradation without a `task_state_store`, and `durable=False`. The existing `LivyOperator` suite is unchanged.
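The three retry decisions can be sketched as a small pure function. This is an illustration under assumptions, not the mixin's implementation: the `BatchState` enum below is a local stand-in whose values are believed to match the Livy batch state names, `retry_decision` is a hypothetical helper, and the mapping of a succeeded batch to "return" and a failed or missing batch to "resubmit" is an assumed reading of the three names.

```python
from enum import Enum
from typing import Optional


class BatchState(Enum):
    """Local stand-in for the Livy batch states (not imported from the provider)."""

    NOT_STARTED = "not_started"
    STARTING = "starting"
    RUNNING = "running"
    IDLE = "idle"
    BUSY = "busy"
    SHUTTING_DOWN = "shutting_down"
    ERROR = "error"
    DEAD = "dead"
    KILLED = "killed"
    SUCCESS = "success"


# States after which the batch will not change any more.
TERMINAL_STATES = {
    BatchState.SUCCESS,
    BatchState.DEAD,
    BatchState.KILLED,
    BatchState.ERROR,
}


def is_job_active(state: BatchState) -> bool:
    return state not in TERMINAL_STATES


def is_job_succeeded(state: BatchState) -> bool:
    return state == BatchState.SUCCESS


def retry_decision(persisted_state: Optional[BatchState]) -> str:
    """Decide what a retry does, given the state of the persisted batch.

    `None` means no batch id was persisted (or the batch could not be found).
    """
    if persisted_state is None:
        return "resubmit"   # nothing to reconnect to
    if is_job_active(persisted_state):
        return "reconnect"  # keep polling the batch that is still running
    if is_job_succeeded(persisted_state):
        return "return"     # work already finished; do not run it again
    return "resubmit"       # previous batch ended in error/dead/killed
```

For example, `retry_decision(BatchState.RUNNING)` gives `"reconnect"` and `retry_decision(BatchState.DEAD)` gives `"resubmit"`.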
### End-to-end (live, Breeze)
This run exercises a real worker crash during the synchronous wait, against an in-memory Livy stand-in that counts `POST /batches` requests. A `LivyOperator(durable=True, deferrable=False, polling_interval=3)` submits a batch; the worker is `SIGKILL`ed mid-poll; the scheduler retries. Attempt 2 reads the persisted batch id back, reconnects to the still-running batch, and finishes it, with no second submit.
<details><summary>Raw</summary>
```text
attempt 1: POST /batches -> batch id 1, worker polling (POST count = 1)
SIGKILL worker (pid 406)
attempt 2 (try_number=2):
"Reconnecting to existing job"
"Batch with id 1 terminated with state: success"
task run_batch: success
POST /batches count after both attempts: 1 <- single submit, reconnected; no duplicate batch
```
</details>
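The stand-in itself is not included in this description. A minimal sketch of what such a counter could look like follows, assuming it only needs the two Livy endpoints a synchronous wait touches (`POST /batches` and `GET /batches/{id}/state`). `FakeLivy`, `start_fake_livy`, and `call` are hypothetical names, and batch ids start at 1 here only to match the log above.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class FakeLivy(BaseHTTPRequestHandler):
    """Hypothetical in-memory stand-in for the Livy batches API."""

    post_count = 0  # number of POST /batches requests seen so far
    batches = {}    # batch id -> state string, shared by all requests

    def _send_json(self, payload, status=200):
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        # Drain the request body; its content is ignored by this stand-in.
        self.rfile.read(int(self.headers.get("Content-Length", 0)))
        if self.path != "/batches":
            self._send_json({"msg": "not found"}, 404)
            return
        cls = type(self)
        cls.post_count += 1
        batch_id = len(cls.batches) + 1
        cls.batches[batch_id] = "running"
        self._send_json({"id": batch_id, "state": "running"}, 201)

    def do_GET(self):
        # Only GET /batches/{id}/state is supported.
        parts = self.path.strip("/").split("/")
        if len(parts) == 3 and parts[0] == "batches" and parts[2] == "state":
            state = type(self).batches.get(int(parts[1])) if parts[1].isdigit() else None
            if state is not None:
                self._send_json({"id": int(parts[1]), "state": state})
                return
        self._send_json({"msg": "not found"}, 404)

    def log_message(self, *args):
        pass  # keep test output quiet


def start_fake_livy() -> HTTPServer:
    """Start the stand-in on an ephemeral localhost port in a daemon thread."""
    server = HTTPServer(("127.0.0.1", 0), FakeLivy)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def call(server: HTTPServer, method: str, path: str, payload=None):
    """Send one JSON request to the stand-in and return the decoded reply."""
    conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
    try:
        body = json.dumps(payload) if payload is not None else None
        conn.request(method, path, body=body, headers={"Content-Type": "application/json"})
        return json.loads(conn.getresponse().read())
    finally:
        conn.close()
```

A test harness would point the Livy connection at `server.server_port`, run both attempts, and then assert that `FakeLivy.post_count` is still 1. Marking a batch finished is a matter of setting `FakeLivy.batches[1] = "success"` from the test.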
## Risk
Only the synchronous-poll path changes; deferrable and fire-and-forget are byte-for-byte the same. The reconnect logic is the shared mixin core, already covered by its own tests.
---
##### Was generative AI tooling used to co-author this PR?
- [ ] Yes (please specify the tool below)