This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 0516616f1 feat: Expose Logical and Physical plan details in the REST API (#1498)
0516616f1 is described below
commit 0516616f1034e5aab6655aa7651bf9fd5aae25cd
Author: Marko Milenković <[email protected]>
AuthorDate: Wed Mar 11 21:40:28 2026 +0000
feat: Expose Logical and Physical plan details in the REST API (#1498)
---
.gitignore | 1 +
ballista/scheduler/src/api/handlers.rs | 118 +++++++++++++++------
ballista/scheduler/src/api/mod.rs | 6 +-
ballista/scheduler/src/state/aqe/mod.rs | 16 +++
ballista/scheduler/src/state/execution_graph.rs | 22 ++++
.../scheduler/src/state/execution_graph_dot.rs | 4 +
ballista/scheduler/src/state/mod.rs | 7 ++
ballista/scheduler/src/state/task_manager.rs | 6 ++
ballista/scheduler/src/test_utils.rs | 12 +++
9 files changed, 160 insertions(+), 32 deletions(-)
diff --git a/.gitignore b/.gitignore
index 50983b431..1e47926b2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -108,3 +108,4 @@ logs/
# Claude Code guidance file (local only)
CLAUDE.md
+.claude/
\ No newline at end of file
diff --git a/ballista/scheduler/src/api/handlers.rs
b/ballista/scheduler/src/api/handlers.rs
index de1546c59..a7ade706f 100644
--- a/ballista/scheduler/src/api/handlers.rs
+++ b/ballista/scheduler/src/api/handlers.rs
@@ -56,6 +56,12 @@ pub struct JobResponse {
pub num_stages: usize,
pub completed_stages: usize,
pub percent_complete: u8,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub logical_plan: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub physical_plan: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub stage_plan: Option<String>,
}
#[derive(Debug, serde::Serialize)]
@@ -129,36 +135,8 @@ pub async fn get_jobs<
let jobs: Vec<JobResponse> = jobs
.iter()
.map(|job| {
- let status = &job.status;
- let (plain_status, job_status) = match &status.status {
- Some(Status::Queued(_)) => ("Queued".to_string(),
"Queued".to_string()),
- Some(Status::Running(_)) => ("Running".to_string(),
"Running".to_string()),
- Some(Status::Failed(error)) => ("Failed".to_string(),
format!("Failed: {}", error.error)),
- Some(Status::Successful(completed)) => {
- let num_rows = completed
- .partition_location
- .iter()
- .map(|p| {
- p.partition_stats.as_ref().map(|s|
s.num_rows).unwrap_or(0)
- })
- .sum::<i64>();
- let num_rows_term = if num_rows == 1 { "row" } else {
"rows" };
- let num_partitions = completed.partition_location.len();
- let num_partitions_term = if num_partitions == 1 {
- "partition"
- } else {
- "partitions"
- };
- ("Completed".to_string(),
- format!(
- "Completed. Produced {} {} containing {} {}. Elapsed
time: {} ms.",
- num_partitions, num_partitions_term, num_rows,
num_rows_term,
- job.end_time - job.start_time
- )
- )
- }
- _ => ("Invalid".to_string(), "Invalid State".to_string()),
- };
+ let (plain_status, job_status) =
+ format_job_status(&job.status.status, job.end_time -
job.start_time);
// calculate progress based on completed stages for now, but we
could use completed
// tasks in the future to make this more accurate
@@ -172,6 +150,9 @@ pub async fn get_jobs<
num_stages: job.num_stages,
completed_stages: job.completed_stages,
percent_complete,
+ logical_plan: None,
+ physical_plan: None,
+ stage_plan: None,
}
})
.collect();
@@ -179,6 +160,47 @@ pub async fn get_jobs<
Ok(Json(jobs))
}
+pub async fn get_job<
+ T: AsLogicalPlan + Clone + Send + Sync + 'static,
+ U: AsExecutionPlan + Send + Sync + 'static,
+>(
+ State(data_server): State<Arc<SchedulerServer<T, U>>>,
+ Path(job_id): Path<String>,
+) -> Result<impl IntoResponse, StatusCode> {
+ let graph = data_server
+ .state
+ .task_manager
+ .get_job_execution_graph(&job_id)
+ .await
+ .map_err(|err| {
+ tracing::error!("Error occurred while getting the execution graph
for job '{job_id}': {err:?}");
+ StatusCode::INTERNAL_SERVER_ERROR
+ })?
+ .ok_or(StatusCode::NOT_FOUND)?;
+ let stage_plan = format!("{:?}", graph);
+ let job = graph.as_ref();
+ let (plain_status, job_status) =
+ format_job_status(&job.status().status, job.end_time() -
job.start_time());
+
+ let num_stages = job.stage_count();
+ let completed_stages = job.completed_stages();
+ let percent_complete =
+ ((completed_stages as f32 / num_stages as f32) * 100_f32) as u8;
+
+ Ok(Json(JobResponse {
+ job_id: job.job_id().to_string(),
+ job_name: job.job_name().to_string(),
+ job_status,
+ status: plain_status,
+ num_stages,
+ completed_stages,
+ percent_complete,
+ logical_plan: job.logical_plan().map(str::to_owned),
+ physical_plan: job.physical_plan().map(str::to_owned),
+ stage_plan: Some(stage_plan),
+ }))
+}
+
pub async fn cancel_job<
T: AsLogicalPlan + Clone + Send + Sync + 'static,
U: AsExecutionPlan + Send + Sync + 'static,
@@ -314,6 +336,42 @@ pub async fn get_query_stages<
}
}
+fn format_job_status(status: &Option<Status>, elapsed_ms: u64) -> (String,
String) {
+ match status {
+ Some(Status::Queued(_)) => ("Queued".to_string(),
"Queued".to_string()),
+ Some(Status::Running(_)) => ("Running".to_string(),
"Running".to_string()),
+ Some(Status::Failed(error)) => {
+ ("Failed".to_string(), format!("Failed: {}", error.error))
+ }
+ Some(Status::Successful(completed)) => {
+ let num_rows = completed
+ .partition_location
+ .iter()
+ .map(|p| p.partition_stats.as_ref().map(|s|
s.num_rows).unwrap_or(0))
+ .sum::<i64>();
+ let num_rows_term = if num_rows == 1 { "row" } else { "rows" };
+ let num_partitions = completed.partition_location.len();
+ let num_partitions_term = if num_partitions == 1 {
+ "partition"
+ } else {
+ "partitions"
+ };
+ (
+ "Completed".to_string(),
+ format!(
+ "Completed. Produced {} {} containing {} {}. Elapsed time:
{} ms.",
+ num_partitions,
+ num_partitions_term,
+ num_rows,
+ num_rows_term,
+ elapsed_ms
+ ),
+ )
+ }
+ _ => ("Invalid".to_string(), "Invalid State".to_string()),
+ }
+}
+
fn get_elapsed_compute_nanos(metrics: &[MetricsSet]) -> String {
let nanos: usize = metrics
.iter()
diff --git a/ballista/scheduler/src/api/mod.rs
b/ballista/scheduler/src/api/mod.rs
index 733a60f8c..2662e3eea 100644
--- a/ballista/scheduler/src/api/mod.rs
+++ b/ballista/scheduler/src/api/mod.rs
@@ -13,7 +13,6 @@
mod handlers;
use crate::scheduler_server::SchedulerServer;
-use axum::routing::patch;
use axum::{Router, routing::get};
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
@@ -30,7 +29,10 @@ pub fn get_routes<
.route("/api/state", get(handlers::get_scheduler_state::<T, U>))
.route("/api/executors", get(handlers::get_executors::<T, U>))
.route("/api/jobs", get(handlers::get_jobs::<T, U>))
- .route("/api/job/{job_id}", patch(handlers::cancel_job::<T, U>))
+ .route(
+ "/api/job/{job_id}",
+ get(handlers::get_job::<T, U>).patch(handlers::cancel_job::<T, U>),
+ )
.route(
"/api/job/{job_id}/stages",
get(handlers::get_query_stages::<T, U>),
diff --git a/ballista/scheduler/src/state/aqe/mod.rs
b/ballista/scheduler/src/state/aqe/mod.rs
index 718902bc3..32725097a 100644
--- a/ballista/scheduler/src/state/aqe/mod.rs
+++ b/ballista/scheduler/src/state/aqe/mod.rs
@@ -109,6 +109,10 @@ pub(crate) struct AdaptiveExecutionGraph {
failed_stage_attempts: HashMap<usize, HashSet<usize>>,
/// Session config for this job
session_config: Arc<SessionConfig>,
+ /// Logical plan as a human-readable string, captured at submission time.
+ logical_plan: Option<String>,
+ /// Physical plan as a human-readable string, captured at submission time.
+ physical_plan: Option<String>,
}
impl AdaptiveExecutionGraph {
@@ -125,6 +129,8 @@ impl AdaptiveExecutionGraph {
plan: Arc<dyn ExecutionPlan>,
queued_at: u64,
session_config: Arc<SessionConfig>,
+ logical_plan: Option<String>,
+ physical_plan: Option<String>,
) -> ballista_core::error::Result<Self> {
let mut planner =
AdaptivePlanner::try_new(&session_config, plan,
job_name.to_owned())?;
@@ -178,6 +184,8 @@ impl AdaptiveExecutionGraph {
task_id_gen: 0,
failed_stage_attempts: HashMap::new(),
session_config,
+ logical_plan,
+ physical_plan,
})
}
}
@@ -503,6 +511,14 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
&self.status
}
+ fn logical_plan(&self) -> Option<&str> {
+ self.logical_plan.as_deref()
+ }
+
+ fn physical_plan(&self) -> Option<&str> {
+ self.physical_plan.as_deref()
+ }
+
fn start_time(&self) -> u64 {
self.start_time
}
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 27a325479..c443918ef 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -112,6 +112,12 @@ pub trait ExecutionGraph: Debug {
/// Returns the current job status.
fn status(&self) -> &JobStatus;
+ /// Returns the logical plan as a string, if captured at submission time.
+ fn logical_plan(&self) -> Option<&str>;
+
+ /// Returns the physical plan as a string, if captured at submission time.
+ fn physical_plan(&self) -> Option<&str>;
+
/// Returns the timestamp when this job started execution.
fn start_time(&self) -> u64;
@@ -263,6 +269,10 @@ pub struct StaticExecutionGraph {
failed_stage_attempts: HashMap<usize, HashSet<usize>>,
/// Session config for this job
session_config: Arc<SessionConfig>,
+ /// Logical plan as a human-readable string, captured at submission time.
+ logical_plan: Option<String>,
+ /// Physical plan as a human-readable string, captured at submission time.
+ physical_plan: Option<String>,
}
/// Information about a currently running task.
@@ -298,6 +308,8 @@ impl StaticExecutionGraph {
queued_at: u64,
session_config: Arc<SessionConfig>,
planner: &mut dyn DistributedPlanner,
+ logical_plan: Option<String>,
+ physical_plan: Option<String>,
) -> Result<Self> {
let shuffle_stages =
planner.plan_query_stages(job_id, plan, session_config.options())?;
@@ -330,6 +342,8 @@ impl StaticExecutionGraph {
task_id_gen: 0,
failed_stage_attempts: HashMap::new(),
session_config,
+ logical_plan,
+ physical_plan,
})
}
@@ -635,6 +649,14 @@ impl ExecutionGraph for StaticExecutionGraph {
&self.status
}
+ fn logical_plan(&self) -> Option<&str> {
+ self.logical_plan.as_deref()
+ }
+
+ fn physical_plan(&self) -> Option<&str> {
+ self.physical_plan.as_deref()
+ }
+
fn start_time(&self) -> u64 {
self.start_time
}
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index e08fd663f..28f29711a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -612,6 +612,8 @@ filter_expr="]
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
}
@@ -648,6 +650,8 @@ filter_expr="]
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
}
}
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index 82fa60c97..663494336 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -443,11 +443,16 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
None
};
+ let logical_plan_str = plan.display_indent().to_string();
+
let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
"Physical plan: {}",
DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
+ let physical_plan_str = DisplayableExecutionPlan::new(plan.as_ref())
+ .indent(false)
+ .to_string();
let plan = plan.transform_down(&|node: Arc<dyn ExecutionPlan>| {
if node.output_partitioning().partition_count() == 0 {
@@ -490,6 +495,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
queued_at,
session_config,
subscriber,
+ Some(logical_plan_str),
+ Some(physical_plan_str),
)
.await?;
diff --git a/ballista/scheduler/src/state/task_manager.rs
b/ballista/scheduler/src/state/task_manager.rs
index b55806407..7a10de451 100644
--- a/ballista/scheduler/src/state/task_manager.rs
+++ b/ballista/scheduler/src/state/task_manager.rs
@@ -278,6 +278,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
queued_at: u64,
session_config: Arc<SessionConfig>,
subscriber: Option<JobStatusSubscriber>,
+ logical_plan: Option<String>,
+ physical_plan: Option<String>,
) -> Result<()> {
let mut planner = DefaultDistributedPlanner::new();
@@ -294,6 +296,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
plan,
queued_at,
session_config,
+ logical_plan,
+ physical_plan,
)?) as ExecutionGraphBox
} else {
debug!("Using static query planner for job planning");
@@ -306,6 +310,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> TaskManager<T, U>
queued_at,
session_config,
&mut planner,
+ logical_plan,
+ physical_plan,
)?) as ExecutionGraphBox
};
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index 1d4f3633f..80837a0fa 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -920,6 +920,8 @@ pub async fn test_aggregation_plan_with_job_id(
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap()
}
@@ -968,6 +970,8 @@ pub async fn test_two_aggregations_plan(partition: usize)
-> StaticExecutionGrap
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap()
}
@@ -1008,6 +1012,8 @@ pub async fn test_coalesce_plan(partition: usize) ->
StaticExecutionGraph {
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap()
}
@@ -1068,6 +1074,8 @@ pub async fn test_join_plan(partition: usize) ->
StaticExecutionGraph {
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap();
@@ -1110,6 +1118,8 @@ pub async fn test_union_all_plan(partition: usize) ->
StaticExecutionGraph {
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap();
@@ -1152,6 +1162,8 @@ pub async fn test_union_plan(partition: usize) ->
StaticExecutionGraph {
0,
Arc::new(SessionConfig::new_with_ballista()),
&mut planner,
+ None,
+ None,
)
.unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]