amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3340063074
##########
providers/apache/spark/docs/operators.rst:
##########
@@ -214,3 +214,61 @@ See :doc:`connections/spark-submit` for how to configure
these fields.
.. note::
Crash recovery in cluster mode requires Airflow 3.3+ (``task_state``
support). On earlier
versions the operator falls back to the previous behavior of always
submitting fresh.
+
+YARN ResourceManager API tracking
+"""""""""""""""""""""""""""""""""
+
+When running Spark applications on YARN in cluster deploy mode, the default
Spark submit path keeps
+the local ``spark-submit`` JVM alive on the Airflow worker while the YARN
+application runs. For long-running Spark applications this can keep worker
memory tied up for the
+whole application lifetime.
+
+Set ``yarn_track_via_rm_api=True`` to release the local ``spark-submit`` JVM
after YARN accepts the
+application, then poll the YARN ResourceManager REST API until the application
reaches a terminal
+state. The ResourceManager API polling interval is controlled by
``status_poll_interval`` with a
+minimum of 10 seconds.
+
+This mode requires the Spark connection extra to set
``yarn_resourcemanager_webapp_address`` before
+the application is submitted:
+
+.. code-block:: bash
+
+ airflow connections add spark_yarn_rm \
+ --conn-type spark \
+ --conn-host yarn \
+ --conn-extra '{
+ "deploy-mode": "cluster",
+ "yarn_resourcemanager_webapp_address": "http://rm.example.com:8088"
+ }'
+
+.. code-block:: python
+
+ SparkSubmitOperator(
+ task_id="spark_pi",
+ conn_id="spark_yarn_rm",
+ application="/path/to/spark-examples.jar",
+ java_class="org.apache.spark.examples.SparkPi",
+ deploy_mode="cluster",
+ yarn_track_via_rm_api=True,
+ )
+
+For Kerberized clusters, install ``requests-kerberos`` in the Airflow
environment. When the
+Spark connection has both ``keytab`` and ``principal`` configured, Airflow
automatically uses
+``HTTPKerberosAuth()`` for the ResourceManager REST requests.
+
+Use ``yarn_rm_auth`` only when the ResourceManager needs a custom ``requests``
authentication
+object:
+
+.. code-block:: python
+
+ import requests
+
+ SparkSubmitOperator(
+ task_id="spark_pi",
+ conn_id="spark_yarn_rm",
+ application="/path/to/spark-examples.jar",
+ java_class="org.apache.spark.examples.SparkPi",
+ deploy_mode="cluster",
+ yarn_track_via_rm_api=True,
+ yarn_rm_auth=requests.auth.HTTPBasicAuth("user", "password"),
+ )
Review Comment:
```suggestion
For Kerberized clusters, install ``requests-kerberos`` in the Airflow environment. When the
Spark connection has both ``keytab`` and ``principal`` configured, Airflow automatically uses
``HTTPKerberosAuth()`` for the ResourceManager REST requests. For ccache-based Kerberos
(``use_krb5ccache=True``), pass ``yarn_rm_auth=HTTPKerberosAuth()`` explicitly.

Use ``yarn_rm_auth`` only when the ResourceManager needs a custom ``requests`` authentication
object:

.. code-block:: python

    import requests

    SparkSubmitOperator(
        task_id="spark_pi",
        conn_id="spark_yarn_rm",
        application="/path/to/spark-examples.jar",
        java_class="org.apache.spark.examples.SparkPi",
        deploy_mode="cluster",
        yarn_track_via_rm_api=True,
        yarn_rm_auth=requests.auth.HTTPBasicAuth("user", "password"),
    )
```
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -712,6 +808,155 @@ def _process_spark_submit_log(self, itr: Iterator[Any])
-> None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll the YARN RM REST API until the application reaches a terminal
state."""
+ self.log.info(
+ "Tracking YARN application %s via ResourceManager REST API
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 10)
+ # Tolerate transient RM REST API failures (RM hiccup, network blip,
request
+ # timeout) the same way `_start_driver_status_tracking` does for spark
+ # standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling YARN RM REST API for application %s",
application_id)
+ try:
+ state, final_status =
self._query_yarn_application_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive YARN RM REST
API "
+ f"failures. Last error: {exc}"
+ ) from exc
+ self.log.warning(
+ "Transient YARN RM REST API failure (%d/%d): %s",
+ consecutive_failures,
+ max_consecutive_failures,
+ exc,
+ )
+ time.sleep(poll_interval)
+ continue
+ consecutive_failures = 0
+ if state in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with state:
{state}, "
+ f"final status: {final_status}"
+ )
+ if final_status == self._YARN_FINAL_SUCCESS:
+ self.log.info("YARN application %s finished with SUCCEEDED",
application_id)
+ return
+ if final_status in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with final
status: {final_status}"
+ )
+ if final_status != self._YARN_FINAL_UNDEFINED:
+ raise RuntimeError(
+ f"YARN application {application_id} returned unexpected
final status: {final_status}"
+ )
+ time.sleep(poll_interval)
+
+ def _get_yarn_rm_base_url(self) -> str:
+ """
+ Resolve the YARN ResourceManager webapp base URL from the Spark
connection.
+
+ Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark
+ connection's ``extra`` JSON. Bare ``host:port`` values get ``http://``
+ prepended; fully-qualified URLs are used as-is. Trailing slashes
stripped.
+ The resolved URL is cached on the hook instance so the polling loop
does
+ not re-fetch the connection (or re-hit any Secrets Backend) on every
iteration.
+ """
+ if self._yarn_rm_base_url is not None:
+ return self._yarn_rm_base_url
+ try:
+ conn = self.get_connection(self._conn_id)
+ except AirflowNotFoundException:
+ conn = None
+ raw = ""
+ if conn is not None:
+ raw =
(conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip()
+ if not raw:
+ raise ValueError(
+ f"`yarn_track_via_rm_api=True` requires the Spark connection's
`extra` to set "
+ f"`{self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY}` (e.g.
`http://rm.example.com:8088`)."
+ )
+ url = raw if "://" in raw else f"http://{raw}"
+ self._yarn_rm_base_url = url.rstrip("/")
+ return self._yarn_rm_base_url
+
+ @cached_property
+ def _resolved_yarn_rm_auth(self) -> AuthBase | None:
+ """
+ Resolve the auth object for YARN ResourceManager REST API requests.
+
+ Explicit ``yarn_rm_auth`` wins. If omitted, Kerberos-enabled Spark
+ connections automatically use ``requests_kerberos.HTTPKerberosAuth``.
+ """
+ if self._yarn_rm_auth is not None:
+ return self._yarn_rm_auth
+ if self._connection.get("keytab") and
self._connection.get("principal"):
Review Comment:
```suggestion
        if (self._connection.get("keytab") and self._connection.get("principal")) or self._use_krb5ccache:
```
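Without this change, `_resolved_yarn_rm_auth` returns `None` for ccache-based Kerberos users
(`use_krb5ccache=True`), so the REST calls to the RM go out with no `Authorization` header. The RM on a
Kerberized cluster then responds with 401 and polling fails. If this change is adopted, the docs
suggestion asking ccache users to pass `yarn_rm_auth=HTTPKerberosAuth()` explicitly should be updated
to say that Kerberos auth is applied automatically in that case too.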
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]