Nataneljpwd commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3237144573


##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -99,13 +99,27 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         job finishes (on both success and on_kill). Useful for cleaning up sidecars such
         as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``). Each command
         is executed via the shell; failures produce a warning but do not fail the task.
+    :param yarn_track_via_application_status: If True (when master is YARN and

Review Comment:
   Why make it YARN-specific? If done correctly, it can support both YARN and k8s. I would rather name it "track_application_via_status" or even "run_detached_submit", and in this PR support it only for YARN; k8s support can come later.
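The sketch below is a minimal illustration of a generic option that only enables the behaviour where it is implemented (YARN cluster mode for now). `use_detached_tracking` and `SUPPORTED_DETACHED_MASTERS` are hypothetical. Only the option name `track_application_via_status` comes from the suggestion above. Raising on unsupported setups, rather than silently ignoring the flag, is an assumed design choice.

```python
from __future__ import annotations

# Hypothetical: masters for which detached tracking is implemented.
# Only YARN in this PR; a "k8s://" prefix could be added later.
SUPPORTED_DETACHED_MASTERS = ("yarn",)


def use_detached_tracking(
    track_application_via_status: bool, master: str, deploy_mode: str | None
) -> bool:
    """Decide whether to release spark-submit and poll the cluster manager instead.

    Raises ValueError when the option is requested on a setup that is not
    supported yet, instead of silently falling back to the blocking submit.
    """
    if not track_application_via_status:
        return False
    if deploy_mode == "cluster" and master.startswith(SUPPORTED_DETACHED_MASTERS):
        return True
    raise ValueError(
        "track_application_via_status is not supported for "
        f"master={master!r}, deploy_mode={deploy_mode!r}"
    )
```

With this shape, adding Kubernetes later would mean extending the tuple and the tracking backend. The public parameter name would not change.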



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -244,6 +259,8 @@ def __init__(
         self._spark_exit_code: int | None = None
         self._env: dict[str, Any] | None = None
         self._post_submit_commands: list[str] = list(post_submit_commands) if post_submit_commands else []
+        self._yarn_track_via_application_status = yarn_track_via_application_status
+        self._yarn_application_submitted = False

Review Comment:
   Same as the comment above: use spark_application_submitted, because if someone later wants to implement this for something other than YARN, it would be either a breaking change or another parameter to keep track of.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -99,13 +99,27 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         job finishes (on both success and on_kill). Useful for cleaning up sidecars such
         as Istio (e.g. ``["curl -X POST localhost:15020/quitquitquit"]``). Each command
         is executed via the shell; failures produce a warning but do not fail the task.
+    :param yarn_track_via_application_status: If True (when master is YARN and
+        deploy_mode is ``cluster``), release the ``spark-submit`` JVM once the
+        application has been submitted to YARN, then poll
+        ``yarn application -status <appId>`` every ``status_poll_interval``

Review Comment:
   This looks a lot like deferrable operators. Maybe it can be implemented in a way that also allows deferring the operator, so that tracking runs on the triggerer and saves the executor's resources. Again, for this PR it is enough to just make it work without a running JVM.
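For illustration, the polling loop could be written as an async generator. That is roughly the shape of a deferrable trigger's `run()` before it yields a `TriggerEvent` (see Airflow's `airflow.triggers.base`). This sketch is framework-free, and `poll_until_terminal` and `get_state` are hypothetical names. `get_state` is assumed to be non-blocking, for example built on `asyncio.create_subprocess_exec` or an async HTTP client.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable

# YARN lifecycle states after which the application will not change again.
TERMINAL_STATES = frozenset({"FINISHED", "FAILED", "KILLED"})


async def poll_until_terminal(
    get_state: Callable[[str], Awaitable[str]],
    application_id: str,
    poll_interval: float,
) -> AsyncIterator[dict[str, str]]:
    """Yield exactly one event once ``get_state`` reports a terminal state.

    Awaiting ``asyncio.sleep`` between polls hands control back to the event
    loop, so a single triggerer process could watch many applications.
    """
    while True:
        state = await get_state(application_id)
        if state in TERMINAL_STATES:
            yield {"application_id": application_id, "state": state}
            return
        await asyncio.sleep(poll_interval)
```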



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -611,6 +629,28 @@ def submit(self, application: str = "", **kwargs: Any) -> None:
         )
 
         self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
+
+        if (
+            self._yarn_track_via_application_status
+            and self._is_yarn
+            and self._connection["deploy_mode"] == "cluster"
+            and self._yarn_application_id
+            and self._yarn_application_submitted

Review Comment:
   What sets this? Is it possible for this to be false? Or will the above 
method fail or return/set something that can tell us if the app is submitted?



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10

Review Comment:
   Maybe this should be configurable? In some multi-cluster deployments it might fail more than 10 times in a row with the default poll interval of 1 second.
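Here is a sketch of the same retry loop with the failure budget passed in instead of hard-coded. `poll_with_failure_budget` and its parameters are hypothetical. `query` stands in for `_query_yarn_application_final_status` and is assumed to return None while the application is still running.

```python
from __future__ import annotations

import time
from collections.abc import Callable


def poll_with_failure_budget(
    query: Callable[[], str | None],
    poll_interval: float,
    max_consecutive_failures: int,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call ``query`` until it returns a result, tolerating transient errors.

    A RuntimeError from ``query`` counts as one transient failure; any
    successful call resets the counter. None means "still running, poll again".
    """
    consecutive_failures = 0
    while True:
        try:
            result = query()
        except RuntimeError as exc:
            consecutive_failures += 1
            if consecutive_failures > max_consecutive_failures:
                raise RuntimeError(
                    f"Giving up after more than {max_consecutive_failures} consecutive failures"
                ) from exc
            sleep(poll_interval)
            continue
        consecutive_failures = 0
        if result is not None:
            return result
        sleep(poll_interval)
```

Because the budget counts attempts, the outage it tolerates scales with the poll interval. At 1 s with a budget of 10, that is roughly 10 s plus however long each failing CLI call takes. A time-based budget would decouple the two. That is a design alternative, not something the PR does.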



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", 
application_id)
+            try:
+                final_status = 
self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} 
after "
+                        f"{max_consecutive_failures} consecutive `yarn 
application -status` "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", 
application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final 
status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _build_yarn_cli_env(self) -> dict[str, str]:
+        """
+        Build the env for invoking the ``yarn`` CLI.
+
+        Always merges the user-supplied ``env_vars`` (e.g. 
``HADOOP_CONF_DIR``).
+        When the connection has both a keytab and a principal, also renews the
+        Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+        can authenticate without piggy-backing on the spark-submit JVM. Renewal
+        failures are tolerated for the same reason as in ``on_kill``: the
+        failure may just be a non-renewable ticket and we still want to attempt
+        the CLI call.
+        """
+        env = {**os.environ, **(self._env or {})}
+        if self._connection["keytab"] is not None and 
self._connection["principal"] is not None:
+            renew_from_kt(self._connection["principal"], 
self._connection["keytab"], exit_on_fail=False)
+            env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", 
"ccache")
+        return env
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """Run ``yarn application -status <id>`` once and return the 
Final-State string."""
+        cmd = ["yarn", "application", "-status", application_id]
+        yarn_status_timeout = 30
+        try:
+            proc = subprocess.run(
+                cmd,
+                env=self._build_yarn_cli_env(),
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                text=True,
+                check=False,
+                timeout=yarn_status_timeout,
+            )
+        except subprocess.TimeoutExpired as exc:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` timed out after 
{yarn_status_timeout} seconds"
+            ) from exc
+        if proc.returncode != 0:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` failed "
+                f"(rc={proc.returncode}): {proc.stdout.strip()}"
+            )
+        # `yarn application -status` prints a line like "Final-State : 
SUCCEEDED".
+        for raw in proc.stdout.splitlines():
+            if "Final-State" in raw and ":" in raw:
+                return raw.split(":", 1)[1].strip()
+        return self._YARN_FINAL_UNDEFINED

Review Comment:
   This method is very confusing: while an app is running, it returns UNDEFINED as the state, and the caller only checks for the success and failure statuses. What if the app really does end up UNDEFINED? This will loop forever. It also shouldn't have "final" in its name; I would rather it return either the actual state (e.g. RUNNING) or nothing, and be renamed accordingly.
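One way to address this, sketched under assumptions, is to read the lifecycle `State` alongside `Final-State` and stop on the lifecycle state. The state names are YARN's `YarnApplicationState` and `FinalApplicationStatus` values. `evaluate_report` is a hypothetical helper. Treating a terminal application with any final status other than SUCCEEDED as a failure is a design choice, not the PR's behaviour.

```python
from __future__ import annotations

# YarnApplicationState values after which the application will not change again.
TERMINAL_STATES = frozenset({"FINISHED", "FAILED", "KILLED"})


def evaluate_report(state: str, final_status: str) -> bool:
    """Return True to stop successfully, False to keep polling.

    Raises RuntimeError for every other terminal outcome, including a terminal
    state whose final status is still UNDEFINED, so the caller cannot loop
    forever on an application that has already ended.
    """
    if state not in TERMINAL_STATES:
        # NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING: not done yet.
        return False
    if state == "FINISHED" and final_status == "SUCCEEDED":
        return True
    raise RuntimeError(f"Application ended with state={state}, final status={final_status}")
```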



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -86,6 +86,13 @@ class SparkSubmitOperator(BaseOperator):
                            on keytab for Kerberos login
     :param post_submit_commands: Optional list of shell commands to run after the Spark job finishes.
         Useful for cleaning up sidecars such as Istio. Failures produce a warning but do not fail the task.
+    :param yarn_track_via_application_status: If True (and deploy_mode is YARN cluster),
+        release the ``spark-submit`` JVM once the application has been submitted to
+        YARN, then poll ``yarn application -status <appId>`` every
+        ``status_poll_interval`` seconds until the application reaches a final state.
+        This frees the worker from holding the long-lived submit JVM. Requires the

Review Comment:
   The YARN CLI needs to start a JVM, so we just added the overhead of starting a JVM on every poll, which can take a few seconds, and starting a JVM is more resource-intensive than keeping one idle. That said, this change has several other benefits.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",

Review Comment:
   I don't like that the command used to poll the app status is repeated everywhere: if it changes (e.g. a flag is added), some of the copies might go stale.
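A minimal sketch of keeping the command in one place and deriving log and error text from it. `yarn_status_cmd` and `describe_cmd` are hypothetical helpers. `shlex.join` is from the standard library (Python 3.8+).

```python
from __future__ import annotations

import shlex


def yarn_status_cmd(application_id: str) -> list[str]:
    """Single definition of the status-polling argv."""
    return ["yarn", "application", "-status", application_id]


def describe_cmd(cmd: list[str]) -> str:
    """Render an argv list for log and error messages, quoting only if needed."""
    return shlex.join(cmd)
```

Under this assumption, the `subprocess.run(...)` call and messages such as `self.log.debug("Polling: %s", describe_cmd(cmd))` would both derive from the same list, so adding a flag touches one function.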



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -594,6 +611,7 @@ def submit(self, application: str = "", **kwargs: Any) -> None:
         :param application: Submitted application, jar or py file
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         """
+        self._yarn_application_submitted = False

Review Comment:
   Why set it to False here? Shouldn't it already be False from the constructor? In general, it is a little confusing to set a variable describing submission status to False in a method called submit.



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -670,6 +710,16 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
                 if match:
                     self._yarn_application_id = match.group(0)
                     self.log.info("Identified spark application id: %s", self._yarn_application_id)
+                    # Once the YARN application id is known and the caller opted in to
+                    # `yarn application -status` polling, stop consuming the JVM stdout
+                    # so submit() can terminate the spark-submit process and switch to
+                    # polling ResourceManager.
+                    if self._yarn_track_via_application_status and self._is_yarn_application_submitted(
+                        line, self._yarn_application_id
+                    ):
+                        self._yarn_application_submitted = True

Review Comment:
   Why don't we set the app-submitted bool either way, without the if? Do we even need it? I think it might be possible to remove it altogether.
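Here is a sketch of folding the submitted flag into the application id. It assumes the id is captured only from lines that confirm the ResourceManager accepted the app, so "id is set" already implies "submitted". The two confirming phrases come from the PR's `_is_yarn_application_submitted`. The stricter `application_<digits>_<digits>` pattern and the sample "Submitting application ... to ResourceManager" line are assumptions about Spark/Hadoop client logging. `find_submitted_application_id` is hypothetical.

```python
from __future__ import annotations

import re
from collections.abc import Iterable

# Assumed id shape: application_<cluster timestamp>_<sequence>.
_APP_ID = r"application_\d+_\d+"
# Phrases that, per the PR's _is_yarn_application_submitted, mean the RM has the app.
_SUBMITTED = re.compile(rf"(?:Submitted application|Application report for) ({_APP_ID})")


def find_submitted_application_id(lines: Iterable[str]) -> str | None:
    """Return the first application id confirmed as submitted, else None."""
    for line in lines:
        match = _SUBMITTED.search(line)
        if match:
            return match.group(1)
    return None
```

A line that merely mentions the id before confirmation (for example "Submitting application ... to ResourceManager") is ignored, which is the case the separate boolean currently guards against.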



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -611,6 +629,28 @@ def submit(self, application: str = "", **kwargs: Any) -> None:
         )
 
         self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
+
+        if (
+            self._yarn_track_via_application_status
+            and self._is_yarn
+            and self._connection["deploy_mode"] == "cluster"
+            and self._yarn_application_id
+            and self._yarn_application_submitted
+        ):
+            # JVM is idle waiting for its stdout to be drained; terminate and reap it,
+            # then switch to polling ResourceManager via `yarn application -status`.
+            self._submit_sp.terminate()
+            try:
+                self._submit_sp.wait(timeout=30)
+            except subprocess.TimeoutExpired:
+                self._submit_sp.kill()
+                self._submit_sp.wait()

Review Comment:
   Why not use spark.yarn.submit.waitAppCompletion=false (or spark.kubernetes.submission.waitAppCompletion=false on Kubernetes) instead?
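A minimal sketch of appending the wait-for-completion setting to the spark-submit arguments in cluster mode. Per the Spark configuration docs, when this is false the client exits after submission instead of staying alive to report status. There would then be no JVM to terminate, although the application id still has to be read from the client output. `detached_submit_confs` is hypothetical, and the master-prefix checks are simplified.

```python
from __future__ import annotations

# Spark settings that make the cluster-mode client exit right after submission.
WAIT_APP_COMPLETION_KEYS = {
    "yarn": "spark.yarn.submit.waitAppCompletion",
    "k8s": "spark.kubernetes.submission.waitAppCompletion",
}


def detached_submit_confs(master: str, deploy_mode: str | None) -> list[str]:
    """Return extra spark-submit arguments for a fire-and-forget cluster submit.

    Returns an empty list outside cluster mode or for masters not handled here.
    """
    if deploy_mode != "cluster":
        return []
    if master.startswith("k8s://"):
        key = WAIT_APP_COMPLETION_KEYS["k8s"]
    elif master.startswith("yarn"):
        key = WAIT_APP_COMPLETION_KEYS["yarn"]
    else:
        return []
    return ["--conf", f"{key}=false"]
```

The operator's existing `conf` dict could likely carry the same key/value instead of a new code path. That is an assumption worth checking against the hook's argument building.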



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -611,6 +629,28 @@ def submit(self, application: str = "", **kwargs: Any) -> None:
         )
 
         self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
+
+        if (
+            self._yarn_track_via_application_status
+            and self._is_yarn
+            and self._connection["deploy_mode"] == "cluster"
+            and self._yarn_application_id
+            and self._yarn_application_submitted
+        ):
+            # JVM is idle waiting for its stdout to be drained; terminate and reap it,
+            # then switch to polling ResourceManager via `yarn application -status`.
+            self._submit_sp.terminate()
+            try:
+                self._submit_sp.wait(timeout=30)
+            except subprocess.TimeoutExpired:
+                self._submit_sp.kill()
+                self._submit_sp.wait()
+            try:
+                self._track_yarn_application(self._yarn_application_id)

Review Comment:
   I would rather give this method a more generic name



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)

Review Comment:
   Maybe the minimum value here should be larger? Spark apps usually run for at least half an hour, some for days, and with a lot of Spark applications (we have around 3000 running concurrently), spamming the RM with requests every second might not be ideal. We can probably wait a minute (maybe even more) between status checks.
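Here is a sketch of a larger floor plus an optional capped exponential backoff, so that a fleet of long-running apps polls the RM far less often. The 60 s floor follows the suggestion above. The 600 s cap and `next_poll_interval` itself are assumptions for illustration.

```python
from __future__ import annotations


def next_poll_interval(
    attempt: int,
    configured: float,
    floor: float = 60.0,
    cap: float = 600.0,
) -> float:
    """Seconds to wait before poll number ``attempt`` (0-based).

    Starts at max(configured, floor) and doubles per attempt, never exceeding
    max(cap, base) so an explicitly large configured interval is respected.
    """
    base = max(configured, floor)
    upper = max(cap, base)
    # Bound the exponent so very long-running apps cannot overflow the float.
    return min(base * 2 ** min(attempt, 32), upper)
```

The trade-off is latency: with backoff, completion may be noticed up to `cap` seconds late. Using only the floor, without doubling, keeps detection time predictable.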



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", application_id)
+            try:
+                final_status = self._query_yarn_application_final_status(application_id)

Review Comment:
   I would remove the word final if we are polling



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", application_id)
+            try:
+                final_status = self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} after "
+                        f"{max_consecutive_failures} consecutive `yarn application -status` "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _build_yarn_cli_env(self) -> dict[str, str]:
+        """
+        Build the env for invoking the ``yarn`` CLI.
+
+        Always merges the user-supplied ``env_vars`` (e.g. ``HADOOP_CONF_DIR``).
+        When the connection has both a keytab and a principal, also renews the
+        Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+        can authenticate without piggy-backing on the spark-submit JVM. Renewal
+        failures are tolerated for the same reason as in ``on_kill``: the
+        failure may just be a non-renewable ticket and we still want to attempt
+        the CLI call.
+        """
+        env = {**os.environ, **(self._env or {})}
+        if self._connection["keytab"] is not None and self._connection["principal"] is not None:
+            renew_from_kt(self._connection["principal"], self._connection["keytab"], exit_on_fail=False)
+            env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", "ccache")
+        return env
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """Run ``yarn application -status <id>`` once and return the Final-State string."""
+        cmd = ["yarn", "application", "-status", application_id]
+        yarn_status_timeout = 30
+        try:
+            proc = subprocess.run(
+                cmd,
+                env=self._build_yarn_cli_env(),
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                text=True,
+                check=False,
+                timeout=yarn_status_timeout,
+            )
+        except subprocess.TimeoutExpired as exc:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` timed out after {yarn_status_timeout} seconds"
+            ) from exc
+        if proc.returncode != 0:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` failed "
+                f"(rc={proc.returncode}): {proc.stdout.strip()}"
+            )
+        # `yarn application -status` prints a line like "Final-State : SUCCEEDED".
+        for raw in proc.stdout.splitlines():
+            if "Final-State" in raw and ":" in raw:

Review Comment:
   Isn't it just "Final-State:"?
   Maybe it is simpler to look for the possible states instead?
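Here is a sketch of stricter parsing that accepts either `Final-State : X` or `Final-State: X` at the start of a line and returns only a recognised value. The accepted values are YARN's `FinalApplicationStatus` names. The regex and `parse_final_state` are hypothetical.

```python
from __future__ import annotations

import re

# Values of YARN's FinalApplicationStatus enum.
FINAL_STATUSES = frozenset({"UNDEFINED", "SUCCEEDED", "FAILED", "KILLED"})
# Key must start the (possibly tab-indented) line; spacing around ":" is optional.
_FINAL_STATE_LINE = re.compile(r"^[ \t]*Final-State[ \t]*:[ \t]*(\S+)[ \t]*$", re.MULTILINE)


def parse_final_state(report: str) -> str | None:
    """Return the Final-State value if present and recognised, else None."""
    match = _FINAL_STATE_LINE.search(report)
    if match and match.group(1) in FINAL_STATUSES:
        return match.group(1)
    return None
```

Returning None for missing or unknown values lets the caller treat "could not parse" differently from a genuine UNDEFINED, instead of conflating the two.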



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -141,6 +148,7 @@ def __init__(
         deploy_mode: str | None = None,
         use_krb5ccache: bool = False,
         post_submit_commands: list[str] | None = None,
+        yarn_track_via_application_status: bool = False,

Review Comment:
   Same as before, change to generic name



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", application_id)
+            try:
+                final_status = self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} after "
+                        f"{max_consecutive_failures} consecutive `yarn application -status` "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _build_yarn_cli_env(self) -> dict[str, str]:
+        """
+        Build the env for invoking the ``yarn`` CLI.
+
+        Always merges the user-supplied ``env_vars`` (e.g. ``HADOOP_CONF_DIR``).
+        When the connection has both a keytab and a principal, also renews the
+        Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+        can authenticate without piggy-backing on the spark-submit JVM. Renewal
+        failures are tolerated for the same reason as in ``on_kill``: the
+        failure may just be a non-renewable ticket and we still want to attempt
+        the CLI call.
+        """
+        env = {**os.environ, **(self._env or {})}
+        if self._connection["keytab"] is not None and self._connection["principal"] is not None:
+            renew_from_kt(self._connection["principal"], self._connection["keytab"], exit_on_fail=False)
+            env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", "ccache")
+        return env
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """Run ``yarn application -status <id>`` once and return the Final-State string."""
+        cmd = ["yarn", "application", "-status", application_id]
+        yarn_status_timeout = 30
+        try:
+            proc = subprocess.run(
+                cmd,
+                env=self._build_yarn_cli_env(),
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                text=True,
+                check=False,
+                timeout=yarn_status_timeout,
+            )
+        except subprocess.TimeoutExpired as exc:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` timed out after {yarn_status_timeout} seconds"
+            ) from exc
+        if proc.returncode != 0:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` failed "
+                f"(rc={proc.returncode}): {proc.stdout.strip()}"
+            )
+        # `yarn application -status` prints a line like "Final-State : SUCCEEDED".
+        for raw in proc.stdout.splitlines():
+            if "Final-State" in raw and ":" in raw:
+                return raw.split(":", 1)[1].strip()
+        return self._YARN_FINAL_UNDEFINED
+
+    @staticmethod
+    def _is_yarn_application_submitted(line: str, application_id: str) -> bool:
+        """Return whether a YARN log line means ResourceManager received the application."""
+        return (
+            f"Submitted application {application_id}" in line
+            or f"Application report for {application_id}" in line
+        )

Review Comment:
   What is the boolean flag above for then?



##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -704,6 +754,103 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
 
             self.log.info(line)
 
+    def _track_yarn_application(self, application_id: str) -> None:
+        """Poll ``yarn application -status <id>`` until a final state is reached."""
+        self.log.info(
+            "Tracking YARN application %s via 'yarn application -status' polling",
+            application_id,
+        )
+        poll_interval = max(self._status_poll_interval, 1)
+        # Tolerate transient `yarn application -status` failures (RM hiccup, network
+        # blip, CLI timeout) the same way `_start_driver_status_tracking` does for
+        # spark standalone — only give up after this many consecutive failures.
+        consecutive_failures = 0
+        max_consecutive_failures = 10
+        while True:
+            self.log.debug("Polling: yarn application -status %s", application_id)
+            try:
+                final_status = self._query_yarn_application_final_status(application_id)
+            except RuntimeError as exc:
+                consecutive_failures += 1
+                if consecutive_failures > max_consecutive_failures:
+                    raise RuntimeError(
+                        f"Giving up tracking YARN application {application_id} after "
+                        f"{max_consecutive_failures} consecutive `yarn application -status` "
+                        f"failures. Last error: {exc}"
+                    ) from exc
+                self.log.warning(
+                    "Transient `yarn application -status` failure (%d/%d): %s",
+                    consecutive_failures,
+                    max_consecutive_failures,
+                    exc,
+                )
+                time.sleep(poll_interval)
+                continue
+            consecutive_failures = 0
+            if final_status == self._YARN_FINAL_SUCCESS:
+                self.log.info("YARN application %s finished with SUCCEEDED", application_id)
+                return
+            if final_status in self._YARN_FINAL_FAILURES:
+                raise RuntimeError(
+                    f"YARN application {application_id} ended with final status: {final_status}"
+                )
+            time.sleep(poll_interval)
+
+    def _build_yarn_cli_env(self) -> dict[str, str]:
+        """
+        Build the env for invoking the ``yarn`` CLI.
+
+        Always merges the user-supplied ``env_vars`` (e.g. ``HADOOP_CONF_DIR``).
+        When the connection has both a keytab and a principal, also renews the
+        Kerberos TGT into a ccache and exposes it via ``KRB5CCNAME`` so the CLI
+        can authenticate without piggy-backing on the spark-submit JVM. Renewal
+        failures are tolerated for the same reason as in ``on_kill``: the
+        failure may just be a non-renewable ticket and we still want to attempt
+        the CLI call.
+        """
+        env = {**os.environ, **(self._env or {})}
+        if self._connection["keytab"] is not None and self._connection["principal"] is not None:
+            renew_from_kt(self._connection["principal"], self._connection["keytab"], exit_on_fail=False)
+            env["KRB5CCNAME"] = airflow_conf.get_mandatory_value("kerberos", "ccache")
+        return env
+
+    def _query_yarn_application_final_status(self, application_id: str) -> str:
+        """Run ``yarn application -status <id>`` once and return the Final-State string."""
+        cmd = ["yarn", "application", "-status", application_id]
+        yarn_status_timeout = 30
+        try:
+            proc = subprocess.run(
+                cmd,
+                env=self._build_yarn_cli_env(),
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                text=True,
+                check=False,
+                timeout=yarn_status_timeout,
+            )
+        except subprocess.TimeoutExpired as exc:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` timed out after {yarn_status_timeout} seconds"
+            ) from exc
+        if proc.returncode != 0:
+            raise RuntimeError(
+                f"`yarn application -status {application_id}` failed "
+                f"(rc={proc.returncode}): {proc.stdout.strip()}"
+            )
+        # `yarn application -status` prints a line like "Final-State : SUCCEEDED".
+        for raw in proc.stdout.splitlines():
+            if "Final-State" in raw and ":" in raw:
+                return raw.split(":", 1)[1].strip()
+        return self._YARN_FINAL_UNDEFINED
+
+    @staticmethod
+    def _is_yarn_application_submitted(line: str, application_id: str) -> bool:
+        """Return whether a YARN log line means ResourceManager received the application."""
+        return (
+            f"Submitted application {application_id}" in line
+            or f"Application report for {application_id}" in line
+        )

Review Comment:
   Can't I just reuse this? Can I have an application id without a submitted 
app?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
