kaxil commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3270206465


##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -20,18 +20,37 @@
 from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
+import requests

Review Comment:
   `requests` isn't a declared dependency of the Spark provider -- 
`providers/apache/spark/pyproject.toml` lists only `apache-airflow`, 
`apache-airflow-providers-common-compat`, `pyspark-client`, and 
`grpcio-status`. Today it resolves only transitively, through core.
   
   Worth adding `"requests>=2.32.0"` explicitly to `dependencies` and 
re-running `prek run update-providers-build-files`.



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
             self.conf = inject_transport_information_into_spark_properties(self.conf, context)
         if self._hook is None:
             self._hook = self._get_hook()
+        if self._hook._should_track_driver_status:
+            return self.execute_resumable(context)
         self._hook.submit(self.application)
 
+    def submit_job(self, context: Context) -> str:
+        driver_id = self._hook.submit(self.application)
+        if not driver_id:
+            raise RuntimeError("spark-submit did not return a driver ID")
+        self.log.info("Spark driver submitted: %s", driver_id)
+        return driver_id
+
+    def get_job_status(self, external_id: str) -> str:
+        if self._hook._is_yarn:
+            # TODO: call YARN ResourceManager REST API
+            # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+            raise NotImplementedError("YARN job status not yet implemented")
+        if self._hook._is_kubernetes:
+            # TODO: call K8s pod status API
+            raise NotImplementedError("K8s job status not yet implemented")
+        host = self._hook._connection["master"].replace("spark://", "").split(":")[0]
+        response = requests.get(f"http://{host}:6066/v1/submissions/status/{external_id}", timeout=30)
+        response.raise_for_status()
+        status = response.json()["driverState"]

Review Comment:
   Spark Standalone REST responses include both `driverState` and `success` 
(the existing hook test fixtures contain `'"success" : true,'`). When the 
submission ID isn't recognised, the master is in recovery, or an auth proxy 
intercepts, the response can have `success: false` with `driverState` missing 
or stale -- meaning `response.json()["driverState"]` raises `KeyError`, or 
worse, returns a stale value the operator then trusts.
   
   Suggest checking `success` first and raising with the response's `message` 
field on failure.
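   A minimal sketch of that check, in Python to match the operator. The helper `parse_driver_state` and the `SparkDriverStatusError` class are hypothetical names; the payload keys (`success`, `driverState`, `message`) are the ones discussed above. It also treats `success: true` without a `driverState` as an error instead of guessing a state.

```python
from typing import Any


class SparkDriverStatusError(RuntimeError):
    """Hypothetical error raised when a standalone status lookup fails."""


def parse_driver_state(payload: dict[str, Any], driver_id: str) -> str:
    """Return driverState from a Spark Standalone REST status payload.

    Checks ``success`` first, because a failed lookup may omit
    ``driverState`` or carry a stale value.
    """
    if payload.get("success") is not True:
        # Surface the master's own explanation when it provides one.
        message = payload.get("message", "<no message in response>")
        raise SparkDriverStatusError(
            f"Status lookup for {driver_id} failed: {message}"
        )
    state = payload.get("driverState")
    if not state:
        # success=true but no state: refuse to guess.
        raise SparkDriverStatusError(
            f"Status response for {driver_id} reported success but no driverState"
        )
    return state
```

   In `get_job_status` this would replace the direct index, e.g. `status = parse_driver_state(response.json(), external_id)`.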



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
             self.conf = inject_transport_information_into_spark_properties(self.conf, context)
         if self._hook is None:
             self._hook = self._get_hook()
+        if self._hook._should_track_driver_status:
+            return self.execute_resumable(context)
         self._hook.submit(self.application)
 
+    def submit_job(self, context: Context) -> str:
+        driver_id = self._hook.submit(self.application)
+        if not driver_id:
+            raise RuntimeError("spark-submit did not return a driver ID")
+        self.log.info("Spark driver submitted: %s", driver_id)
+        return driver_id
+
+    def get_job_status(self, external_id: str) -> str:
+        if self._hook._is_yarn:
+            # TODO: call YARN ResourceManager REST API
+            # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+            raise NotImplementedError("YARN job status not yet implemented")
+        if self._hook._is_kubernetes:
+            # TODO: call K8s pod status API
+            raise NotImplementedError("K8s job status not yet implemented")
+        host = self._hook._connection["master"].replace("spark://", "").split(":")[0]
+        response = requests.get(f"http://{host}:6066/v1/submissions/status/{external_id}", timeout=30)

Review Comment:
   Three concerns on this one line:
   
   1. **No transient retry**: a one-second network blip during the retry-time 
status check will raise out of `get_job_status`, fall through 
`execute_resumable`'s error handling, and burn a retry slot. Worth wrapping in 
`tenacity` or `urllib3.Retry` with a short backoff.
   2. **Hardcoded `http://`**: enterprise deployments with TLS in front of the 
standalone REST API can't use this path. Read the scheme from connection extras.
   3. **Hardcoded port 6066**: Spark's `spark.master.rest.port` is 
configurable. Read it from connection extras (or the master URL if it includes 
a port hint).
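   A minimal sketch covering all three points, assuming `requests` plus urllib3 >= 1.26 (for `Retry(allowed_methods=...)`) and that the hook can hand over the connection extras as a dict. The extras keys `rest_scheme`/`rest_port` and both helper names are hypothetical, and the retry policy (up to 3 retries, 0.5 s backoff factor, retry on 502/503/504) is an illustrative choice rather than anything the PR specifies. It uses the `urllib3.Retry` option; `tenacity` would work equally well.

```python
from typing import Any

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

DEFAULT_REST_PORT = 6066  # Spark's default spark.master.rest.port


def build_status_url(master: str, driver_id: str, extras: dict[str, Any]) -> str:
    # "rest_scheme" and "rest_port" are hypothetical connection-extra keys.
    scheme = extras.get("rest_scheme", "http")
    port = int(extras.get("rest_port", DEFAULT_REST_PORT))
    # Same host extraction as the PR: drop "spark://" and any master port.
    host = master.replace("spark://", "").split(":")[0]
    return f"{scheme}://{host}:{port}/v1/submissions/status/{driver_id}"


def make_status_session(total: int = 3, backoff_factor: float = 0.5) -> requests.Session:
    # Retry connection errors, read errors, and gateway-style 5xx on GET,
    # sleeping with exponential backoff between attempts.
    retry = Retry(
        total=total,
        backoff_factor=backoff_factor,
        status_forcelist=(502, 503, 504),
        allowed_methods=frozenset({"GET"}),
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    # Mount for both schemes so a TLS-fronted REST endpoint gets the same policy.
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```

   The bare `requests.get` call would then become `make_status_session().get(build_status_url(master, external_id, extras), timeout=30)`. Note that `timeout` applies per attempt, so worst-case latency grows with `total` and the backoff.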



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
