This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d2b42773 [REST]: Cancelling a completed/failed job should return 
"false" (#1494)
6d2b42773 is described below

commit 6d2b427730ea8c173a8a63841178e26b01173fce
Author: Martin Grigorov <[email protected]>
AuthorDate: Mon Mar 9 19:09:37 2026 +0200

    [REST]: Cancelling a completed/failed job should return "false" (#1494)
---
 ballista/scheduler/src/api/handlers.rs | 59 +++++++++++++++++++++++++++-------
 1 file changed, 48 insertions(+), 11 deletions(-)

diff --git a/ballista/scheduler/src/api/handlers.rs 
b/ballista/scheduler/src/api/handlers.rs
index 3230a8be3..c1bf42bab 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -60,6 +60,8 @@ pub struct JobResponse {
 #[derive(Debug, serde::Serialize)]
 struct CancelJobResponse {
     pub cancelled: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub reason: Option<String>,
 }
 
 #[derive(Debug, serde::Serialize)]
@@ -180,24 +182,59 @@ pub async fn cancel_job<
     State(data_server): State<Arc<SchedulerServer<T, U>>>,
     Path(job_id): Path<String>,
 ) -> Result<impl IntoResponse, StatusCode> {
-    // 404 if job doesn't exist
-    data_server
+    // 404 if the job doesn't exist
+    let job_status = data_server
         .state
         .task_manager
         .get_job_status(&job_id)
         .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+        .map_err(|err| {
+            tracing::error!("Error getting job status: {err:?}");
+            StatusCode::INTERNAL_SERVER_ERROR
+        })?
         .ok_or(StatusCode::NOT_FOUND)?;
 
-    data_server
-        .query_stage_event_loop
-        .get_sender()
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
-        .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
-        .await
-        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+    match &job_status.status {
+        None | Some(Status::Queued(_)) | Some(Status::Running(_)) => {
+            data_server
+                .query_stage_event_loop
+                .get_sender()
+                .map_err(|err| {
+                    tracing::error!(
+                        "Error getting query stage event loop sender: {err:?}"
+                    );
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?
+                .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+                .await
+                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
 
-    Ok(Json(CancelJobResponse { cancelled: true }))
+            Ok((
+                StatusCode::OK,
+                Json(CancelJobResponse {
+                    cancelled: true,
+                    reason: None,
+                }),
+            )
+                .into_response())
+        }
+        Some(Status::Failed(_)) => Ok((
+            StatusCode::CONFLICT,
+            Json(CancelJobResponse {
+                cancelled: false,
+                reason: Some("The job has failed".into()),
+            }),
+        )
+            .into_response()),
+        Some(Status::Successful(_)) => Ok((
+            StatusCode::CONFLICT,
+            Json(CancelJobResponse {
+                cancelled: false,
+                reason: Some("The job is already completed".into()),
+            }),
+        )
+            .into_response()),
+    }
 }
 
 #[derive(Debug, serde::Serialize)]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to