This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 41dfac70a22 Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824) (#63922)
41dfac70a22 is described below
commit 41dfac70a228441ad25b2c5038d489c3bd0f3410
Author: aalopatin <[email protected]>
AuthorDate: Fri Mar 20 02:16:38 2026 +0300
Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824) (#63922)
* Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824)
* Renamed the variable holding the SparkApplication name to the more descriptive `spark_job_name` (#63824)
* Add unit test to verify SparkApplication deletion on timeout (#63824)
---
.../cncf/kubernetes/operators/custom_object_launcher.py | 3 +++
.../cncf/kubernetes/operators/test_custom_object_launcher.py | 12 ++++++++++++
2 files changed, 15 insertions(+)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
index bb7453eaf27..bbe507716f9 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
@@ -309,6 +309,9 @@ class CustomObjectLauncher(LoggingMixin):
delta = dt.now() - curr_time
if delta.total_seconds() >= startup_timeout:
pod_status =
self.pod_manager.read_pod(self.pod_spec).status.container_statuses
+ spark_job_name = self.spark_obj_spec["metadata"]["name"]
+ self.log.warning("Deleting spark job: %s", spark_job_name)
+ self.delete_spark_job(spark_job_name=spark_job_name)
raise AirflowException(f"Job took too long to start. pod
status: {pod_status}")
time.sleep(10)
except Exception as e:
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
index 5ae01c0af8c..3496a43b0e3 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
@@ -238,6 +238,18 @@ class TestCustomObjectLauncher:
def test_start_spark_job_no_error(self, mock_pod_manager, mock_launcher):
mock_launcher.start_spark_job()
+ @patch(
+
"airflow.providers.cncf.kubernetes.operators.custom_object_launcher.CustomObjectLauncher.delete_spark_job"
+ )
+ def test_start_spark_job_deletes_on_timeout(self, mock_delete_spark_job,
mock_launcher):
+ mock_launcher.spark_job_not_running = MagicMock(return_value=True)
+ mock_launcher.check_pod_start_failure = MagicMock()
+
+ with pytest.raises(AirflowException):
+ mock_launcher.start_spark_job(startup_timeout=0)
+
+
mock_delete_spark_job.assert_called_once_with(spark_job_name=mock_launcher.name)
+
@patch("airflow.providers.cncf.kubernetes.operators.custom_object_launcher.PodManager")
def test_check_pod_start_failure_no_error(self, mock_pod_manager,
mock_launcher):
mock_pod_manager.return_value.read_pod.return_value.status =
self.get_pod_status("ContainerCreating")