This is an automated email from the ASF dual-hosted git repository.
milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new a9b06240 feat: Support distributed plan in `EXPLAIN` command (#1309)
a9b06240 is described below
commit a9b06240921e7a60a9e24f1687bbaadaa0a5dff5
Author: danielhumanmod <[email protected]>
AuthorDate: Wed Nov 19 07:26:02 2025 -0800
feat: Support distributed plan in `EXPLAIN` command (#1309)
* support distributed plan in explain
* fix test + lint warning
* add const table explain test
* remove flaky test
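
For context, a minimal sketch of the user-facing behavior (`ctx` is a
Ballista `SessionContext` as in the test added below; table `t` is
hypothetical):

    let batches = ctx
        .sql("EXPLAIN SELECT count(*), id FROM t GROUP BY id")
        .await?
        .collect()
        .await?;
    // Alongside the usual "logical_plan" and "physical_plan" rows, the
    // EXPLAIN result now contains a "distributed_plan" row rendering the
    // scheduler's resolved/unresolved execution stages.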
---
ballista/client/tests/context_checks.rs | 59 +++++++
ballista/client/tests/context_unsupported.rs | 50 ------
.../scheduler/src/state/distributed_explain.rs | 187 +++++++++++++++++++++
ballista/scheduler/src/state/execution_graph.rs | 2 +-
ballista/scheduler/src/state/mod.rs | 38 ++++-
5 files changed, 284 insertions(+), 52 deletions(-)
diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs
index 48895345..a8508f72 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -979,4 +979,63 @@ mod supported {
Ok(())
}
+
+ #[rstest]
+ #[case::standalone(standalone_context())]
+ #[case::remote(remote_context())]
+ #[tokio::test]
+ async fn should_execute_explain_query_correctly(
+ #[future(awt)]
+ #[case]
+ ctx: SessionContext,
+ ) {
+ let result = ctx
+            .sql("EXPLAIN select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
+ .await
+ .unwrap()
+ .collect()
+ .await
+ .unwrap();
+
+ let expected: Vec<&str> = vec![
+            "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+            "| plan_type        | plan                                                                                                                                                                             |",
+            "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+            "| logical_plan     | Projection: count(Int64(1)) AS count(*), id                                                                                                                                     |",
+            "|                  |   Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]                                                                                                                           |",
+            "|                  |     Projection: __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1) AS UNNEST(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5))) AS id |",
+            "|                  |       Unnest: lists[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))|depth=1] structs[]                                                           |",
+            "|                  |         Projection: List([1, 2, 3, 4, 5]) AS __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))                                                     |",
+            "|                  |           EmptyRelation: rows=1                                                                                                                                                 |",
+            "| physical_plan    | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id]                                                                                                                |",
+            "|                  |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                |",
+            "|                  |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                 |",
+            "|                  |       RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1                                                                                                        |",
+            "|                  |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                   |",
+            "|                  |           ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id]                                                 |",
+            "|                  |             UnnestExec                                                                                                                                                          |",
+            "|                  |               ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))]                                          |",
+            "|                  |                 PlaceholderRowExec                                                                                                                                              |",
+            "|                  |                                                                                                                                                                                  |",
+            "| distributed_plan | =========ResolvedStage[stage_id=1.0, partitions=1]=========                                                                                                                     |",
+            "|                  | ShuffleWriterExec: partitioning:Some(Hash([Column { name: \"id\", index: 0 }], 16))                                                                                             |",
+            "|                  |   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                         |",
+            "|                  |     ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id]                                                       |",
+            "|                  |       UnnestExec                                                                                                                                                                |",
+            "|                  |         ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))]                                                |",
+            "|                  |           PlaceholderRowExec                                                                                                                                                    |",
+            "|                  |                                                                                                                                                                                  |",
+            "|                  | =========UnResolvedStage[stage_id=2.0, children=1]=========                                                                                                                     |",
+            "|                  | Inputs{1: StageOutput { partition_locations: {}, complete: false }}                                                                                                             |",
+            "|                  | ShuffleWriterExec: partitioning:None                                                                                                                                            |",
+            "|                  |   ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id]                                                                                                              |",
+            "|                  |     AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                              |",
+            "|                  |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                               |",
+            "|                  |         UnresolvedShuffleExec: partitioning=Hash([Column { name: \"id\", index: 0 }], 16)                                                                                       |",
+            "|                  |                                                                                                                                                                                  |",
+            "|                  |                                                                                                                                                                                  |",
+            "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+ ];
+ assert_batches_eq!(expected, &result);
+ }
}
diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs
index 0b97e20a..7d5c91f8 100644
--- a/ballista/client/tests/context_unsupported.rs
+++ b/ballista/client/tests/context_unsupported.rs
@@ -33,56 +33,6 @@ mod unsupported {
crate::common::example_test_data()
}
- #[rstest]
- #[case::standalone(standalone_context())]
- #[case::remote(remote_context())]
- #[tokio::test]
- #[should_panic]
- async fn should_execute_explain_query_correctly(
- #[future(awt)]
- #[case]
- ctx: SessionContext,
- test_data: String,
- ) {
- ctx.register_parquet(
- "test",
- &format!("{test_data}/alltypes_plain.parquet"),
- Default::default(),
- )
- .await
- .unwrap();
-
- let result = ctx
-            .sql("EXPLAIN select count(*), id from test where id > 4 group by id")
- .await
- .unwrap()
- .collect()
- .await
- .unwrap();
-
- let expected = vec![
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
-            "| plan_type     | plan                                                                                                                                                                                                                                                                                              |",
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
-            "| logical_plan  | Projection: count(*), test.id                                                                                                                                                                                                                                                                     |",
-            "|               |   Aggregate: groupBy=[[test.id]], aggr=[[count(Int64(1)) AS count(*)]]                                                                                                                                                                                                                            |",
-            "|               |     Filter: test.id > Int32(4)                                                                                                                                                                                                                                                                    |",
-            "|               |       TableScan: test projection=[id], partial_filters=[test.id > Int32(4)]                                                                                                                                                                                                                       |",
-            "| physical_plan | ProjectionExec: expr=[count(*)@1 as count(*), id@0 as id]                                                                                                                                                                                                                                         |",
-            "|               |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(*)]                                                                                                                                                                                                                         |",
-            "|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                   |",
-            "|               |       RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1                                                                                                                                                                                                                          |",
-            "|               |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(*)]                                                                                                                                                                                                                            |",
-            "|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                             |",
-            "|               |             FilterExec: id@0 > 4                                                                                                                                                                                                                                                                  |",
-            "|               |               ParquetExec: file_groups={1 group: [[Users/ballista/git/datafusion-ballista/ballista/client/testdata/alltypes_plain.parquet]]}, projection=[id], predicate=id@0 > 4, pruning_predicate=CASE WHEN id_null_count@1 = id_row_count@2 THEN false ELSE id_max@0 > 4 END, required_guarantees=[] |",
-            "|               |                                                                                                                                                                                                                                                                                                   |",
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
- ];
-
- assert_batches_eq!(expected, &result);
- }
-
#[rstest]
#[case::standalone(standalone_context())]
#[case::remote(remote_context())]
diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs
new file mode 100644
index 00000000..5aee48f3
--- /dev/null
+++ b/ballista/scheduler/src/state/distributed_explain.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Write as _;
+use std::sync::Arc;
+
+use ballista_core::error::Result;
+use datafusion::arrow::array::{ListArray, ListBuilder, StringBuilder};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::{ScalarValue, UnnestOptions};
+use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::expressions::col;
+use datafusion::physical_plan::expressions::lit;
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+
+use crate::state::execution_graph::ExecutionStage;
+use crate::{
+ planner::{DefaultDistributedPlanner, DistributedPlanner},
+ state::execution_graph::ExecutionStageBuilder,
+};
+
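+/// Plan the inner logical plan of an `EXPLAIN` into execution stages using
+/// the distributed planner, then render those stages as the text shown in
+/// the `distributed_plan` row.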
+pub(crate) async fn generate_distributed_explain_plan(
+ job_id: &str,
+ session_ctx: Arc<SessionContext>,
+ plan: Arc<LogicalPlan>,
+) -> Result<String> {
+ let session_config = Arc::new(session_ctx.copied_config());
+
+ let plan = session_ctx.state().create_physical_plan(&plan).await?;
+
+ let mut planner = DefaultDistributedPlanner::new();
+ let shuffle_stages =
+ planner.plan_query_stages(job_id, plan, session_config.options())?;
+ let builder = ExecutionStageBuilder::new(session_config.clone());
+ let stages = builder.build(shuffle_stages)?;
+
+ Ok(render_stages(stages))
+}
+
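+/// Pull the logical and physical plan texts out of an `ExplainExec`'s
+/// stringified plans, preferring the final analyzed logical plan and the
+/// final physical plan, with readable fallbacks when either is missing.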
+pub(crate) fn extract_logical_and_physical_plans(
+ plans: &[StringifiedPlan],
+) -> (String, String) {
+ let logical_txt = plans
+ .iter()
+ .rev()
+ .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan))
+ .or_else(|| plans.first())
+ .map(|p| p.plan.to_string())
+        .unwrap_or_else(|| "<logical plan not available>".to_string());
+
+ let physical_txt = plans
+ .iter()
+ .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan))
+ .map(|p| p.plan.to_string())
+ .unwrap_or_else(|| "<physical plan not available>".to_string());
+
+ (logical_txt, physical_txt)
+}
+
+/// Build a distributed explain execution plan that produces a two-column table:
+///
+/// | plan_type | plan |
+/// |------------------|-----------------|
+/// | logical_plan | logical_txt |
+/// | physical_plan | physical_txt |
+/// | distributed_plan | distributed_txt |
+///
+/// The transformed physical tree looks like:
+/// CoalescePartitionsExec
+/// └─ ProjectionExec: expr=[list_type -> plan_type, list_plan -> plan]
+/// └─ UnnestExec
+/// └─ ProjectionExec: expr=[list_type, list_plan]
+/// └─ PlaceholderRowExec
+pub(crate) fn construct_distributed_explain_exec(
+ logical_txt: String,
+ physical_txt: String,
+ distributed_txt: String,
+) -> Result<Arc<dyn ExecutionPlan>> {
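+    // PlaceholderRowExec emits a single empty row; the two List literals below are projected onto it.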
+    let placeholder_row: Arc<PlaceholderRowExec> =
+        Arc::new(PlaceholderRowExec::new(Arc::new(Schema::empty())));
+
+    // construct list_type as ["logical_plan","physical_plan","distributed_plan"]
+ let mut type_list_builder = ListBuilder::new(StringBuilder::new());
+ {
+ let vb = type_list_builder.values();
+ vb.append_value("logical_plan");
+ vb.append_value("physical_plan");
+ vb.append_value("distributed_plan");
+ }
+ type_list_builder.append(true);
+ let list_type_array: Arc<ListArray> = Arc::new(type_list_builder.finish());
+
+    // construct list_plan as [<logical_txt>, <physical_txt>, <distributed_txt>]
+ let mut plan_list_builder = ListBuilder::new(StringBuilder::new());
+ {
+ let vb = plan_list_builder.values();
+ vb.append_value(&logical_txt);
+ vb.append_value(&physical_txt);
+ vb.append_value(&distributed_txt);
+ }
+ plan_list_builder.append(true);
+ let list_plan_array: Arc<ListArray> = Arc::new(plan_list_builder.finish());
+
+ // Project both List literals onto the placeholder row
+ let exprs_lists: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
+ (
+ lit(ScalarValue::List(list_type_array)),
+ "list_type".to_string(),
+ ),
+ (
+ lit(ScalarValue::List(list_plan_array)),
+ "list_plan".to_string(),
+ ),
+ ];
+    let proj_lists = Arc::new(ProjectionExec::try_new(exprs_lists, placeholder_row)?);
+
+ // Build the Unnest operator to expand the two List columns row-wise
+ let lists = vec![
+ ListUnnest {
+ index_in_input_schema: 0,
+ depth: 1,
+ },
+ ListUnnest {
+ index_in_input_schema: 1,
+ depth: 1,
+ },
+ ];
+ let out_schema = Arc::new(Schema::new(vec![
+ Field::new("list_type", DataType::Utf8, true),
+ Field::new("list_plan", DataType::Utf8, true),
+ ]));
+ let unnest = Arc::new(UnnestExec::new(
+ proj_lists,
+ lists,
+ Vec::new(),
+ out_schema,
+ UnnestOptions::default(),
+ ));
+
+ // Final projection: rename columns to (plan_type, plan)
+ let proj_final = Arc::new(ProjectionExec::try_new(
+ vec![
+ (
+ col("list_type", unnest.schema().as_ref())?,
+ "plan_type".into(),
+ ),
+ (col("list_plan", unnest.schema().as_ref())?, "plan".into()),
+ ],
+ unnest,
+ )?);
+
+    // CoalescePartitionsExec merges all partitions into one,
+    // ensuring deterministic single-partition output.
+    Ok(Arc::new(CoalescePartitionsExec::new(proj_final)) as Arc<dyn ExecutionPlan>)
+}
+
+fn render_stages(stages: HashMap<usize, ExecutionStage>) -> String {
+ let mut buf = String::new();
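+    // Render stages in ascending stage id order so the output is deterministic.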
+ let mut keys: Vec<_> = stages.keys().cloned().collect();
+ keys.sort();
+ for k in keys {
+ let stage = &stages[&k];
+ writeln!(buf, "{:#?}", stage).ok();
+ }
+ buf
+}
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index 16744141..ce2eb717 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1350,7 +1350,7 @@ pub fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
///
/// This will infer the dependency structure for the stages
/// so that we can construct a DAG from the stages.
-struct ExecutionStageBuilder {
+pub(crate) struct ExecutionStageBuilder {
/// Stage ID which is currently being visited
current_stage_id: usize,
/// Map from stage ID -> List of child stage IDs
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 42577488..76139272 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -27,6 +27,10 @@ use std::time::Instant;
use crate::scheduler_server::event::QueryStageSchedulerEvent;
+use crate::state::distributed_explain::{
+ construct_distributed_explain_exec, extract_logical_and_physical_plans,
+ generate_distributed_explain_plan,
+};
use crate::state::executor_manager::ExecutorManager;
use crate::state::session_manager::SessionManager;
use crate::state::task_manager::{TaskLauncher, TaskManager};
@@ -47,6 +51,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
use log::{debug, error, info, warn};
use prost::Message;
+mod distributed_explain;
pub mod execution_graph;
pub mod execution_graph_dot;
pub mod execution_stage;
@@ -363,7 +368,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
debug!("Optimized plan: {}", optimized_plan.display_indent());
}
- plan. apply(&mut |plan: &LogicalPlan| {
+ let mut explain_inner_logical_plan: Option<Arc<LogicalPlan>> = None;
+ plan.apply(&mut |plan: &LogicalPlan| {
if let LogicalPlan::TableScan(scan) = plan {
let provider = source_as_provider(&scan.source)?;
if let Some(table) =
provider.as_any().downcast_ref::<ListingTable>() {
@@ -398,10 +404,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
})?;
}
}
+ } else if let LogicalPlan::Explain(explain_plan) = plan {
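+                // Remember the inner plan of the EXPLAIN; the distributed plan is generated after the traversal.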
+ explain_inner_logical_plan = Some(explain_plan.plan.clone());
}
Ok(TreeNodeRecursion::Continue)
})?;
+        let explain_distributed_plan = if let Some(inner_lp) = explain_inner_logical_plan
+ {
+ Some(
+                generate_distributed_explain_plan(job_id, session_ctx.clone(), inner_lp)
+ .await?,
+ )
+ } else {
+ None
+ };
+
let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
"Physical plan: {}",
@@ -413,6 +431,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
let empty: Arc<dyn ExecutionPlan> =
Arc::new(EmptyExec::new(node.schema()));
Ok(Transformed::yes(empty))
+ } else if let (Some(explain), Some(explain_distributed_plan)) = (
+ node.as_any()
+                    .downcast_ref::<datafusion::physical_plan::explain::ExplainExec>(),
+ &explain_distributed_plan,
+ ) {
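+                // Rebuild the EXPLAIN output with an extra distributed_plan row.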
+ let plans = explain.stringified_plans();
+ let (logical_txt, physical_txt) =
+ extract_logical_and_physical_plans(plans);
+ let distributed_txt = explain_distributed_plan.clone();
+
+ let replaced: Arc<dyn ExecutionPlan> =
+ construct_distributed_explain_exec(
+ logical_txt,
+ physical_txt,
+ distributed_txt,
+ )
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ Ok(Transformed::yes(replaced))
} else {
Ok(Transformed::no(node))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]