amoghrajesh opened a new pull request, #67473:
URL: https://github.com/apache/airflow/pull/67473

    <!-- SPDX-License-Identifier: Apache-2.0
         https://www.apache.org/licenses/LICENSE-2.0 -->
   
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   
   - [x] Yes: Claude Sonnet 4.6
   
   
   Builds on two earlier PRs: https://github.com/apache/airflow/pull/67118 and https://github.com/apache/airflow/pull/65991
   
   
   ### What problem are we solving?
   `SparkSubmitOperator` in YARN cluster mode submits the Spark driver as a YARN ApplicationMaster (AM), which runs independently on the cluster. If the Airflow worker dies mid-poll, the YARN app keeps running but Airflow loses track of it. The only recovery today is to submit a brand-new application, which wastes completed compute or causes conflicts if the job is not idempotent.
   
   ### Proposed change
   Extends `ResumableJobMixin` crash recovery (introduced in #67118 for Spark 
standalone cluster mode) to YARN cluster mode.
   
   **On first run:**
   
   - Injects `spark.yarn.submit.waitAppCompletion=false` so spark-submit exits 
after YARN accepts the app
   - Parses the YARN application ID from spark-submit output and persists it to 
`task_state`
   - Polls the YARN ResourceManager REST API (`GET /ws/v1/cluster/apps/{appId}`) until the app reaches a terminal state, building on https://github.com/apache/airflow/pull/65991
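
The first two first-run steps can be sketched roughly as below. This is a minimal illustration rather than the code in this PR: `with_no_wait` and `parse_yarn_app_id` are hypothetical helper names, and the sample log lines are only an assumed shape for spark-submit output.

```python
from __future__ import annotations

import re

# YARN application IDs have the form application_<clusterTimestamp>_<sequence>.
_APP_ID_RE = re.compile(r"application_\d+_\d+")


def with_no_wait(conf: dict[str, str]) -> dict[str, str]:
    """Return a copy of the Spark conf that makes spark-submit exit after submission."""
    merged = dict(conf)
    merged["spark.yarn.submit.waitAppCompletion"] = "false"
    return merged


def parse_yarn_app_id(lines) -> str | None:
    """Return the first YARN application ID found in the output lines, or None."""
    for line in lines:
        match = _APP_ID_RE.search(line)
        if match:
            return match.group(0)
    return None


# Assumed sample output, for illustration only.
sample_output = [
    "INFO Client: Requesting a new application from cluster",
    "INFO YarnClientImpl: Submitted application application_1700000000000_0001",
]
app_id = parse_yarn_app_id(sample_output)
```

The parsed ID is what would be persisted to `task_state` so that a later attempt can find the same application again.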
   
   
   **On retry (worker crash):**
   
   - Reads the application ID from `task_state`
   - Queries the YARN ResourceManager for the current status: reconnects if the app is still running, skips polling if it already succeeded, and resubmits only if it failed or was killed
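
The retry decision can be pictured as a small pure function. The sketch below is an assumption about the shape of that logic, not the mixin's real interface: `RetryAction` and `decide_on_retry` are hypothetical names, and the status strings are the synthesized ones described in this PR.

```python
from __future__ import annotations

from enum import Enum


class RetryAction(Enum):
    RECONNECT = "reconnect"          # app still in progress: resume polling
    RETURN_RESULT = "return_result"  # app already succeeded: skip polling
    SUBMIT = "submit"                # nothing to resume: submit a new app


# Synthesized statuses that mean the application has not finished yet.
_IN_PROGRESS = frozenset({"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"})


def decide_on_retry(saved_app_id: str | None, status: str | None) -> RetryAction:
    """Pick the action for a task attempt, given what was persisted earlier."""
    if saved_app_id is None:
        # First run, or the ID was never persisted.
        return RetryAction.SUBMIT
    if status in _IN_PROGRESS:
        return RetryAction.RECONNECT
    if status == "SUCCEEDED":
        return RetryAction.RETURN_RESULT
    # FAILED (which also covers killed apps in the synthesized status).
    return RetryAction.SUBMIT
```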
   
   
   **Status mapping:**
   
   The hook queries `GET /ws/v1/cluster/apps/{appId}` and synthesizes YARN 
API's two-field response into a single string for the mixin interface:
   
   | YARN `state` | `finalStatus` | Synthesized status | Mixin behaviour |
   |---|---|---|---|
   | `NEW` / `NEW_SAVING` / `SUBMITTED` / `ACCEPTED` / `RUNNING` | `UNDEFINED` | state as-is | Poll again |
   | `FINISHED` | `SUCCEEDED` | `SUCCEEDED` | Return result |
   | `FINISHED` | `FAILED` / `KILLED` / `UNDEFINED` | `FAILED` | Resubmit |
   | `FAILED` | `FAILED` / `KILLED` | `FAILED` | Resubmit |
   | `KILLED` | `KILLED` | `FAILED` | Resubmit |
   
   `state` determines whether the app is still running; `finalStatus` resolves 
the outcome when `state=FINISHED`. Using `finalStatus` alone would be 
unreliable because YARN can report `UNDEFINED` for a dead app if the RM 
recovered from a crash and lost the final status.
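
A minimal sketch of the mapping in the table, assuming the RM response wraps the fields in an `app` object. `synthesize_status` and `status_from_response` are hypothetical names used for illustration, not necessarily those in the hook.

```python
# YARN states that mean the application has not reached a terminal state.
_IN_PROGRESS_STATES = frozenset(
    {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"}
)


def synthesize_status(state: str, final_status: str) -> str:
    """Collapse YARN's (state, finalStatus) pair into a single status string."""
    if state in _IN_PROGRESS_STATES:
        # Still in progress: report the state as-is so the caller polls again.
        return state
    if state == "FINISHED" and final_status == "SUCCEEDED":
        return "SUCCEEDED"
    # FINISHED with FAILED/KILLED/UNDEFINED, or state FAILED/KILLED.
    return "FAILED"


def status_from_response(payload: dict) -> str:
    """Extract the synthesized status from a GET /ws/v1/cluster/apps/{appId} body."""
    app = payload["app"]
    return synthesize_status(app["state"], app["finalStatus"])
```

Checking `state` first means an `UNDEFINED` final status is only treated as a failure once the app is no longer in progress.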
   
   **Observability:** Logs status transitions such as ACCEPTED → RUNNING and a 
heartbeat every 10 polls so users can see the job is alive without log spam.
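
One way to get this logging pattern is sketched below. It is an illustration under assumptions: `poll_until_terminal` is a hypothetical function, `get_status` stands in for whatever returns the synthesized status, and the poll interval is arbitrary.

```python
from __future__ import annotations

import logging
import time
from typing import Callable

log = logging.getLogger(__name__)

_TERMINAL = frozenset({"SUCCEEDED", "FAILED"})


def poll_until_terminal(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    heartbeat_every: int = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until a terminal status, logging transitions and periodic heartbeats."""
    last_status: str | None = None
    polls = 0
    while True:
        status = get_status()
        polls += 1
        if status != last_status:
            # Log only when the status changes, e.g. ACCEPTED -> RUNNING.
            if last_status is None:
                log.info("YARN application status: %s", status)
            else:
                log.info("YARN application status: %s -> %s", last_status, status)
            last_status = status
        elif polls % heartbeat_every == 0:
            # Unchanged status: emit a heartbeat on every Nth poll only.
            log.info("YARN application still %s after %d polls", status, polls)
        if status in _TERMINAL:
            return status
        sleep(poll_interval)
```

Passing `sleep` as a parameter keeps the loop easy to exercise in tests without waiting.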
   
   **on_kill:** Since spark-submit has already exited (`waitAppCompletion=false`), the hook's CLI kill has nothing to terminate. In YARN cluster mode, `on_kill` uses the ResourceManager REST API instead (`PUT /ws/v1/cluster/apps/{appId}/state`).
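
For reference, a kill request against that endpoint could be built roughly like this, using only the standard library. `build_kill_request` and `kill_yarn_app` are hypothetical helpers, not the hook's methods, and a secured cluster may need authentication that this sketch leaves out.

```python
import json
import urllib.request


def build_kill_request(rm_address: str, app_id: str) -> urllib.request.Request:
    """Build the PUT request that asks the ResourceManager to kill an application."""
    url = f"{rm_address.rstrip('/')}/ws/v1/cluster/apps/{app_id}/state"
    body = json.dumps({"state": "KILLED"}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def kill_yarn_app(rm_address: str, app_id: str, timeout: float = 10.0) -> int:
    """Send the kill request and return the HTTP status code."""
    request = build_kill_request(rm_address, app_id)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```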
   
   
   ### What changes from the standalone PR (#67118)
   The standalone PR covered `spark://` masters only. This PR adds the parallel YARN path. The two paths are kept as separate `if` branches in `execute()` for readability.
   
   ### User impact and backcompat
   
   #### New behaviour (enabled by the existing default):
   
   - Any `SparkSubmitOperator` with `--master yarn --deploy-mode cluster` now gets crash recovery automatically, because `reconnect_on_retry=True` is the default.
   - Requires one new connection extra: `yarn_resourcemanager_webapp_address` (e.g. `http://rm.example.com:8088`). Without it, the resumable path raises `ValueError` at submit time with a clear message.
   - Trade-off to be aware of: full spark-submit log streaming (infrastructure-level YARN orchestrator logs) is replaced by RM REST API polling. Actual driver/executor logs are unaffected and remain in YARN log aggregation (`yarn logs -applicationId ...`). Most users will not notice; users relying on orchestrator log streaming should set `reconnect_on_retry=False`.
   
   #### Unaffected paths:
   
   - YARN client mode: unaffected, still uses the blocking `hook.submit()` path
   - Kubernetes: unaffected, still raises `NotImplementedError` (sister PR)
   - `reconnect_on_retry=False`: skips crash recovery, submits and polls without `task_state` persistence
   
   
   ### Testing Details
   
   #### Setup
   
   This Docker Compose setup spins up a four-container Hadoop 3.2.1 cluster: NameNode (NN), DataNode (DN), ResourceManager (RM), and NodeManager (NM).
   
   ```yaml
   services:
     namenode:
       image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8
       container_name: yarn-namenode
       ports:
         - "9870:9870"
         - "9000:9000"
       volumes:
         - namenode_data:/hadoop/dfs/name
       environment:
         - CLUSTER_NAME=yarn-test
       env_file:
         - hadoop.env
       networks:
         - yarn-net
   
     datanode:
       image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8
       container_name: yarn-datanode
       volumes:
         - datanode_data:/hadoop/dfs/data
       env_file:
         - hadoop.env
       depends_on:
         - namenode
       networks:
         - yarn-net
   
     resourcemanager:
       image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8
       container_name: yarn-resourcemanager
       ports:
         - "8088:8088"
         - "8030:8030"
         - "8031:8031"
         - "8032:8032"
       env_file:
         - hadoop.env
       depends_on:
         - namenode
         - datanode
       networks:
         - yarn-net
   
     nodemanager:
       image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8
       container_name: yarn-nodemanager
       env_file:
         - hadoop.env
       depends_on:
         - resourcemanager
       networks:
         - yarn-net
   
   volumes:
     namenode_data:
     datanode_data:
   
   networks:
     yarn-net:
       driver: bridge
   
   ```
   
   A bootstrap script prepares the environment for running Spark on Hadoop: it sets up Java, stages the Spark JARs to HDFS, writes the `core-site.xml` and `yarn-site.xml` config, and registers the `spark_yarn` Airflow connection with the right `yarn_resourcemanager_webapp_address`.
   
   Connection
   
   ```shell
   airflow connections add spark_yarn \
     --conn-type spark \
     --conn-host yarn \
     --conn-extra '{"deploy-mode": "cluster", "yarn_resourcemanager_webapp_address": "http://host.docker.internal:8088"}'
   ```
   
   Using this DAG:
   
   ```python
   from datetime import datetime
   
   from airflow.sdk import DAG
   from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
   
   with DAG(
       dag_id="spark_yarn_mode_repro",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
       tags=["aip-103", "yarn", "repro"],
   ) as dag:
       SparkSubmitOperator(
           task_id="submit_long_running_job",
           conn_id="spark_yarn",
           application="/opt/airflow/dev/spark-cluster-yarn/spark-examples.jar",
           java_class="org.apache.spark.examples.SparkPi",
           application_args=["100"],
           conf={
               "spark.yarn.archive": "hdfs:///spark-jars.zip",
               "spark.executor.memory": "512m",
               "spark.driver.memory": "512m",
               "spark.executor.cores": "1",
           },
           env_vars={
               "HADOOP_CONF_DIR": "/tmp/hadoop-conf",
               "HADOOP_USER_NAME": "root",
           },
           num_executors=1,
           retries=2,
           retry_delay=5,
       )
   ```
   
   The DAG submits SparkPi with 10,000 iterations (~2 min runtime), long enough to kill the worker mid-poll and verify reconnection on retry. Check the YARN UI to confirm that only one application was submitted across multiple Airflow task attempts.
   
   
   #### Steps to verify reconnect behaviour
   
   1. Trigger the DAG and wait for the YARN application to spin up
   
   <img width="2551" height="1344" alt="image" src="https://github.com/user-attachments/assets/ebe3565a-21e5-4de1-8204-ebffb98e6103" />
   
   <img width="1723" height="1026" alt="image" src="https://github.com/user-attachments/assets/4567463f-fd2f-44f4-bcfc-8f557138cb9d" />
   
   
   2. Kill the worker midway and watch the logs
   
   <img width="1723" height="1026" alt="image" src="https://github.com/user-attachments/assets/ca60e709-fa0b-45df-b19b-3bb1754a3c3a" />
   
   
   3. Observe in the YARN UI that the application continues running and completes
   
   <img width="1723" height="1026" alt="image" src="https://github.com/user-attachments/assets/af83ce81-6f0f-41c1-983c-219f4e76a067" />
   
   
   4. Resume the worker once the YARN app is complete and observe that no resubmission happens
   
   <img width="1723" height="1026" alt="image" src="https://github.com/user-attachments/assets/f0ce61f7-b9f4-43be-a7a5-63dee1c31d92" />
   
   
   
   
   
   
   

