amoghrajesh commented on code in PR #67473:
URL: https://github.com/apache/airflow/pull/67473#discussion_r3368777792
##########
providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -1298,3 +1326,36 @@ def on_kill(self) -> None:
self._kill_yarn_application(self._yarn_application_id)
self._run_post_submit_commands()
+
+ def query_yarn_application_status(self, application_id: str) -> str:
+ """
+ Return a normalised single string status for the ResumableJobMixin
interface.
+
+ - Active states (NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING) are
returned as-is.
+ - Terminal states are collapsed to "SUCCEEDED" or "FAILED" with the
following rules:
+ - FINISHED + finalStatus SUCCEEDED -> "SUCCEEDED"
+ - FINISHED + any other finalStatus -> "FAILED"
+ - FAILED or KILLED -> "FAILED"
+ """
+ state, final_status = self._query_yarn_application_status(application_id)
+ if state in {"NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED", "RUNNING"}:
+ return state
+ if state == "FINISHED" and final_status == self._YARN_FINAL_SUCCESS:
+ return "SUCCEEDED"
+ return "FAILED"
+
+ def kill_yarn_application(self, application_id: str) -> None:
Review Comment:
Thanks for the repro, it was very helpful. Rather than making the
public method delegate, I removed it entirely because it had exactly one caller
(`operator.on_kill()`), which now calls `_kill_yarn_application()` directly.
No public wrapper is needed.
I also added `test_on_kill_sends_authenticated_kill_to_yarn_rm` at the operator
level.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]