amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3332018712
##########
providers/apache/spark/docs/operators.rst:
##########
@@ -202,3 +202,58 @@ deferrable operators, which free the worker slot but may come with added complex
.. note::
    Crash recovery in cluster mode requires Airflow 3.3+ (``task_state`` support). On earlier
    versions the operator falls back to the previous behavior of always submitting fresh.
+
+YARN ResourceManager API tracking
+"""""""""""""""""""""""""""""""""
+
+When running Spark applications on YARN in cluster deploy mode, the default Spark submit path keeps
+the local ``spark-submit`` JVM alive on the Airflow worker while the YARN
+application runs. For long-running Spark applications this can keep worker memory tied up for the
+whole application lifetime.
+
+Set ``yarn_track_via_rm_api=True`` to have the operator release the local ``spark-submit`` JVM once YARN
+accepts the application and then poll the YARN ResourceManager REST API until the application reaches a
+terminal state. The polling interval is controlled by ``status_poll_interval``, with a minimum of
+10 seconds.
+
+This mode requires ``yarn_resourcemanager_webapp_address`` to be set in the Spark connection extra before
+the application is submitted:
+
+.. code-block:: bash
+
+    airflow connections add spark_yarn_rm \
+        --conn-type spark \
+        --conn-host yarn \
+        --conn-extra '{
+            "deploy-mode": "cluster",
+            "yarn_resourcemanager_webapp_address": "http://rm.example.com:8088"
+        }'
+
+.. code-block:: python
+
+    SparkSubmitOperator(
+        task_id="spark_pi",
+        conn_id="spark_yarn_rm",
+        application="/path/to/spark-examples.jar",
+        java_class="org.apache.spark.examples.SparkPi",
+        deploy_mode="cluster",
+        yarn_track_via_rm_api=True,
+    )
+
+Kerberized clusters also need an authentication object for the ResourceManager REST requests.
+Install ``requests-kerberos`` in the Airflow environment and pass ``HTTPKerberosAuth()`` via
+``yarn_rm_auth``:
+
+.. code-block:: python
+
+    from requests_kerberos import HTTPKerberosAuth
+
+    SparkSubmitOperator(
+        task_id="spark_pi",
+        conn_id="spark_yarn_rm",
+        application="/path/to/spark-examples.jar",
+        java_class="org.apache.spark.examples.SparkPi",
+        deploy_mode="cluster",
+        yarn_track_via_rm_api=True,
+        yarn_rm_auth=HTTPKerberosAuth(),
+    )
Review Comment:
The `yarn_rm_auth` parameter works, but I think we can make the Kerberos case transparent without requiring users to pass anything extra.
The hook already knows Kerberos is configured when `keytab` + `principal` are set on the connection (the same signal the existing `on_kill` path uses to call `renew_from_kt`). We could auto-construct `HTTPKerberosAuth()` from that, something like:
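```python
from requests.auth import AuthBase


def _resolve_yarn_rm_auth(self) -> AuthBase | None:
    # An explicitly passed ``yarn_rm_auth`` always wins (escape hatch for custom schemes).
    if self._yarn_rm_auth is not None:
        return self._yarn_rm_auth
    # keytab + principal on the connection is the signal that Kerberos is configured.
    if self._connection.get("keytab") and self._connection.get("principal"):
        try:
            from requests_kerberos import HTTPKerberosAuth
        except ImportError as err:
            raise RuntimeError(
                "Kerberos credentials found but `requests-kerberos` is not installed. "
                "Run: pip install requests-kerberos"
            ) from err
        return HTTPKerberosAuth()
    # Neither explicit auth nor Kerberos credentials: send unauthenticated requests.
    return None
```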
Then `_query_yarn_application_status` and `_kill_yarn_application` call `_resolve_yarn_rm_auth()` instead of `self._yarn_rm_auth` directly.
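To keep all three branches unit-testable without `requests-kerberos` installed, the same precedence rule could also live in a module-level helper that takes the Kerberos constructor as an optional argument. A rough sketch; `resolve_yarn_rm_auth` and `kerberos_auth_factory` are hypothetical names, not existing hook API:

```python
from typing import Any, Callable, Mapping, Optional


def resolve_yarn_rm_auth(
    explicit_auth: Optional[Any],
    connection: Mapping[str, Any],
    kerberos_auth_factory: Optional[Callable[[], Any]] = None,
) -> Optional[Any]:
    """Pick the auth object for ResourceManager REST calls.

    Precedence: explicit ``yarn_rm_auth`` > Kerberos (keytab + principal) > none.
    ``kerberos_auth_factory`` is a hypothetical test seam; when omitted,
    ``requests_kerberos.HTTPKerberosAuth`` is imported lazily.
    """
    # An explicitly passed auth object always wins.
    if explicit_auth is not None:
        return explicit_auth
    # keytab + principal on the connection means Kerberos is configured.
    if connection.get("keytab") and connection.get("principal"):
        if kerberos_auth_factory is None:
            try:
                from requests_kerberos import HTTPKerberosAuth
            except ImportError as err:
                raise RuntimeError(
                    "Kerberos credentials found but `requests-kerberos` is not installed. "
                    "Run: pip install requests-kerberos"
                ) from err
            kerberos_auth_factory = HTTPKerberosAuth
        return kerberos_auth_factory()
    # No explicit auth and no Kerberos credentials: unauthenticated requests.
    return None
```

In the hook, `_resolve_yarn_rm_auth()` would then reduce to `resolve_yarn_rm_auth(self._yarn_rm_auth, self._connection)`, and tests can pass a fake factory instead of patching imports.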
This way Kerberized clusters just work with the same connection config they already have. `yarn_rm_auth` stays as an escape hatch for custom auth schemes.
The docs section on Kerberos could also be simplified to just "make sure `requests-kerberos` is installed".
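For context on where the resolved auth object ends up, here is a stdlib-only sketch of the polling described in the docs hunk above. It assumes the ResourceManager's Cluster Application State endpoint (`/ws/v1/cluster/apps/{appid}/state`: `GET` returns `{"state": ...}`, `PUT {"state": "KILLED"}` requests a kill) and that intervals below 10 s are clamped to 10 s; all function names are hypothetical, and the PR's `_query_yarn_application_status`/`_kill_yarn_application` may be structured differently. `fetch_app_state` sends no credentials; with the change proposed here, the real calls would go through `requests` with `auth=self._resolve_yarn_rm_auth()`.

```python
import json
import time
import urllib.request
from typing import Callable

# YARN application states after which an application never changes state again.
TERMINAL_STATES = frozenset({"FINISHED", "FAILED", "KILLED"})
# Assumption: values below the documented 10 s minimum are raised to 10 s.
MIN_POLL_INTERVAL = 10.0


def app_state_url(rm_webapp_address: str, app_id: str) -> str:
    """URL of the ResourceManager Cluster Application State API for one application."""
    return f"{rm_webapp_address.rstrip('/')}/ws/v1/cluster/apps/{app_id}/state"


def fetch_app_state(url: str) -> str:
    """Unauthenticated GET returning e.g. "RUNNING"; a Kerberized RM needs an auth-aware client."""
    with urllib.request.urlopen(url, timeout=30) as resp:
        return json.load(resp)["state"]


def build_kill_request(rm_webapp_address: str, app_id: str) -> urllib.request.Request:
    """PUT {"state": "KILLED"} to the state endpoint asks the RM to kill the application."""
    return urllib.request.Request(
        app_state_url(rm_webapp_address, app_id),
        data=json.dumps({"state": "KILLED"}).encode(),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def wait_for_terminal_state(
    rm_webapp_address: str,
    app_id: str,
    status_poll_interval: float,
    fetch: Callable[[str], str] = fetch_app_state,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll the RM until the application reaches a terminal state, then return that state."""
    interval = max(status_poll_interval, MIN_POLL_INTERVAL)
    url = app_state_url(rm_webapp_address, app_id)
    while True:
        state = fetch(url)
        if state in TERMINAL_STATES:
            return state
        sleep(interval)
```

Note that `FINISHED` only means the application stopped; the `finalStatus` field returned by `/ws/v1/cluster/apps/{appid}` is what distinguishes `SUCCEEDED` from `FAILED`, so the task outcome should be derived from that rather than from the state alone.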
--
This is an automated message from the Apache Git Service.