amoghrajesh commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3297280129
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +770,128 @@ def _process_spark_submit_log(self, itr: Iterator[Any])
-> None:
self.log.info(line)
+ def _track_yarn_application(self, application_id: str) -> None:
+ """Poll the YARN RM REST API until ``app.finalStatus`` reaches a
terminal value."""
+ self.log.info(
+ "Tracking YARN application %s via ResourceManager REST API
polling",
+ application_id,
+ )
+ poll_interval = max(self._status_poll_interval, 1)
+ # Tolerate transient RM REST API failures (RM hiccup, network blip,
request
+ # timeout) the same way `_start_driver_status_tracking` does for spark
+ # standalone — only give up after this many consecutive failures.
+ consecutive_failures = 0
+ max_consecutive_failures = 10
+ while True:
+ self.log.debug("Polling YARN RM REST API for application %s",
application_id)
+ try:
+ final_status =
self._query_yarn_application_final_status(application_id)
+ except RuntimeError as exc:
+ consecutive_failures += 1
+ if consecutive_failures > max_consecutive_failures:
+ raise RuntimeError(
+ f"Giving up tracking YARN application {application_id}
after "
+ f"{max_consecutive_failures} consecutive YARN RM REST
API "
+ f"failures. Last error: {exc}"
+ ) from exc
+ self.log.warning(
+ "Transient YARN RM REST API failure (%d/%d): %s",
+ consecutive_failures,
+ max_consecutive_failures,
+ exc,
+ )
+ time.sleep(poll_interval)
+ continue
+ consecutive_failures = 0
+ if final_status == self._YARN_FINAL_SUCCESS:
+ self.log.info("YARN application %s finished with SUCCEEDED",
application_id)
+ return
+ if final_status in self._YARN_FINAL_FAILURES:
+ raise RuntimeError(
+ f"YARN application {application_id} ended with final
status: {final_status}"
+ )
+ if final_status != self._YARN_FINAL_UNDEFINED:
+ raise RuntimeError(
+ f"YARN application {application_id} returned unexpected
final status: {final_status}"
+ )
+ time.sleep(poll_interval)
+
+ def _get_yarn_rm_base_url(self) -> str:
+ """
+ Resolve the YARN ResourceManager webapp base URL from the Spark
connection.
+
+ Reads the ``yarn_resourcemanager_webapp_address`` key from the Spark
+ connection's ``extra`` JSON. Bare ``host:port`` values get ``http://``
+ prepended; fully-qualified URLs are used as-is. Trailing slashes
stripped.
+ """
+ try:
+ conn = self.get_connection(self._conn_id)
+ except AirflowException:
+ conn = None
+ raw = ""
+ if conn is not None:
+ raw =
(conn.extra_dejson.get(self._YARN_RM_WEBAPP_ADDRESS_EXTRA_KEY) or "").strip()
Review Comment:
You will also have to add `yarn_resourcemanager_webapp_address` to the
connection extras, right? Please add it in `provider.yaml`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]