This is an automated email from the ASF dual-hosted git repository.

nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 169ffa5e Upgrade DataFusion to 27.0.0 (#834)
169ffa5e is described below

commit 169ffa5eff5e41721053201c28f2f591a3614822
Author: r.4ntix <[email protected]>
AuthorDate: Thu Jul 13 09:53:42 2023 +0800

    Upgrade DataFusion to 27.0.0 (#834)
    
    * Upgrade DataFusion to 27.0.0
    
    * Fix optimizer rule 'push_down_projection' failed
    
    * Fix test errors of benchmarks
    
    * Fix physical plan encode/decode error
---
 Cargo.toml                                         |  33 +++---
 ballista/client/README.md                          |   4 +-
 .../core/src/execution_plans/distributed_query.rs  |  37 ++++---
 .../core/src/execution_plans/shuffle_reader.rs     |  28 ++---
 .../core/src/execution_plans/shuffle_writer.rs     |  37 ++++---
 .../core/src/execution_plans/unresolved_shuffle.rs |  29 ++---
 ballista/core/src/serde/scheduler/from_proto.rs    |  14 ++-
 ballista/core/src/serde/scheduler/mod.rs           |  13 ++-
 ballista/core/src/utils.rs                         |   2 +-
 ballista/executor/src/collect.rs                   |  29 ++---
 ballista/executor/src/execution_loop.rs            |   5 +
 ballista/executor/src/executor.rs                  |  23 +++-
 ballista/executor/src/executor_server.rs           |   3 +
 ballista/scheduler/src/flight_sql.rs               | 122 ++++++++++++++++++---
 ballista/scheduler/src/planner.rs                  |   4 +-
 ballista/scheduler/src/state/execution_graph.rs    |   2 +-
 .../src/state/execution_graph/execution_stage.rs   |   8 +-
 .../scheduler/src/state/execution_graph_dot.rs     |   6 +-
 ballista/scheduler/src/state/mod.rs                |   2 +-
 ballista/scheduler/src/test_utils.rs               |  25 ++++-
 benchmarks/src/bin/tpch.rs                         |   8 +-
 21 files changed, 306 insertions(+), 128 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c47dfdce..42f356f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,21 +19,21 @@
 members = ["ballista-cli", "ballista/client", "ballista/core", 
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
 
 [workspace.dependencies]
-arrow = { version = "40.0.0" }
-arrow-flight = { version = "40.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "40.0.0", default-features = false }
-configure_me = { version = "0.4.0" }
-configure_me_codegen = { version = "0.4.4" }
-datafusion = "26.0.0"
-datafusion-cli = "26.0.0"
-datafusion-proto = "26.0.0"
-object_store = "0.5.6"
-sqlparser = "0.34.0"
-tonic = { version = "0.9" }
-tonic-build = { version = "0.9", default-features = false, features = 
["transport", "prost"] }
+arrow = {version = "43.0.0"}
+arrow-flight = {version = "43.0.0", features = ["flight-sql-experimental"]}
+arrow-schema = {version = "43.0.0", default-features = false}
+configure_me = {version = "0.4.0"}
+configure_me_codegen = {version = "0.4.4"}
+datafusion = "27.0.0"
+datafusion-cli = "27.0.0"
+datafusion-proto = "27.0.0"
+object_store = "0.6.1"
+sqlparser = "0.35.0"
+tonic = {version = "0.9"}
+tonic-build = {version = "0.9", default-features = false, features = 
["transport", "prost"]}
 tracing = "0.1.36"
 tracing-appender = "0.2.2"
-tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
+tracing-subscriber = {version = "0.3.15", features = ["env-filter"]}
 
 # cargo build --profile release-lto
 [profile.release-lto]
@@ -54,3 +54,10 @@ opt-level = 3
 overflow-checks = false
 panic = 'unwind'
 rpath = false
+
+[patch.crates-io]
+# TODO remove on upgrade to DataFusion 28.0.0
+# fix for https://github.com/apache/arrow-datafusion/issues/6819 and 
https://github.com/apache/arrow-datafusion/issues/6898
+datafusion = {git = "https://github.com/apache/arrow-datafusion.git", rev = 
"4e2a72f6c7109d40a4986e3d05360524be078dd4"}
+datafusion-cli = {git = "https://github.com/apache/arrow-datafusion.git", rev 
= "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
+datafusion-proto = {git = "https://github.com/apache/arrow-datafusion.git", 
rev = "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
diff --git a/ballista/client/README.md b/ballista/client/README.md
index 26cd90cc..86238a62 100644
--- a/ballista/client/README.md
+++ b/ballista/client/README.md
@@ -84,8 +84,8 @@ To build a simple ballista example, add the following 
dependencies to your `Carg
 
 ```toml
 [dependencies]
-ballista = "0.10"
-datafusion = "21.0.0"
+ballista = "0.11"
+datafusion = "27.0.0"
 tokio = "1.0"
 ```
 
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index a7d42b82..ccb26206 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -33,7 +33,8 @@ use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -119,6 +120,24 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
     }
 }
 
+impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "DistributedQueryExec: scheduler_url={}",
+                    self.scheduler_url
+                )
+            }
+        }
+    }
+}
+
 impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
     fn as_any(&self) -> &dyn Any {
         self
@@ -191,22 +210,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
         Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(
-                    f,
-                    "DistributedQueryExec: scheduler_url={}",
-                    self.scheduler_url
-                )
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // This execution plan sends the logical plan to the scheduler without
         // performing the node by node conversion to a full physical plan.
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index f80002cb..fa3f9f69 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -37,7 +37,7 @@ use datafusion::error::Result;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
@@ -83,6 +83,20 @@ impl ShuffleReaderExec {
     }
 }
 
+impl DisplayAs for ShuffleReaderExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ShuffleReaderExec: partitions={}", 
self.partition.len())
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ShuffleReaderExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -154,18 +168,6 @@ impl ExecutionPlan for ShuffleReaderExec {
         Ok(Box::pin(result))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "ShuffleReaderExec: partitions={}", 
self.partition.len())
-            }
-        }
-    }
-
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics.clone_inner())
     }
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs 
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 742f0d4e..24869b2c 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -47,7 +47,8 @@ use datafusion::physical_plan::metrics::{
 };
 
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
+    Statistics,
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 
@@ -294,6 +295,24 @@ impl ShuffleWriterExec {
     }
 }
 
+impl DisplayAs for ShuffleWriterExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "ShuffleWriterExec: {:?}",
+                    self.shuffle_output_partitioning
+                )
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for ShuffleWriterExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -399,22 +418,6 @@ impl ExecutionPlan for ShuffleWriterExec {
         Some(self.metrics.clone_inner())
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(
-                    f,
-                    "ShuffleWriterExec: {:?}",
-                    self.shuffle_output_partitioning
-                )
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         self.plan.statistics()
     }
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs 
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index 7c799741..0557529f 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,7 +23,8 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
+    Statistics,
 };
 
 /// UnresolvedShuffleExec represents a dependency on the results of a 
ShuffleWriterExec node which hasn't computed yet.
@@ -57,6 +58,20 @@ impl UnresolvedShuffleExec {
     }
 }
 
+impl DisplayAs for UnresolvedShuffleExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "UnresolvedShuffleExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for UnresolvedShuffleExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -100,18 +115,6 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         ))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "UnresolvedShuffleExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         // The full statistics are computed in the `ShuffleReaderExec` node
         // that replaces this one once the previous stage is completed.
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs 
b/ballista/core/src/serde/scheduler/from_proto.rs
index 545896d8..a38578cd 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -18,7 +18,7 @@
 use chrono::{TimeZone, Utc};
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion::physical_plan::metrics::{
     Count, Gauge, MetricValue, MetricsSet, Time, Timestamp,
 };
@@ -279,6 +279,7 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     runtime: Arc<RuntimeEnv>,
     scalar_functions: HashMap<String, Arc<ScalarUDF>>,
     aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    window_functions: HashMap<String, Arc<WindowUDF>>,
     codec: BallistaCodec<T, U>,
 ) -> Result<TaskDefinition, BallistaError> {
     let mut props = HashMap::new();
@@ -289,6 +290,7 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
 
     let mut task_scalar_functions = HashMap::new();
     let mut task_aggregate_functions = HashMap::new();
+    let mut task_window_functions = HashMap::new();
     // TODO combine the functions from Executor's functions and 
TaskDefinition's function resources
     for scalar_func in scalar_functions {
         task_scalar_functions.insert(scalar_func.0, scalar_func.1);
@@ -296,9 +298,13 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     for agg_func in aggregate_functions {
         task_aggregate_functions.insert(agg_func.0, agg_func.1);
     }
+    for agg_func in window_functions {
+        task_window_functions.insert(agg_func.0, agg_func.1);
+    }
     let function_registry = Arc::new(SimpleFunctionRegistry {
         scalar_functions: task_scalar_functions,
         aggregate_functions: task_aggregate_functions,
+        window_functions: task_window_functions,
     });
 
     let encoded_plan = task.plan.as_slice();
@@ -342,6 +348,7 @@ pub fn get_task_definition_vec<
     runtime: Arc<RuntimeEnv>,
     scalar_functions: HashMap<String, Arc<ScalarUDF>>,
     aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    window_functions: HashMap<String, Arc<WindowUDF>>,
     codec: BallistaCodec<T, U>,
 ) -> Result<Vec<TaskDefinition>, BallistaError> {
     let mut props = HashMap::new();
@@ -352,6 +359,7 @@ pub fn get_task_definition_vec<
 
     let mut task_scalar_functions = HashMap::new();
     let mut task_aggregate_functions = HashMap::new();
+    let mut task_window_functions = HashMap::new();
     // TODO combine the functions from Executor's functions and 
TaskDefinition's function resources
     for scalar_func in scalar_functions {
         task_scalar_functions.insert(scalar_func.0, scalar_func.1);
@@ -359,9 +367,13 @@ pub fn get_task_definition_vec<
     for agg_func in aggregate_functions {
         task_aggregate_functions.insert(agg_func.0, agg_func.1);
     }
+    for agg_func in window_functions {
+        task_window_functions.insert(agg_func.0, agg_func.1);
+    }
     let function_registry = Arc::new(SimpleFunctionRegistry {
         scalar_functions: task_scalar_functions,
         aggregate_functions: task_aggregate_functions,
+        window_functions: task_window_functions,
     });
 
     let encoded_plan = multi_task.plan.as_slice();
diff --git a/ballista/core/src/serde/scheduler/mod.rs 
b/ballista/core/src/serde/scheduler/mod.rs
index 96c4e0fa..0ced200e 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -25,7 +25,7 @@ use datafusion::arrow::array::{
 use datafusion::arrow::datatypes::{DataType, Field};
 use datafusion::common::DataFusionError;
 use datafusion::execution::FunctionRegistry;
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::Partitioning;
 use serde::Serialize;
@@ -295,6 +295,7 @@ pub struct TaskDefinition {
 pub struct SimpleFunctionRegistry {
     pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
     pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+    pub window_functions: HashMap<String, Arc<WindowUDF>>,
 }
 
 impl FunctionRegistry for SimpleFunctionRegistry {
@@ -321,4 +322,14 @@ impl FunctionRegistry for SimpleFunctionRegistry {
             ))
         })
     }
+
+    fn udwf(&self, name: &str) -> datafusion::common::Result<Arc<WindowUDF>> {
+        let result = self.window_functions.get(name);
+
+        result.cloned().ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "There is no UDWF named \"{name}\" in the TaskContext"
+            ))
+        })
+    }
 }
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 2d19889c..5504e1c4 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -27,6 +27,7 @@ use datafusion::arrow::{ipc::writer::FileWriter, 
record_batch::RecordBatch};
 use datafusion::datasource::object_store::{
     DefaultObjectStoreRegistry, ObjectStoreRegistry,
 };
+use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{
     QueryPlanner, SessionConfig, SessionContext, SessionState,
@@ -38,7 +39,6 @@ use 
datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::common::batch_byte_size;
 use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::metrics::MetricsSet;
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 65e1c51c..8dbccc32 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -27,7 +27,8 @@ use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, 
Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
+    Statistics,
 };
 use datafusion::{error::Result, physical_plan::RecordBatchStream};
 use futures::stream::SelectAll;
@@ -46,6 +47,20 @@ impl CollectExec {
     }
 }
 
+impl DisplayAs for CollectExec {
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "CollectExec")
+            }
+        }
+    }
+}
+
 impl ExecutionPlan for CollectExec {
     fn as_any(&self) -> &dyn Any {
         self
@@ -93,18 +108,6 @@ impl ExecutionPlan for CollectExec {
         }))
     }
 
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(f, "CollectExec")
-            }
-        }
-    }
-
     fn statistics(&self) -> Statistics {
         self.plan.statistics()
     }
diff --git a/ballista/executor/src/execution_loop.rs 
b/ballista/executor/src/execution_loop.rs
index 47593ebd..6d7b4aec 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -181,6 +181,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
 
     let mut task_scalar_functions = HashMap::new();
     let mut task_aggregate_functions = HashMap::new();
+    let mut task_window_functions = HashMap::new();
     // TODO combine the functions from Executor's functions and 
TaskDefintion's function resources
     for scalar_func in executor.scalar_functions.clone() {
         task_scalar_functions.insert(scalar_func.0.clone(), scalar_func.1);
@@ -188,6 +189,9 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
     for agg_func in executor.aggregate_functions.clone() {
         task_aggregate_functions.insert(agg_func.0, agg_func.1);
     }
+    for window_func in executor.window_functions.clone() {
+        task_window_functions.insert(window_func.0, window_func.1);
+    }
     let runtime = executor.runtime.clone();
     let session_id = task.session_id.clone();
     let task_context = Arc::new(TaskContext::new(
@@ -196,6 +200,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U: 
'static + AsExecutionP
         session_config,
         task_scalar_functions,
         task_aggregate_functions,
+        task_window_functions,
         runtime.clone(),
     ));
 
diff --git a/ballista/executor/src/executor.rs 
b/ballista/executor/src/executor.rs
index 90d960d0..f5828bbb 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -28,6 +28,7 @@ use ballista_core::serde::scheduler::PartitionId;
 use dashmap::DashMap;
 use datafusion::execution::context::TaskContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::logical_expr::WindowUDF;
 use datafusion::physical_plan::udaf::AggregateUDF;
 use datafusion::physical_plan::udf::ScalarUDF;
 use futures::future::AbortHandle;
@@ -68,6 +69,9 @@ pub struct Executor {
     /// Aggregate functions registered in the Executor
     pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
 
+    /// Window functions registered in the Executor
+    pub window_functions: HashMap<String, Arc<WindowUDF>>,
+
     /// Runtime environment for Executor
     pub runtime: Arc<RuntimeEnv>,
 
@@ -101,6 +105,7 @@ impl Executor {
             // TODO add logic to dynamically load UDF/UDAFs libs from files
             scalar_functions: HashMap::new(),
             aggregate_functions: HashMap::new(),
+            window_functions: HashMap::new(),
             runtime,
             metrics_collector,
             concurrent_tasks,
@@ -189,8 +194,8 @@ mod test {
     use datafusion::error::DataFusionError;
     use datafusion::physical_expr::PhysicalSortExpr;
     use datafusion::physical_plan::{
-        ExecutionPlan, Partitioning, RecordBatchStream, 
SendableRecordBatchStream,
-        Statistics,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
+        SendableRecordBatchStream, Statistics,
     };
     use datafusion::prelude::SessionContext;
     use futures::Stream;
@@ -225,6 +230,20 @@ mod test {
     #[derive(Debug)]
     pub struct NeverendingOperator;
 
+    impl DisplayAs for NeverendingOperator {
+        fn fmt_as(
+            &self,
+            t: DisplayFormatType,
+            f: &mut std::fmt::Formatter,
+        ) -> std::fmt::Result {
+            match t {
+                DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                    write!(f, "NeverendingOperator")
+                }
+            }
+        }
+    }
+
     impl ExecutionPlan for NeverendingOperator {
         fn as_any(&self) -> &dyn Any {
             self
diff --git a/ballista/executor/src/executor_server.rs 
b/ballista/executor/src/executor_server.rs
index 2892cb0b..6341a38d 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -351,6 +351,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorServer<T,
                 session_config,
                 function_registry.scalar_functions.clone(),
                 function_registry.aggregate_functions.clone(),
+                function_registry.window_functions.clone(),
                 runtime,
             ))
         };
@@ -634,6 +635,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
                         self.executor.runtime.clone(),
                         self.executor.scalar_functions.clone(),
                         self.executor.aggregate_functions.clone(),
+                        self.executor.window_functions.clone(),
                         self.codec.clone(),
                     )
                     .map_err(|e| Status::invalid_argument(format!("{e}")))?,
@@ -661,6 +663,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> ExecutorGrpc
                 self.executor.runtime.clone(),
                 self.executor.scalar_functions.clone(),
                 self.executor.aggregate_functions.clone(),
+                self.executor.window_functions.clone(),
                 self.codec.clone(),
             )
             .map_err(|e| Status::invalid_argument(format!("{e}")))?;
diff --git a/ballista/scheduler/src/flight_sql.rs 
b/ballista/scheduler/src/flight_sql.rs
index db13b518..a5329680 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -19,13 +19,17 @@ use arrow_flight::flight_descriptor::DescriptorType;
 use arrow_flight::flight_service_server::FlightService;
 use arrow_flight::sql::server::FlightSqlService;
 use arrow_flight::sql::{
+    ActionBeginSavepointRequest, ActionBeginSavepointResult,
+    ActionBeginTransactionRequest, ActionBeginTransactionResult,
+    ActionCancelQueryRequest, ActionCancelQueryResult,
     ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
-    ActionCreatePreparedStatementResult, CommandGetCatalogs, 
CommandGetCrossReference,
-    CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
-    CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes, 
CommandGetTables,
-    CommandGetXdbcTypeInfo, CommandPreparedStatementQuery,
-    CommandPreparedStatementUpdate, CommandStatementQuery, 
CommandStatementUpdate,
-    SqlInfo, TicketStatementQuery,
+    ActionCreatePreparedStatementResult, 
ActionCreatePreparedSubstraitPlanRequest,
+    ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
+    CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
+    CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
+    CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
+    CommandPreparedStatementQuery, CommandPreparedStatementUpdate, 
CommandStatementQuery,
+    CommandStatementSubstraitPlan, CommandStatementUpdate, SqlInfo, 
TicketStatementQuery,
 };
 use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, 
HandshakeRequest,
@@ -401,6 +405,7 @@ impl FlightSqlServiceImpl {
             endpoint: fieps,
             total_records: num_rows,
             total_bytes: num_bytes,
+            ordered: false,
         };
         Response::new(info)
     }
@@ -895,17 +900,104 @@ impl FlightSqlService for FlightSqlServiceImpl {
         &self,
         handle: ActionClosePreparedStatementRequest,
         _request: Request<Action>,
-    ) {
+    ) -> Result<(), Status> {
         debug!("do_action_close_prepared_statement");
-        let handle = 
Uuid::from_slice(handle.prepared_statement_handle.as_ref());
-        let handle = if let Ok(handle) = handle {
-            debug!("Closing {}", handle);
-            handle
-        } else {
-            return;
-        };
-        let _ = self.remove_plan(handle);
+        let handle = 
Uuid::from_slice(handle.prepared_statement_handle.as_ref())
+            .map(|id| {
+                debug!("Closing {}", id);
+                id
+            })
+            .map_err(|e| Status::internal(format!("Failed to parse handle: 
{e:?}")))?;
+
+        self.remove_plan(handle)
+    }
+
+    /// Get a FlightInfo for executing a substrait plan.
+    async fn get_flight_info_substrait_plan(
+        &self,
+        _query: CommandStatementSubstraitPlan,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        debug!("get_flight_info_substrait_plan");
+        Err(Status::unimplemented(
+            "Implement get_flight_info_substrait_plan",
+        ))
+    }
+
+    /// Execute a substrait plan
+    async fn do_put_substrait_plan(
+        &self,
+        _query: CommandStatementSubstraitPlan,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<i64, Status> {
+        debug!("do_put_substrait_plan");
+        Err(Status::unimplemented("Implement do_put_substrait_plan"))
+    }
+
+    /// Create a prepared substrait plan.
+    async fn do_action_create_prepared_substrait_plan(
+        &self,
+        _query: ActionCreatePreparedSubstraitPlanRequest,
+        _request: Request<Action>,
+    ) -> Result<ActionCreatePreparedStatementResult, Status> {
+        debug!("do_action_create_prepared_substrait_plan");
+        Err(Status::unimplemented(
+            "Implement do_action_create_prepared_substrait_plan",
+        ))
+    }
+
+    /// Begin a transaction
+    async fn do_action_begin_transaction(
+        &self,
+        _query: ActionBeginTransactionRequest,
+        _request: Request<Action>,
+    ) -> Result<ActionBeginTransactionResult, Status> {
+        debug!("do_action_begin_transaction");
+        Err(Status::unimplemented(
+            "Implement do_action_begin_transaction",
+        ))
+    }
+
+    /// End a transaction
+    async fn do_action_end_transaction(
+        &self,
+        _query: ActionEndTransactionRequest,
+        _request: Request<Action>,
+    ) -> Result<(), Status> {
+        debug!("do_action_end_transaction");
+        Err(Status::unimplemented("Implement do_action_end_transaction"))
+    }
+
+    /// Begin a savepoint
+    async fn do_action_begin_savepoint(
+        &self,
+        _query: ActionBeginSavepointRequest,
+        _request: Request<Action>,
+    ) -> Result<ActionBeginSavepointResult, Status> {
+        debug!("do_action_begin_savepoint");
+        Err(Status::unimplemented("Implement do_action_begin_savepoint"))
+    }
+
+    /// End a savepoint
+    async fn do_action_end_savepoint(
+        &self,
+        _query: ActionEndSavepointRequest,
+        _request: Request<Action>,
+    ) -> Result<(), Status> {
+        debug!("do_action_end_savepoint");
+        Err(Status::unimplemented("Implement do_action_end_savepoint"))
+    }
+
+    /// Cancel a query
+    async fn do_action_cancel_query(
+        &self,
+        _query: ActionCancelQueryRequest,
+        _request: Request<Action>,
+    ) -> Result<ActionCancelQueryResult, Status> {
+        debug!("do_action_cancel_query");
+        Err(Status::unimplemented("Implement do_action_cancel_query"))
     }
 
+    /// Register a new SqlInfo result, making it available when calling 
GetSqlInfo.
     async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
 }
diff --git a/ballista/scheduler/src/planner.rs 
b/ballista/scheduler/src/planner.rs
index 763a7ef2..3e8bdc6c 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -345,7 +345,7 @@ mod test {
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
         for stage in &stages {
-            println!("{}", displayable(stage.as_ref()).indent());
+            println!("{}", displayable(stage.as_ref()).indent(false));
         }
 
         /* Expected result:
@@ -451,7 +451,7 @@ order by
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
         for stage in &stages {
-            println!("{}", displayable(stage.as_ref()).indent());
+            println!("{}", displayable(stage.as_ref()).indent(false));
         }
 
         /* Expected result:
diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs
index dde1268c..f5ea22d0 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1622,7 +1622,7 @@ pub struct TaskDescription {
 
 impl Debug for TaskDescription {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
         write!(
             f,
             "TaskDescription[session_id: {},job: {}, stage: {}.{}, partition: {} task_id {}, task attempt {}]\n{}",
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index a4cdf238..fcac54d5 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -413,7 +413,7 @@ impl UnresolvedStage {
 
 impl Debug for UnresolvedStage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
 
         write!(
             f,
@@ -529,7 +529,7 @@ impl ResolvedStage {
 
 impl Debug for ResolvedStage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
 
         write!(
             f,
@@ -860,7 +860,7 @@ impl RunningStage {
 
 impl Debug for RunningStage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
 
         write!(
             f,
@@ -1147,7 +1147,7 @@ impl FailedStage {
 
 impl Debug for FailedStage {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+        let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
 
         write!(
             f,
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs b/ballista/scheduler/src/state/execution_graph_dot.rs
index 254b6072..ce672f26 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -22,12 +22,12 @@ use ballista_core::execution_plans::{
     ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
 };
 use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{
+    AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
+};
 use datafusion::physical_plan::aggregates::AggregateExec;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::file_format::{
-    AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
-};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::joins::CrossJoinExec;
 use datafusion::physical_plan::joins::HashJoinExec;
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 957bbcfe..ba8647ee 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -405,7 +405,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
         let plan = session_ctx.state().create_physical_plan(plan).await?;
         debug!(
             "Physical plan: {}",
-            DisplayableExecutionPlan::new(plan.as_ref()).indent()
+            DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
         );
 
         self.task_manager
diff --git a/ballista/scheduler/src/test_utils.rs b/ballista/scheduler/src/test_utils.rs
index fe274688..051b41ad 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -813,7 +813,10 @@ pub async fn test_aggregation_plan_with_job_id(
         .await
         .unwrap();
 
-    println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+    println!(
+        "{}",
+        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+    );
 
     ExecutionGraph::new("localhost:50050", job_id, "", "session", plan, 0).unwrap()
 }
@@ -845,7 +848,10 @@ pub async fn test_two_aggregations_plan(partition: usize) -> ExecutionGraph {
         .await
         .unwrap();
 
-    println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+    println!(
+        "{}",
+        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+    );
 
     ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap()
 }
@@ -917,7 +923,10 @@ pub async fn test_join_plan(partition: usize) -> ExecutionGraph {
         .await
         .unwrap();
 
-    println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+    println!(
+        "{}",
+        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+    );
 
     let graph =
         ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap();
@@ -946,7 +955,10 @@ pub async fn test_union_all_plan(partition: usize) -> ExecutionGraph {
         .await
         .unwrap();
 
-    println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+    println!(
+        "{}",
+        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+    );
 
     let graph =
         ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap();
@@ -975,7 +987,10 @@ pub async fn test_union_plan(partition: usize) -> ExecutionGraph {
         .await
         .unwrap();
 
-    println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+    println!(
+        "{}",
+        DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+    );
 
     let graph =
         ExecutionGraph::new("localhost:50050", "job", "", "session", plan, 0).unwrap();
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1a58714f..b2ffb1ba 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -712,7 +712,7 @@ async fn execute_query(
     if debug {
         println!(
             "=== Physical plan ===\n{}\n",
-            displayable(physical_plan.as_ref()).indent()
+            displayable(physical_plan.as_ref()).indent(false)
         );
     }
     let task_ctx = ctx.task_ctx();
@@ -720,7 +720,7 @@ async fn execute_query(
     if debug {
         println!(
             "=== Physical plan with metrics ===\n{}\n",
-            DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent()
+            DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(false)
         );
         if !result.is_empty() {
             pretty::print_batches(&result)?;
@@ -1696,8 +1696,8 @@ mod ballista_round_trip {
                 )
                 .unwrap();
             assert_eq!(
-                format!("{}", displayable(physical_plan.as_ref()).indent()),
-                format!("{}", displayable(round_trip.as_ref()).indent()),
+                format!("{}", displayable(physical_plan.as_ref()).indent(false)),
+                format!("{}", displayable(round_trip.as_ref()).indent(false)),
                 "physical plan round trip failed"
             );
         }


Reply via email to