amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3340063074


##########
providers/apache/spark/docs/operators.rst:
##########
@@ -214,3 +214,61 @@ See :doc:`connections/spark-submit` for how to configure these fields.
 .. note::
     Crash recovery in cluster mode requires Airflow 3.3+ (``task_state`` support). On earlier
     versions the operator falls back to the previous behavior of always submitting fresh.
+
+YARN ResourceManager API tracking
+"""""""""""""""""""""""""""""""""
+
+When running Spark applications on YARN in cluster deploy mode, the default Spark submit path keeps
+the local ``spark-submit`` JVM alive on the Airflow worker while the YARN
+application runs. For long-running Spark applications this can keep worker memory tied up for the
+whole application lifetime.
+
+Set ``yarn_track_via_rm_api=True`` to release the local ``spark-submit`` JVM after YARN accepts the
+application, then poll the YARN ResourceManager REST API until the application reaches a terminal
+state. The ResourceManager API polling interval is controlled by ``status_poll_interval`` with a
+minimum of 10 seconds.
+
+This mode requires the Spark connection extra to set ``yarn_resourcemanager_webapp_address`` before
+the application is submitted:
+
+.. code-block:: bash
+
+    airflow connections add spark_yarn_rm \
+        --conn-type spark \
+        --conn-host yarn \
+        --conn-extra '{
+            "deploy-mode": "cluster",
+            "yarn_resourcemanager_webapp_address": "http://rm.example.com:8088"
+        }'
+
+.. code-block:: python
+
+    SparkSubmitOperator(
+        task_id="spark_pi",
+        conn_id="spark_yarn_rm",
+        application="/path/to/spark-examples.jar",
+        java_class="org.apache.spark.examples.SparkPi",
+        deploy_mode="cluster",
+        yarn_track_via_rm_api=True,
+    )
+
+For Kerberized clusters, install ``requests-kerberos`` in the Airflow environment. When the
+Spark connection has both ``keytab`` and ``principal`` configured, Airflow automatically uses
+``HTTPKerberosAuth()`` for the ResourceManager REST requests.
+
+Use ``yarn_rm_auth`` only when the ResourceManager needs a custom ``requests`` authentication
+object:
+
+.. code-block:: python
+
+    import requests
+
+    SparkSubmitOperator(
+        task_id="spark_pi",
+        conn_id="spark_yarn_rm",
+        application="/path/to/spark-examples.jar",
+        java_class="org.apache.spark.examples.SparkPi",
+        deploy_mode="cluster",
+        yarn_track_via_rm_api=True,
+        yarn_rm_auth=requests.auth.HTTPBasicAuth("user", "password"),
+    )

Review Comment:
   ```suggestion
   For Kerberized clusters, install ``requests-kerberos`` in the Airflow environment. When the
   Spark connection has both ``keytab`` and ``principal`` configured, Airflow automatically uses
   ``HTTPKerberosAuth()`` for the ResourceManager REST requests. For ccache-based Kerberos
   (``use_krb5ccache=True``), pass ``yarn_rm_auth=HTTPKerberosAuth()`` explicitly.
   
   Use ``yarn_rm_auth`` only when the ResourceManager needs a custom ``requests`` authentication
   object:
   
   .. code-block:: python
   
       import requests
   
       SparkSubmitOperator(
           task_id="spark_pi",
           conn_id="spark_yarn_rm",
           application="/path/to/spark-examples.jar",
           java_class="org.apache.spark.examples.SparkPi",
           deploy_mode="cluster",
           yarn_track_via_rm_api=True,
           yarn_rm_auth=requests.auth.HTTPBasicAuth("user", "password"),
       )
   ```



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -712,6 +808,155 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll the YARN RM REST API until the application reaches a terminal state."""
+        self.log.info(
+            "Tracking YARN application %s via ResourceManager REST API polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 10)
+        # Tolerate transient RM REST API failures (RM hiccup, network blip, request
+        # timeout) the same way `_start_driver_status_tracking` does for spark
+        # standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling YARN RM REST API for application %s", application_id)
+            try:
+                state, final_status = self._query_yarn_application_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} after "
+                        f"{max_consecutive_failures} consecutive YARN RM REST API "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient YARN RM REST API failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if state in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with state: {state}, "
+                    f"final status: {final_status}"
+                )
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final status: {final_status}"
+                )
+            if final_status != self._YARN_FINAL_UNDEFINED:
+                raise RuntimeError(
+                    f"YARN application {application_id} returned unexpected final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _get_yarn_rm_base_url(self) -> str:
+        """
+        Resolve the YARN ResourceManager webapp base URL from the Spark connection.
+
+        Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark
+        connection's ``extra`` JSON. Bare ``host:port`` values get ``http://``
+        prepended; fully-qualified URLs are used as-is. Trailing slashes stripped.
+        The resolved URL is cached on the hook instance so the polling loop does
+        not re-fetch the connection (or re-hit any Secrets Backend) on every iteration.
+        """
+        if self._yarn_rm_base_url is not None:
+            return self._yarn_rm_base_url
+        try:
+            conn = self.get_connection(self._conn_id)
+        except AirflowNotFoundException:
+            conn = None
+        raw = ""
+        if conn is not None:
+            raw = (conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip()
+        if not raw:
+            raise ValueError(
+                f"`yarn_track_via_rm_api=True` requires the Spark connection's `extra` to set "
+                f"`{self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY}` (e.g. `http://rm.example.com:8088`)."
+            )
+        url = raw if "://" in raw else f"http://{raw}"
+        self._yarn_rm_base_url = url.rstrip("/")
+        return self._yarn_rm_base_url
+
+    @cached_property
+    def _resolved_yarn_rm_auth(self) -> AuthBase | None:
+        """
+        Resolve the auth object for YARN ResourceManager REST API requests.
+
+        Explicit ``yarn_rm_auth`` wins. If omitted, Kerberos-enabled Spark
+        connections automatically use ``requests_kerberos.HTTPKerberosAuth``.
+        """
+        if self._yarn_rm_auth is not None:
+            return self._yarn_rm_auth
+        if self._connection.get("keytab") and self._connection.get("principal"):

Review Comment:
   ```suggestion
           if self._connection.get("keytab") and self._connection.get("principal") or self._use_krb5ccache:
   ```
   
   Without this change, `_resolved_yarn_rm_auth` returns `None` for ccache users, so the REST calls to the RM go out with no `Authorization` header. The RM on a Kerberized cluster responds with 401 and polling fails.
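
To make the intended precedence concrete, here is a minimal, self-contained sketch of the resolution order I have in mind. It is not the hook code: `resolve_yarn_rm_auth` and `kerberos_auth_factory` are hypothetical stand-ins (the factory plays the role of `HTTPKerberosAuth`), and the connection is modelled as a plain dict so the snippet runs without Airflow or `requests-kerberos` installed.

```python
from typing import Any, Callable, Mapping, Optional


def resolve_yarn_rm_auth(
    explicit_auth: Optional[Any],
    connection: Mapping[str, Any],
    use_krb5ccache: bool,
    kerberos_auth_factory: Callable[[], Any],
) -> Optional[Any]:
    """Hypothetical stand-in for the hook's auth resolution (illustration only)."""
    # An explicitly passed auth object always wins.
    if explicit_auth is not None:
        return explicit_auth
    # `a and b or c` parses as `(a and b) or c`; naming the first half
    # makes that grouping explicit.
    has_keytab_login = bool(connection.get("keytab") and connection.get("principal"))
    # Kerberos applies to keytab+principal logins and to ticket-cache logins.
    if has_keytab_login or use_krb5ccache:
        return kerberos_auth_factory()
    # No Kerberos configuration: requests go out unauthenticated.
    return None


# Example inputs (all values are made up for illustration).
kerberos_marker = object()


def fake_kerberos_factory() -> Any:
    return kerberos_marker


explicit_marker = object()
keytab_conn = {"keytab": "/etc/security/airflow.keytab", "principal": "airflow@EXAMPLE.COM"}

ccache_auth = resolve_yarn_rm_auth(None, {}, True, fake_kerberos_factory)
keytab_auth = resolve_yarn_rm_auth(None, keytab_conn, False, fake_kerberos_factory)
partial_auth = resolve_yarn_rm_auth(None, {"keytab": "/etc/security/airflow.keytab"}, False, fake_kerberos_factory)
explicit_auth = resolve_yarn_rm_auth(explicit_marker, keytab_conn, True, fake_kerberos_factory)
```

With this ordering a ccache-only setup gets the Kerberos auth object, a keytab without a principal gets nothing, and an explicit `yarn_rm_auth` is never overridden.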



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
