nailo2c commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3268185351
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
self.log.info(line)
def _track_yarn_application(self, application_id: str) -> None:
    """
    Poll ``yarn application -status <id>`` until a final state is reached.

    The ``Final-State`` reported by the CLI is handled explicitly:

    * the success state returns normally;
    * a known failure state raises;
    * the undefined state (application still running) keeps polling;
    * any other value is unexpected and raises, instead of being treated as
      "still in progress" (which could otherwise poll forever).

    Failures of the status query itself (``RuntimeError``) are treated as
    transient and retried, up to ``max_consecutive_failures`` in a row.

    :param application_id: YARN application id to track.
    :raises RuntimeError: if the application ends in a failure state, reports
        an unexpected final state, or the status query fails
        ``max_consecutive_failures`` times in a row.
    """
    self.log.info(
        "Tracking YARN application %s via 'yarn application -status' polling",
        application_id,
    )
    # Never poll more often than once per second, even if configured lower.
    poll_interval = max(self._status_poll_interval, 1)
    # Tolerate transient `yarn application -status` failures (RM hiccup, network
    # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
    # spark standalone — only give up after this many consecutive failures.
    consecutive_failures = 0
    max_consecutive_failures = 10
    while True:
        self.log.debug("Polling: yarn application -status %s", application_id)
        try:
            final_status = self._query_yarn_application_final_status(application_id)
        except RuntimeError as exc:
            consecutive_failures += 1
            # `>=` so we give up on exactly the Nth consecutive failure, which is
            # what the comment above and the error message below both state.
            if consecutive_failures >= max_consecutive_failures:
                raise RuntimeError(
                    f"Giving up tracking YARN application {application_id} after "
                    f"{max_consecutive_failures} consecutive `yarn application -status` "
                    f"failures. Last error: {exc}"
                ) from exc
            self.log.warning(
                "Transient `yarn application -status` failure (%d/%d): %s",
                consecutive_failures,
                max_consecutive_failures,
                exc,
            )
            time.sleep(poll_interval)
            continue
        consecutive_failures = 0
        if final_status == self._YARN_FINAL_SUCCESS:
            self.log.info("YARN application %s finished with SUCCEEDED", application_id)
            return
        if final_status in self._YARN_FINAL_FAILURES:
            raise RuntimeError(
                f"YARN application {application_id} ended with final status: {final_status}"
            )
        if final_status != self._YARN_FINAL_UNDEFINED:
            # Unknown value: fail loudly rather than poll indefinitely.
            raise RuntimeError(
                f"YARN application {application_id} reported an unexpected "
                f"final status: {final_status!r}"
            )
        # Still UNDEFINED -> the application has not finished yet.
        time.sleep(poll_interval)

def _build_yarn_cli_env(self) -> dict[str, str]:
    """
    Return the environment mapping used to run the ``yarn`` CLI.

    The result is a copy of the current process environment with the
    user-supplied ``env_vars`` (for example ``HADOOP_CONF_DIR``) layered on
    top. When the connection carries both a keytab and a principal, the
    Kerberos TGT is renewed into the configured ccache and ``KRB5CCNAME`` is
    set to it, so the CLI authenticates on its own rather than relying on the
    spark-submit JVM. Renewal is best-effort (``exit_on_fail=False``), as in
    ``on_kill``: a non-renewable ticket should not stop the CLI call from
    being attempted.
    """
    cli_env: dict[str, str] = dict(os.environ)
    cli_env.update(self._env or {})

    # Look up keytab first and principal only if needed, so a missing
    # principal entry is never touched when no keytab is configured.
    keytab = self._connection["keytab"]
    if keytab is None:
        return cli_env
    principal = self._connection["principal"]
    if principal is None:
        return cli_env

    renew_from_kt(principal, keytab, exit_on_fail=False)
    cli_env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", "ccache")
    return cli_env

def _query_yarn_application_final_status(self, application_id: str) -> str:
    """
    Run ``yarn application -status <id>`` once and return its Final-State.

    :param application_id: YARN application id to query.
    :return: the raw ``Final-State`` value printed by the CLI (for example
        ``SUCCEEDED``). Interpreting the value is left to the caller.
    :raises RuntimeError: if the CLI times out, exits with a non-zero return
        code, or prints no ``Final-State`` line.
    """
    cmd = ["yarn", "application", "-status", application_id]
    yarn_status_timeout = 30
    try:
        proc = subprocess.run(
            cmd,
            env=self._build_yarn_cli_env(),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            check=False,
            timeout=yarn_status_timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(
            f"`yarn application -status {application_id}` timed out after "
            f"{yarn_status_timeout} seconds"
        ) from exc
    output = proc.stdout or ""
    if proc.returncode != 0:
        raise RuntimeError(
            f"`yarn application -status {application_id}` failed "
            f"(rc={proc.returncode}): {output.strip()}"
        )
    # `yarn application -status` prints a line like "Final-State : SUCCEEDED".
    # Match the key exactly (not as a substring) so that another line merely
    # mentioning "Final-State" (e.g. in diagnostics text) is not misread.
    for raw in output.splitlines():
        key, sep, value = raw.partition(":")
        if sep and key.strip() == "Final-State":
            return value.strip()
    # No Final-State line: the output is not a report we understand. Raising
    # (instead of returning UNDEFINED) lets the caller count this as a failure
    # rather than polling forever on output it cannot interpret.
    raise RuntimeError(
        f"`yarn application -status {application_id}` output did not contain a "
        f"Final-State line: {output.strip()}"
    )
Review Comment:
I've made the `Final-State` handling explicit: `UNDEFINED` keeps polling, known terminal states return or raise, and any unexpected value now fails instead of being treated as still in progress.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]