1fanwang opened a new pull request, #68956:
URL: https://github.com/apache/airflow/pull/68956

   ## Why
   
   `LivyOperator` waiting synchronously (`deferrable=False` with `polling_interval > 0`) keeps the
   batch id only in the worker process. If the worker is lost mid-poll, the retry posts a
   **brand-new** Livy batch: the original Spark application keeps running and the work is
   duplicated.
   
   `ResumableJobMixin` (Airflow 3.3, AIP-103) exists to make exactly this 
synchronous-wait path
   crash-safe: persist the external job id before polling, and on retry 
reconnect to the running job
   instead of resubmitting. Livy is a clean fit — a synchronous 
submit-then-poll operator, the same
   shape the mixin was built for and the `SparkSubmitOperator` already uses.
   
   ## What
   
   `LivyOperator` now subclasses `ResumableJobMixin` and routes its 
synchronous-poll path through
   `execute_resumable`:
   
   - `submit_job` posts the batch and returns its id; `get_job_status` / 
`is_job_active` /
     `is_job_succeeded` classify Livy `BatchState`; `poll_until_complete` 
reuses the existing
     `poll_for_termination`; `get_job_result` pushes the `app_id` XCom.
   - The batch id is persisted to `task_state_store` before polling, so a retry 
reads it back and
     reconnects to the running batch.
   - **Deferrable** (Triggerer owns the wait) and **fire-and-forget** 
(`polling_interval=0`, nothing
     to reconnect to) paths are untouched. An Airflow-2 stub keeps the provider 
importable on 2.x.
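
The state-classification hooks in the first bullet can be pictured roughly as follows. This is a minimal, self-contained sketch and not the provider code: the `BatchState` enum is a local copy modeled on the Livy batch state names, the function names mirror the hook names listed above, and their signatures, along with the choice of which states count as terminal, are assumptions made for illustration.

```python
from enum import Enum


class BatchState(Enum):
    """Local copy of Livy batch state names, for illustration only."""

    NOT_STARTED = "not_started"
    STARTING = "starting"
    RUNNING = "running"
    IDLE = "idle"
    BUSY = "busy"
    SHUTTING_DOWN = "shutting_down"
    ERROR = "error"
    DEAD = "dead"
    KILLED = "killed"
    SUCCESS = "success"


# Assumption: these four states are final; everything else is still in flight.
TERMINAL_STATES = frozenset(
    {BatchState.SUCCESS, BatchState.DEAD, BatchState.KILLED, BatchState.ERROR}
)


def get_job_status(raw_state: str) -> BatchState:
    """Map the state string from a Livy response onto the enum (hypothetical signature)."""
    return BatchState(raw_state)


def is_job_active(state: BatchState) -> bool:
    """A batch that has not reached a terminal state can still be reconnected to."""
    return state not in TERMINAL_STATES


def is_job_succeeded(state: BatchState) -> bool:
    """Only SUCCESS counts as a finished, successful batch."""
    return state is BatchState.SUCCESS
```

Keeping the classification in two small predicates means the retry logic only needs to ask "is it still running?" and "did it succeed?" without knowing Livy's full state list.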
   
   Crash-safety is controlled by the mixin's `durable` flag, which defaults to on; set
   `durable=False` to opt out and keep the always-resubmit behaviour.
   
   ## Tests
   
   A new `TestLivyOperatorResumable` suite (gated on Airflow 3.3+) covers 
fresh-submit-persists-before-poll,
   the three retry decisions (reconnect / return / resubmit) across real 
`BatchState` values, graceful
   degradation without a `task_state_store`, and `durable=False`. The existing 
`LivyOperator` suite is
   unchanged.
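
One way to read the three retry decisions the suite exercises is as a small decision function. The sketch below is an assumption about how the states map to decisions (active means reconnect, success means return, any other terminal state means resubmit); `decide_on_retry` and the state sets are hypothetical names, not part of the mixin's API.

```python
from typing import Optional

# Assumption: Livy state strings grouped by what a retry should do with them.
ACTIVE = {"not_started", "starting", "running", "idle", "busy", "shutting_down"}
SUCCEEDED = {"success"}


def decide_on_retry(persisted_batch_id: Optional[int], state: Optional[str]) -> str:
    """Return the action a retry attempt would take (illustrative only)."""
    if persisted_batch_id is None:
        # Nothing was persisted (first attempt, or no state store): plain submit.
        return "submit"
    if state in ACTIVE:
        # The earlier batch is still in flight: attach to it instead of posting again.
        return "reconnect"
    if state in SUCCEEDED:
        # The earlier batch already finished successfully: just return its result.
        return "return"
    # The earlier batch ended in error/dead/killed: start over with a new batch.
    return "resubmit"
```

For example, `decide_on_retry(1, "running")` yields `"reconnect"`, while `decide_on_retry(1, "dead")` yields `"resubmit"`.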
   
   ### End-to-end (live, Breeze)
   
   A real worker crash during the synchronous wait, against an in-memory Livy 
stand-in that counts
   `POST /batches`. A `LivyOperator(durable=True, deferrable=False, 
polling_interval=3)` submits a
   batch; the worker is `SIGKILL`ed mid-poll; the scheduler retries. Attempt 2 
reads the persisted
   batch id back, reconnects to the still-running batch, and finishes it — with 
no second submit.
   
   <details><summary>Raw</summary>
   
   ```text
   attempt 1: POST /batches -> batch id 1, worker polling      (POST count = 1)
   SIGKILL worker (pid 406)
   attempt 2 (try_number=2):
      "Reconnecting to existing job"
      "Batch with id 1 terminated with state: success"
   task run_batch: success
   POST /batches count after both attempts: 1   <- single submit, reconnected; no duplicate batch
   ```
   
   </details>
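
The scenario above can be approximated without Airflow or a real Livy server. The following is a simplified simulation under stated assumptions: `FakeLivy`, `WorkerLost`, `run_attempt`, and `simulate` are hypothetical names, a plain `dict` stands in for `task_state_store`, and the worker crash is modeled as an exception raised mid-poll. It only illustrates why persisting the batch id before polling keeps the `POST /batches` count at one.

```python
class FakeLivy:
    """In-memory stand-in that counts submissions (illustrative, not the PR's harness)."""

    def __init__(self, polls_until_success=2):
        self.post_count = 0
        self._remaining = {}
        self._polls = polls_until_success

    def post_batch(self):
        # Equivalent of POST /batches: allocate a new batch id.
        self.post_count += 1
        batch_id = self.post_count
        self._remaining[batch_id] = self._polls
        return batch_id

    def get_state(self, batch_id):
        # Report "running" for a fixed number of polls, then "success".
        if self._remaining[batch_id] > 0:
            self._remaining[batch_id] -= 1
            return "running"
        return "success"


class WorkerLost(Exception):
    """Models the worker being killed while it polls."""


def run_attempt(livy, store, key, durable=True, crash_after_polls=None):
    # Reuse a persisted batch id when durable; otherwise always submit.
    # Simplification: any persisted id is treated as reconnectable.
    batch_id = store.get(key) if durable else None
    if batch_id is None:
        batch_id = livy.post_batch()
        if durable:
            store[key] = batch_id  # persist BEFORE the first poll
    polls = 0
    while True:
        state = livy.get_state(batch_id)
        polls += 1
        if state == "success":
            return state
        if crash_after_polls is not None and polls >= crash_after_polls:
            raise WorkerLost(f"worker lost while polling batch {batch_id}")


def simulate(durable):
    """Run attempt 1 (crashes mid-poll) and attempt 2; return the submit count."""
    livy, store = FakeLivy(), {}
    try:
        run_attempt(livy, store, "run_batch", durable=durable, crash_after_polls=1)
    except WorkerLost:
        pass  # the scheduler would now schedule a retry
    run_attempt(livy, store, "run_batch", durable=durable)
    return livy.post_count
```

Calling `simulate(True)` and `simulate(False)` side by side shows the difference between reconnecting and always resubmitting in this toy model.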
   
   ## Risk
   
   Only the synchronous-poll path changes; deferrable and fire-and-forget are 
byte-for-byte the same.
   The reconnect logic is the shared mixin core, already covered by its own 
tests.
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [ ] Yes (please specify the tool below)
   
   

