This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 6d2b42773 [REST]: Cancelling a completed/failed job should return "false" (#1494)
6d2b42773 is described below
commit 6d2b427730ea8c173a8a63841178e26b01173fce
Author: Martin Grigorov <[email protected]>
AuthorDate: Mon Mar 9 19:09:37 2026 +0200
[REST]: Cancelling a completed/failed job should return "false" (#1494)
---
ballista/scheduler/src/api/handlers.rs | 59 +++++++++++++++++++++++++++-------
1 file changed, 48 insertions(+), 11 deletions(-)
diff --git a/ballista/scheduler/src/api/handlers.rs b/ballista/scheduler/src/api/handlers.rs
index 3230a8be3..c1bf42bab 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -60,6 +60,8 @@ pub struct JobResponse {
#[derive(Debug, serde::Serialize)]
struct CancelJobResponse {
pub cancelled: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub reason: Option<String>,
}
#[derive(Debug, serde::Serialize)]
@@ -180,24 +182,59 @@ pub async fn cancel_job<
State(data_server): State<Arc<SchedulerServer<T, U>>>,
Path(job_id): Path<String>,
) -> Result<impl IntoResponse, StatusCode> {
- // 404 if job doesn't exist
- data_server
+ // 404 if the job doesn't exist
+ let job_status = data_server
.state
.task_manager
.get_job_status(&job_id)
.await
- .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
+ .map_err(|err| {
+ tracing::error!("Error getting job status: {err:?}");
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?
.ok_or(StatusCode::NOT_FOUND)?;
- data_server
- .query_stage_event_loop
- .get_sender()
- .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
- .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
- .await
- .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
+ match &job_status.status {
+ None | Some(Status::Queued(_)) | Some(Status::Running(_)) => {
+ data_server
+ .query_stage_event_loop
+ .get_sender()
+ .map_err(|err| {
+ tracing::error!(
+ "Error getting query stage event loop sender: {err:?}"
+ );
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?
+ .post_event(QueryStageSchedulerEvent::JobCancel(job_id))
+ .await
+ .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
- Ok(Json(CancelJobResponse { cancelled: true }))
+ Ok((
+ StatusCode::OK,
+ Json(CancelJobResponse {
+ cancelled: true,
+ reason: None,
+ }),
+ )
+ .into_response())
+ }
+ Some(Status::Failed(_)) => Ok((
+ StatusCode::CONFLICT,
+ Json(CancelJobResponse {
+ cancelled: false,
+ reason: Some("The job has failed".into()),
+ }),
+ )
+ .into_response()),
+ Some(Status::Successful(_)) => Ok((
+ StatusCode::CONFLICT,
+ Json(CancelJobResponse {
+ cancelled: false,
+ reason: Some("The job is already completed".into()),
+ }),
+ )
+ .into_response()),
+ }
}
#[derive(Debug, serde::Serialize)]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]