kaxil commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3270202986
##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
self.conf =
inject_transport_information_into_spark_properties(self.conf, context)
if self._hook is None:
self._hook = self._get_hook()
+ if self._hook._should_track_driver_status:
+ return self.execute_resumable(context)
self._hook.submit(self.application)
+ def submit_job(self, context: Context) -> str:
+ driver_id = self._hook.submit(self.application)
+ if not driver_id:
+ raise RuntimeError("spark-submit did not return a driver ID")
+ self.log.info("Spark driver submitted: %s", driver_id)
+ return driver_id
+
+ def get_job_status(self, external_id: str) -> str:
+ if self._hook._is_yarn:
+ # TODO: call YARN ResourceManager REST API
+ # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+ raise NotImplementedError("YARN job status not yet implemented")
+ if self._hook._is_kubernetes:
+ # TODO: call K8s pod status API
+ raise NotImplementedError("K8s job status not yet implemented")
+ host = self._hook._connection["master"].replace("spark://",
"").split(":")[0]
+ response =
requests.get(f"http://{host}:6066/v1/submissions/status/{external_id}",
timeout=30)
+ response.raise_for_status()
+ status = response.json()["driverState"]
+ self.log.info("Driver %s status: %s", external_id, status)
+ return status
+
+ def is_job_active(self, status: str) -> bool:
+ if self._hook._is_yarn:
+ #
https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html
+ return status in ("NEW", "NEW_SAVING", "SUBMITTED", "ACCEPTED",
"RUNNING")
+ if self._hook._is_kubernetes:
+ return status in ("Pending", "Running")
+ return status in ("SUBMITTED", "RUNNING")
Review Comment:
`RELAUNCHING` is missing from the active-status list. Standalone Spark has a
real `RELAUNCHING` driver state (the hook calls it out in
`_handle_spark_status_request` as "Exited non-zero or due to worker failure,
but has not yet started running again"), and the existing in-process polling
loop in `_start_driver_status_tracking` treats it as active by polling while
status is not in `[FINISHED, UNKNOWN, KILLED, FAILED, ERROR]`.
If a Spark worker fails and the driver supervisor relaunches the driver
just as the Airflow worker is restarting, the retry will read
`driverState=RELAUNCHING`, fall into the "terminal failure" branch in
`execute_resumable`, and resubmit a duplicate driver -- exactly the scenario
this PR aims to prevent.
Suggest: `return status in ("SUBMITTED", "RUNNING", "RELAUNCHING")`.
It is also worth adding a note on `UNKNOWN`: the hook treats it as terminal,
but Spark's standalone docs describe it as transient (e.g. during master
recovery). Today the standalone `UNKNOWN` path would resubmit; if that's
intentional, it would be good to document that behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]