amoghrajesh commented on code in PR #65991: URL: https://github.com/apache/airflow/pull/65991#discussion_r3311352887
########## providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py: ########## @@ -29,15 +29,25 @@ import uuid from collections.abc import Iterator from pathlib import Path -from typing import Any +from typing import TYPE_CHECKING, Any -from airflow.providers.common.compat.sdk import AirflowException, BaseHook, conf as airflow_conf +import requests Review Comment: `requests` is still not a declared dependency in `providers/apache/spark/pyproject.toml`. I am adding it in https://github.com/apache/airflow/pull/67118 anyway, so if this lands after that one, we are good. ########## providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py: ########## @@ -704,6 +792,133 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None: self.log.info(line) + def _track_yarn_application(self, application_id: str) -> None: + """Poll the YARN RM REST API until ``app.finalStatus`` reaches a terminal value.""" + self.log.info( + "Tracking YARN application %s via ResourceManager REST API polling", + application_id, + ) + poll_interval = max(self._status_poll_interval, 1) Review Comment: `_status_poll_interval` defaults to 1s, but for RM REST calls over the network on a job running for hours, a 1s interval means tens of thousands of requests. It is worth either defaulting to a higher value for the RM path (10-30s) or strongly documenting that users should set `status_poll_interval` higher when using this feature. 
########## providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py: ########## @@ -256,6 +308,10 @@ def _resolve_should_track_driver_status(self) -> bool: """ return "spark://" in self._connection["master"] and self._connection["deploy_mode"] == "cluster" + def _should_track_yarn_application_via_rm_api(self) -> bool: + """Return whether this submit should switch to YARN RM REST API polling.""" + return self._yarn_track_via_rm_api and self._is_yarn and self._connection["deploy_mode"] == "cluster" Review Comment: If `yarn_track_via_rm_api=True` but `deploy_mode=client`, this silently returns False and falls back to blocking spark-submit with no signal to the user. We should either log a warning here or raise at `submit()` time, so users are not confused when the flag appears to have no effect. ########## providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py: ########## @@ -704,6 +792,133 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None: self.log.info(line) + def _track_yarn_application(self, application_id: str) -> None: + """Poll the YARN RM REST API until ``app.finalStatus`` reaches a terminal value.""" + self.log.info( + "Tracking YARN application %s via ResourceManager REST API polling", + application_id, + ) + poll_interval = max(self._status_poll_interval, 1) + # Tolerate transient RM REST API failures (RM hiccup, network blip, request + # timeout) the same way `_start_driver_status_tracking` does for spark + # standalone — only give up after this many consecutive failures. 
+ consecutive_failures = 0 + max_consecutive_failures = 10 + while True: + self.log.debug("Polling YARN RM REST API for application %s", application_id) + try: + final_status = self._query_yarn_application_final_status(application_id) + except RuntimeError as exc: + consecutive_failures += 1 + if consecutive_failures > max_consecutive_failures: + raise RuntimeError( + f"Giving up tracking YARN application {application_id} after " + f"{max_consecutive_failures} consecutive YARN RM REST API " + f"failures. Last error: {exc}" + ) from exc + self.log.warning( + "Transient YARN RM REST API failure (%d/%d): %s", + consecutive_failures, + max_consecutive_failures, + exc, + ) + time.sleep(poll_interval) + continue + consecutive_failures = 0 + if final_status == self._YARN_FINAL_SUCCESS: + self.log.info("YARN application %s finished with SUCCEEDED", application_id) + return + if final_status in self._YARN_FINAL_FAILURES: + raise RuntimeError( + f"YARN application {application_id} ended with final status: {final_status}" + ) + if final_status != self._YARN_FINAL_UNDEFINED: + raise RuntimeError( + f"YARN application {application_id} returned unexpected final status: {final_status}" + ) + time.sleep(poll_interval) + + def _get_yarn_rm_base_url(self) -> str: + """ + Resolve the YARN ResourceManager webapp base URL from the Spark connection. + + Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark + connection's ``extra`` JSON. Bare ``host:port`` values get ``http://`` + prepended; fully-qualified URLs are used as-is. Trailing slashes stripped. + The resolved URL is cached on the hook instance so the polling loop does + not re-fetch the connection (or re-hit any Secrets Backend) on every iteration. 
+ """ + if self._yarn_rm_base_url is not None: + return self._yarn_rm_base_url + try: + conn = self.get_connection(self._conn_id) + except AirflowNotFoundException: + conn = None + raw = "" + if conn is not None: + raw = (conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip() + if not raw: + raise ValueError( + f"`yarn_track_via_rm_api=True` requires the Spark connection's `extra` to set " + f"`{self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY}` (e.g. `http://rm.example.com:8088`)." + ) + url = raw if "://" in raw else f"http://{raw}" + self._yarn_rm_base_url = url.rstrip("/") + return self._yarn_rm_base_url + + def _query_yarn_application_final_status(self, application_id: str) -> str: + """GET ``/ws/v1/cluster/apps/{id}`` once and return ``app.finalStatus``.""" + url = f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}" + try: + resp = requests.get(url, auth=self._yarn_rm_auth, timeout=self._HTTP_TIMEOUT) + except requests.exceptions.RequestException as exc: + raise RuntimeError( + f"YARN RM REST API request for application {application_id} failed: {exc}" + ) from exc + if resp.status_code != 200: + raise RuntimeError( + f"YARN RM REST API returned HTTP {resp.status_code} for application " + f"{application_id}: {resp.text[:200]}" + ) + try: + return resp.json()["app"]["finalStatus"] + except (ValueError, KeyError, TypeError) as exc: + raise RuntimeError( + f"YARN RM REST API returned unexpected payload for application " + f"{application_id}: {resp.text[:200]}" + ) from exc + + def _kill_yarn_application(self, application_id: str) -> None: + """PUT ``/ws/v1/cluster/apps/{id}/state`` to kill the application (best-effort).""" + try: + url = f"{self._get_yarn_rm_base_url()}/ws/v1/cluster/apps/{application_id}/state" + except ValueError as exc: + self.log.warning( + "Cannot send YARN kill for %s: %s", + application_id, + exc, + ) + return + try: + resp = requests.put( + url, + json={"state": "KILLED"}, + auth=self._yarn_rm_auth, + 
+ timeout=self._HTTP_TIMEOUT, + ) + except requests.exceptions.RequestException as exc: + self.log.warning("YARN kill request for %s failed: %s", application_id, exc) + return + self.log.info("YARN kill request for %s returned HTTP %s", application_id, resp.status_code) + + @staticmethod + def _is_yarn_application_submitted(line: str, application_id: str) -> bool: + """Return whether a YARN log line means ResourceManager received the application.""" + return ( + f"Submitted application {application_id}" in line + or f"Application report for {application_id}" in line + ) Review Comment: This creates a hard dependency on the Spark job's "Submitted application" log line appearing in stderr. Log-level filtering, custom log formatters, or a future Spark version that changes this message would prevent the signal from ever firing, raising a confusing error even though the app launched. Since the app ID is already extracted from the logs, is the second "submitted" signal actually needed, or can this check be dropped? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
