This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 41dfac70a22 Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824) (#63922)
41dfac70a22 is described below

commit 41dfac70a228441ad25b2c5038d489c3bd0f3410
Author: aalopatin <[email protected]>
AuthorDate: Fri Mar 20 02:16:38 2026 +0300

    Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824) (#63922)
    
    * Deleting spark job before raising exception "Job took too long to start" in start_spark_job method (#63824)
    
    * The variable has been renamed to a more appropriate name (#63824)
    
    * Add unit test to verify SparkApplication deletion on timeout (#63824)
---
 .../cncf/kubernetes/operators/custom_object_launcher.py      |  3 +++
 .../cncf/kubernetes/operators/test_custom_object_launcher.py | 12 ++++++++++++
 2 files changed, 15 insertions(+)

diff --git 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
index bb7453eaf27..bbe507716f9 100644
--- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
+++ 
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/custom_object_launcher.py
@@ -309,6 +309,9 @@ class CustomObjectLauncher(LoggingMixin):
                 delta = dt.now() - curr_time
                 if delta.total_seconds() >= startup_timeout:
                     pod_status = 
self.pod_manager.read_pod(self.pod_spec).status.container_statuses
+                    spark_job_name = self.spark_obj_spec["metadata"]["name"]
+                    self.log.warning("Deleting spark job: %s", spark_job_name)
+                    self.delete_spark_job(spark_job_name=spark_job_name)
                     raise AirflowException(f"Job took too long to start. pod 
status: {pod_status}")
                 time.sleep(10)
         except Exception as e:
diff --git 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
index 5ae01c0af8c..3496a43b0e3 100644
--- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
+++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_custom_object_launcher.py
@@ -238,6 +238,18 @@ class TestCustomObjectLauncher:
     def test_start_spark_job_no_error(self, mock_pod_manager, mock_launcher):
         mock_launcher.start_spark_job()
 
+    @patch(
+        
"airflow.providers.cncf.kubernetes.operators.custom_object_launcher.CustomObjectLauncher.delete_spark_job"
+    )
+    def test_start_spark_job_deletes_on_timeout(self, mock_delete_spark_job, 
mock_launcher):
+        mock_launcher.spark_job_not_running = MagicMock(return_value=True)
+        mock_launcher.check_pod_start_failure = MagicMock()
+
+        with pytest.raises(AirflowException):
+            mock_launcher.start_spark_job(startup_timeout=0)
+
+        
mock_delete_spark_job.assert_called_once_with(spark_job_name=mock_launcher.name)
+
     
@patch("airflow.providers.cncf.kubernetes.operators.custom_object_launcher.PodManager")
     def test_check_pod_start_failure_no_error(self, mock_pod_manager, 
mock_launcher):
         mock_pod_manager.return_value.read_pod.return_value.status = 
self.get_pod_status("ContainerCreating")

Reply via email to