This is an automated email from the ASF dual-hosted git repository.

milenkovicm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new a9b06240 feat: Support distributed plan in `EXPLAIN` command (#1309)
a9b06240 is described below

commit a9b06240921e7a60a9e24f1687bbaadaa0a5dff5
Author: danielhumanmod <[email protected]>
AuthorDate: Wed Nov 19 07:26:02 2025 -0800

    feat: Support distributed plan in `EXPLAIN` command (#1309)
    
    * support distributed plan in explain
    
    * fix test + lint warning
    
    * add const table explain test
    
    * remove flakey test
---
 ballista/client/tests/context_checks.rs            |  59 +++++++
 ballista/client/tests/context_unsupported.rs       |  50 ------
 .../scheduler/src/state/distributed_explain.rs     | 187 +++++++++++++++++++++
 ballista/scheduler/src/state/execution_graph.rs    |   2 +-
 ballista/scheduler/src/state/mod.rs                |  38 ++++-
 5 files changed, 284 insertions(+), 52 deletions(-)
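
For context, a minimal client-side sketch of the feature (assuming a
Ballista-backed SessionContext named `ctx`, as exercised by the new test in
context_checks.rs below; error handling elided):

    // Hypothetical driver snippet; mirrors the added test.
    let batches = ctx
        .sql("EXPLAIN SELECT count(*), id FROM (SELECT unnest([1,2,3,4,5]) AS id) GROUP BY id")
        .await?
        .collect()
        .await?;
    // Alongside the usual `logical_plan` and `physical_plan` rows, the
    // result now carries a `distributed_plan` row rendering each
    // ResolvedStage/UnResolvedStage of the scheduler's execution graph.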

diff --git a/ballista/client/tests/context_checks.rs b/ballista/client/tests/context_checks.rs
index 48895345..a8508f72 100644
--- a/ballista/client/tests/context_checks.rs
+++ b/ballista/client/tests/context_checks.rs
@@ -979,4 +979,63 @@ mod supported {
 
         Ok(())
     }
+
+    #[rstest]
+    #[case::standalone(standalone_context())]
+    #[case::remote(remote_context())]
+    #[tokio::test]
+    async fn should_execute_explain_query_correctly(
+        #[future(awt)]
+        #[case]
+        ctx: SessionContext,
+    ) {
+        let result = ctx
+            .sql("EXPLAIN select count(*), id from (select unnest([1,2,3,4,5]) as id) group by id")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+
+        let expected: Vec<&str> = vec![
+                "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+                "| plan_type        | plan                                                                                                                                                                             |",
+                "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+                "| logical_plan     | Projection: count(Int64(1)) AS count(*), id                                                                                                                                      |",
+                "|                  |   Aggregate: groupBy=[[id]], aggr=[[count(Int64(1))]]                                                                                                                            |",
+                "|                  |     Projection: __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1) AS UNNEST(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5))) AS id |",
+                "|                  |       Unnest: lists[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))|depth=1] structs[]                                                            |",
+                "|                  |         Projection: List([1, 2, 3, 4, 5]) AS __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))                                                      |",
+                "|                  |           EmptyRelation: rows=1                                                                                                                                                  |",
+                "| physical_plan    | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id]                                                                                                                 |",
+                "|                  |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                 |",
+                "|                  |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                  |",
+                "|                  |       RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1                                                                                                         |",
+                "|                  |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                    |",
+                "|                  |           ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id]                                                  |",
+                "|                  |             UnnestExec                                                                                                                                                           |",
+                "|                  |               ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))]                                           |",
+                "|                  |                 PlaceholderRowExec                                                                                                                                               |",
+                "|                  |                                                                                                                                                                                  |",
+                "| distributed_plan | =========ResolvedStage[stage_id=1.0, partitions=1]=========                                                                                                                      |",
+                "|                  | ShuffleWriterExec: partitioning:Some(Hash([Column { name: \"id\", index: 0 }], 16))                                                                                               |",
+                "|                  |   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                                          |",
+                "|                  |     ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id]                                                        |",
+                "|                  |       UnnestExec                                                                                                                                                                 |",
+                "|                  |         ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))]                                                 |",
+                "|                  |           PlaceholderRowExec                                                                                                                                                     |",
+                "|                  |                                                                                                                                                                                  |",
+                "|                  | =========UnResolvedStage[stage_id=2.0, children=1]=========                                                                                                                      |",
+                "|                  | Inputs{1: StageOutput { partition_locations: {}, complete: false }}                                                                                                              |",
+                "|                  | ShuffleWriterExec: partitioning:None                                                                                                                                             |",
+                "|                  |   ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id]                                                                                                               |",
+                "|                  |     AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))]                                                                                               |",
+                "|                  |       CoalesceBatchesExec: target_batch_size=8192                                                                                                                                |",
+                "|                  |         UnresolvedShuffleExec: partitioning=Hash([Column { name: \"id\", index: 0 }], 16)                                                                                         |",
+                "|                  |                                                                                                                                                                                  |",
+                "|                  |                                                                                                                                                                                  |",
+                "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
+        ];
+        assert_batches_eq!(expected, &result);
+    }
 }
diff --git a/ballista/client/tests/context_unsupported.rs b/ballista/client/tests/context_unsupported.rs
index 0b97e20a..7d5c91f8 100644
--- a/ballista/client/tests/context_unsupported.rs
+++ b/ballista/client/tests/context_unsupported.rs
@@ -33,56 +33,6 @@ mod unsupported {
         crate::common::example_test_data()
     }
 
-    #[rstest]
-    #[case::standalone(standalone_context())]
-    #[case::remote(remote_context())]
-    #[tokio::test]
-    #[should_panic]
-    async fn should_execute_explain_query_correctly(
-        #[future(awt)]
-        #[case]
-        ctx: SessionContext,
-        test_data: String,
-    ) {
-        ctx.register_parquet(
-            "test",
-            &format!("{test_data}/alltypes_plain.parquet"),
-            Default::default(),
-        )
-        .await
-        .unwrap();
-
-        let result = ctx
-            .sql("EXPLAIN select count(*), id from test where id > 4 group by id")
-            .await
-            .unwrap()
-            .collect()
-            .await
-            .unwrap();
-
-        let expected = vec![
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
-            "| plan_type     | plan                                                                                                                                                                                                                                                                                             |",
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
-            "| logical_plan  | Projection: count(*), test.id                                                                                                                                                                                                                                                                    |",
-            "|               |   Aggregate: groupBy=[[test.id]], aggr=[[count(Int64(1)) AS count(*)]]                                                                                                                                                                                                                           |",
-            "|               |     Filter: test.id > Int32(4)                                                                                                                                                                                                                                                                   |",
-            "|               |       TableScan: test projection=[id], partial_filters=[test.id > Int32(4)]                                                                                                                                                                                                                      |",
-            "| physical_plan | ProjectionExec: expr=[count(*)@1 as count(*), id@0 as id]                                                                                                                                                                                                                                        |",
-            "|               |   AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(*)]                                                                                                                                                                                                                        |",
-            "|               |     CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                  |",
-            "|               |       RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1                                                                                                                                                                                                                         |",
-            "|               |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(*)]                                                                                                                                                                                                                           |",
-            "|               |           CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                            |",
-            "|               |             FilterExec: id@0 > 4                                                                                                                                                                                                                                                                 |",
-            "|               |               ParquetExec: file_groups={1 group: [[Users/ballista/git/datafusion-ballista/ballista/client/testdata/alltypes_plain.parquet]]}, projection=[id], predicate=id@0 > 4, pruning_predicate=CASE WHEN id_null_count@1 = id_row_count@2 THEN false ELSE id_max@0 > 4 END, required_guarantees=[] |",
-            "|               |                                                                                                                                                                                                                                                                                                  |",
-            "+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",
-        ];
-
-        assert_batches_eq!(expected, &result);
-    }
-
     #[rstest]
     #[case::standalone(standalone_context())]
     #[case::remote(remote_context())]
diff --git a/ballista/scheduler/src/state/distributed_explain.rs b/ballista/scheduler/src/state/distributed_explain.rs
new file mode 100644
index 00000000..5aee48f3
--- /dev/null
+++ b/ballista/scheduler/src/state/distributed_explain.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Write as _;
+use std::sync::Arc;
+
+use ballista_core::error::Result;
+use datafusion::arrow::array::{ListArray, ListBuilder, StringBuilder};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::common::{ScalarValue, UnnestOptions};
+use datafusion::logical_expr::{LogicalPlan, PlanType, StringifiedPlan};
+use datafusion::physical_expr::PhysicalExpr;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::expressions::col;
+use datafusion::physical_plan::expressions::lit;
+use datafusion::physical_plan::placeholder_row::PlaceholderRowExec;
+use datafusion::physical_plan::projection::ProjectionExec;
+use datafusion::physical_plan::unnest::{ListUnnest, UnnestExec};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::SessionContext;
+
+use crate::state::execution_graph::ExecutionStage;
+use crate::{
+    planner::{DefaultDistributedPlanner, DistributedPlanner},
+    state::execution_graph::ExecutionStageBuilder,
+};
+
+pub(crate) async fn generate_distributed_explain_plan(
+    job_id: &str,
+    session_ctx: Arc<SessionContext>,
+    plan: Arc<LogicalPlan>,
+) -> Result<String> {
+    let session_config = Arc::new(session_ctx.copied_config());
+
+    let plan = session_ctx.state().create_physical_plan(&plan).await?;
+
+    let mut planner = DefaultDistributedPlanner::new();
+    let shuffle_stages =
+        planner.plan_query_stages(job_id, plan, session_config.options())?;
+    let builder = ExecutionStageBuilder::new(session_config.clone());
+    let stages = builder.build(shuffle_stages)?;
+
+    Ok(render_stages(stages))
+}
+
+pub(crate) fn extract_logical_and_physical_plans(
+    plans: &[StringifiedPlan],
+) -> (String, String) {
+    let logical_txt = plans
+        .iter()
+        .rev()
+        .find(|p| matches!(p.plan_type, PlanType::FinalAnalyzedLogicalPlan))
+        .or_else(|| plans.first())
+        .map(|p| p.plan.to_string())
+        .unwrap_or("logical plan not available".to_string());
+
+    let physical_txt = plans
+        .iter()
+        .find(|p| matches!(p.plan_type, PlanType::FinalPhysicalPlan))
+        .map(|p| p.plan.to_string())
+        .unwrap_or_else(|| "<physical plan not available>".to_string());
+
+    (logical_txt, physical_txt)
+}
+
+/// Build a distributed explain execution plan that produces a two-column table:
+///
+/// | plan_type        | plan            |
+/// |------------------|-----------------|
+/// | logical_plan     | logical_txt     |
+/// | physical_plan    | physical_txt    |
+/// | distributed_plan | distributed_txt |
+///
+/// The transformed physical tree looks like:
+///     CoalescePartitionsExec
+///       └─ ProjectionExec: expr=[list_type -> plan_type, list_plan -> plan]
+///            └─ UnnestExec
+///                 └─ ProjectionExec: expr=[list_type, list_plan]
+///                      └─ PlaceholderRowExec
+pub(crate) fn construct_distributed_explain_exec(
+    logical_txt: String,
+    physical_txt: String,
+    distributed_txt: String,
+) -> Result<Arc<dyn ExecutionPlan>> {
+    let place_holder_row: Arc<PlaceholderRowExec> =
+        Arc::new(PlaceholderRowExec::new(Arc::new(Schema::empty())));
+
+    // construct list_type as ["logical_plan","physical_plan","distributed_plan"]
+    let mut type_list_builder = ListBuilder::new(StringBuilder::new());
+    {
+        let vb = type_list_builder.values();
+        vb.append_value("logical_plan");
+        vb.append_value("physical_plan");
+        vb.append_value("distributed_plan");
+    }
+    type_list_builder.append(true);
+    let list_type_array: Arc<ListArray> = Arc::new(type_list_builder.finish());
+
+    // construct list_plan as [<logical_txt>, <physical_txt>, <distributed_txt>]
+    let mut plan_list_builder = ListBuilder::new(StringBuilder::new());
+    {
+        let vb = plan_list_builder.values();
+        vb.append_value(&logical_txt);
+        vb.append_value(&physical_txt);
+        vb.append_value(&distributed_txt);
+    }
+    plan_list_builder.append(true);
+    let list_plan_array: Arc<ListArray> = Arc::new(plan_list_builder.finish());
+
+    // Project both List literals onto the placeholder row
+    let exprs_lists: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![
+        (
+            lit(ScalarValue::List(list_type_array)),
+            "list_type".to_string(),
+        ),
+        (
+            lit(ScalarValue::List(list_plan_array)),
+            "list_plan".to_string(),
+        ),
+    ];
+    let proj_lists = Arc::new(ProjectionExec::try_new(exprs_lists, place_holder_row)?);
+
+    // Build the Unnest operator to expand the two List columns row-wise
+    let lists = vec![
+        ListUnnest {
+            index_in_input_schema: 0,
+            depth: 1,
+        },
+        ListUnnest {
+            index_in_input_schema: 1,
+            depth: 1,
+        },
+    ];
+    let out_schema = Arc::new(Schema::new(vec![
+        Field::new("list_type", DataType::Utf8, true),
+        Field::new("list_plan", DataType::Utf8, true),
+    ]));
+    let unnest = Arc::new(UnnestExec::new(
+        proj_lists,
+        lists,
+        Vec::new(),
+        out_schema,
+        UnnestOptions::default(),
+    ));
+
+    // Final projection: rename columns to (plan_type, plan)
+    let proj_final = Arc::new(ProjectionExec::try_new(
+        vec![
+            (
+                col("list_type", unnest.schema().as_ref())?,
+                "plan_type".into(),
+            ),
+            (col("list_plan", unnest.schema().as_ref())?, "plan".into()),
+        ],
+        unnest,
+    )?);
+
+    // CoalescePartitionsExec → merge all partitions into one
+    // ensuring deterministic single-partition output.
+    Ok(Arc::new(CoalescePartitionsExec::new(proj_final)) as Arc<dyn ExecutionPlan>)
+}
+
+fn render_stages(stages: HashMap<usize, ExecutionStage>) -> String {
+    let mut buf = String::new();
+    let mut keys: Vec<_> = stages.keys().cloned().collect();
+    keys.sort();
+    for k in keys {
+        let stage = &stages[&k];
+        writeln!(buf, "{:#?}", stage).ok();
+    }
+    buf
+}
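
An aside on the Arrow API used above: construct_distributed_explain_exec
builds its two List literals with nested builders. A self-contained sketch of
that pattern, assuming only the arrow types re-exported by DataFusion:

    use datafusion::arrow::array::{Array, ListArray, ListBuilder, StringBuilder};

    fn plan_type_list() -> ListArray {
        // One list row holding the three plan_type labels.
        let mut builder = ListBuilder::new(StringBuilder::new());
        let values = builder.values();
        values.append_value("logical_plan");
        values.append_value("physical_plan");
        values.append_value("distributed_plan");
        builder.append(true); // close the single list row
        builder.finish()
    }

    fn main() {
        let list = plan_type_list();
        assert_eq!(list.len(), 1); // one list row ...
        assert_eq!(list.value(0).len(), 3); // ... containing three strings
    }
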
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index 16744141..ce2eb717 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1350,7 +1350,7 @@ pub fn create_task_info(executor_id: String, task_id: usize) -> TaskInfo {
 ///
 /// This will infer the dependency structure for the stages
 /// so that we can construct a DAG from the stages.
-struct ExecutionStageBuilder {
+pub(crate) struct ExecutionStageBuilder {
     /// Stage ID which is currently being visited
     current_stage_id: usize,
     /// Map from stage ID -> List of child stage IDs
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 42577488..76139272 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -27,6 +27,10 @@ use std::time::Instant;
 
 use crate::scheduler_server::event::QueryStageSchedulerEvent;
 
+use crate::state::distributed_explain::{
+    construct_distributed_explain_exec, extract_logical_and_physical_plans,
+    generate_distributed_explain_plan,
+};
 use crate::state::executor_manager::ExecutorManager;
 use crate::state::session_manager::SessionManager;
 use crate::state::task_manager::{TaskLauncher, TaskManager};
@@ -47,6 +51,7 @@ use datafusion_proto::physical_plan::AsExecutionPlan;
 use log::{debug, error, info, warn};
 use prost::Message;
 
+mod distributed_explain;
 pub mod execution_graph;
 pub mod execution_graph_dot;
 pub mod execution_stage;
@@ -363,7 +368,8 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             debug!("Optimized plan: {}", optimized_plan.display_indent());
         }
 
-        plan. apply(&mut |plan: &LogicalPlan| {
+        let mut explain_inner_logical_plan: Option<Arc<LogicalPlan>> = None;
+        plan.apply(&mut |plan: &LogicalPlan| {
             if let LogicalPlan::TableScan(scan) = plan {
                 let provider = source_as_provider(&scan.source)?;
                 if let Some(table) = provider.as_any().downcast_ref::<ListingTable>() {
@@ -398,10 +404,22 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                             })?;
                     }
                 }
+            } else if let LogicalPlan::Explain(explain_plan) = plan {
+                explain_inner_logical_plan = Some(explain_plan.plan.clone());
             }
             Ok(TreeNodeRecursion::Continue)
         })?;
 
+        let explain_distributed_plan = if let Some(inner_lp) = explain_inner_logical_plan
+        {
+            Some(
+                generate_distributed_explain_plan(job_id, session_ctx.clone(), inner_lp)
+                    .await?,
+            )
+        } else {
+            None
+        };
+
         let plan = session_ctx.state().create_physical_plan(plan).await?;
         debug!(
             "Physical plan: {}",
@@ -413,6 +431,24 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
                 let empty: Arc<dyn ExecutionPlan> =
                     Arc::new(EmptyExec::new(node.schema()));
                 Ok(Transformed::yes(empty))
+            } else if let (Some(explain), Some(explain_distributed_plan)) = (
+                node.as_any()
+                    .downcast_ref::<datafusion::physical_plan::explain::ExplainExec>(),
+                &explain_distributed_plan,
+            ) {
+                let plans = explain.stringified_plans();
+                let (logical_txt, physical_txt) =
+                    extract_logical_and_physical_plans(plans);
+                let distributed_txt = explain_distributed_plan.clone();
+
+                let replaced: Arc<dyn ExecutionPlan> =
+                    construct_distributed_explain_exec(
+                        logical_txt,
+                        physical_txt,
+                        distributed_txt,
+                    )
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+                Ok(Transformed::yes(replaced))
             } else {
                 Ok(Transformed::no(node))
             }

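A closing note on the rewrite hook used in state/mod.rs above: the scheduler
walks the physical plan and swaps the ExplainExec node for the constructed
replacement. A stand-alone sketch of that TreeNode rewrite pattern (method
names per recent DataFusion releases; exact signatures vary by version):

    use std::sync::Arc;

    use datafusion::common::tree_node::{Transformed, TreeNode};
    use datafusion::common::Result;
    use datafusion::physical_plan::{explain::ExplainExec, ExecutionPlan};

    /// Replace every ExplainExec in `plan` with `replacement`,
    /// leaving all other nodes untouched.
    fn swap_explain(
        plan: Arc<dyn ExecutionPlan>,
        replacement: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let rewritten = plan.transform_up(|node| {
            if node.as_any().downcast_ref::<ExplainExec>().is_some() {
                Ok(Transformed::yes(replacement.clone()))
            } else {
                Ok(Transformed::no(node))
            }
        })?;
        Ok(rewritten.data)
    }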
