This is an automated email from the ASF dual-hosted git repository.
nju_yaho pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 169ffa5e Upgrade DataFusion to 27.0.0 (#834)
169ffa5e is described below
commit 169ffa5eff5e41721053201c28f2f591a3614822
Author: r.4ntix <[email protected]>
AuthorDate: Thu Jul 13 09:53:42 2023 +0800
Upgrade DataFusion to 27.0.0 (#834)
* Upgrade DataFusion to 27.0.0
* Fix failure of optimizer rule 'push_down_projection'
* Fix test errors in benchmarks
* Fix physical plan encode/decode error
---
Cargo.toml | 33 +++---
ballista/client/README.md | 4 +-
.../core/src/execution_plans/distributed_query.rs | 37 ++++---
.../core/src/execution_plans/shuffle_reader.rs | 28 ++---
.../core/src/execution_plans/shuffle_writer.rs | 37 ++++---
.../core/src/execution_plans/unresolved_shuffle.rs | 29 ++---
ballista/core/src/serde/scheduler/from_proto.rs | 14 ++-
ballista/core/src/serde/scheduler/mod.rs | 13 ++-
ballista/core/src/utils.rs | 2 +-
ballista/executor/src/collect.rs | 29 ++---
ballista/executor/src/execution_loop.rs | 5 +
ballista/executor/src/executor.rs | 23 +++-
ballista/executor/src/executor_server.rs | 3 +
ballista/scheduler/src/flight_sql.rs | 122 ++++++++++++++++++---
ballista/scheduler/src/planner.rs | 4 +-
ballista/scheduler/src/state/execution_graph.rs | 2 +-
.../src/state/execution_graph/execution_stage.rs | 8 +-
.../scheduler/src/state/execution_graph_dot.rs | 6 +-
ballista/scheduler/src/state/mod.rs | 2 +-
ballista/scheduler/src/test_utils.rs | 25 ++++-
benchmarks/src/bin/tpch.rs | 8 +-
21 files changed, 306 insertions(+), 128 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c47dfdce..42f356f1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,21 +19,21 @@
members = ["ballista-cli", "ballista/client", "ballista/core",
"ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
[workspace.dependencies]
-arrow = { version = "40.0.0" }
-arrow-flight = { version = "40.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "40.0.0", default-features = false }
-configure_me = { version = "0.4.0" }
-configure_me_codegen = { version = "0.4.4" }
-datafusion = "26.0.0"
-datafusion-cli = "26.0.0"
-datafusion-proto = "26.0.0"
-object_store = "0.5.6"
-sqlparser = "0.34.0"
-tonic = { version = "0.9" }
-tonic-build = { version = "0.9", default-features = false, features =
["transport", "prost"] }
+arrow = {version = "43.0.0"}
+arrow-flight = {version = "43.0.0", features = ["flight-sql-experimental"]}
+arrow-schema = {version = "43.0.0", default-features = false}
+configure_me = {version = "0.4.0"}
+configure_me_codegen = {version = "0.4.4"}
+datafusion = "27.0.0"
+datafusion-cli = "27.0.0"
+datafusion-proto = "27.0.0"
+object_store = "0.6.1"
+sqlparser = "0.35.0"
+tonic = {version = "0.9"}
+tonic-build = {version = "0.9", default-features = false, features =
["transport", "prost"]}
tracing = "0.1.36"
tracing-appender = "0.2.2"
-tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
+tracing-subscriber = {version = "0.3.15", features = ["env-filter"]}
# cargo build --profile release-lto
[profile.release-lto]
@@ -54,3 +54,10 @@ opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
+
+[patch.crates-io]
+# TODO remove on upgrade to DataFusion 28.0.0
+# fix for https://github.com/apache/arrow-datafusion/issues/6819 and
https://github.com/apache/arrow-datafusion/issues/6898
+datafusion = {git = "https://github.com/apache/arrow-datafusion.git", rev =
"4e2a72f6c7109d40a4986e3d05360524be078dd4"}
+datafusion-cli = {git = "https://github.com/apache/arrow-datafusion.git", rev
= "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
+datafusion-proto = {git = "https://github.com/apache/arrow-datafusion.git",
rev = "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
diff --git a/ballista/client/README.md b/ballista/client/README.md
index 26cd90cc..86238a62 100644
--- a/ballista/client/README.md
+++ b/ballista/client/README.md
@@ -84,8 +84,8 @@ To build a simple ballista example, add the following
dependencies to your `Carg
```toml
[dependencies]
-ballista = "0.10"
-datafusion = "21.0.0"
+ballista = "0.11"
+datafusion = "27.0.0"
tokio = "1.0"
```
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index a7d42b82..ccb26206 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -33,7 +33,8 @@ use datafusion::logical_expr::LogicalPlan;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
+ Statistics,
};
use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -119,6 +120,24 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
}
}
+impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "DistributedQueryExec: scheduler_url={}",
+ self.scheduler_url
+ )
+ }
+ }
+ }
+}
+
impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
fn as_any(&self) -> &dyn Any {
self
@@ -191,22 +210,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(
- f,
- "DistributedQueryExec: scheduler_url={}",
- self.scheduler_url
- )
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// This execution plan sends the logical plan to the scheduler without
// performing the node by node conversion to a full physical plan.
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index f80002cb..fa3f9f69 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -37,7 +37,7 @@ use datafusion::error::Result;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use futures::{Stream, StreamExt, TryStreamExt};
@@ -83,6 +83,20 @@ impl ShuffleReaderExec {
}
}
+impl DisplayAs for ShuffleReaderExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "ShuffleReaderExec: partitions={}",
self.partition.len())
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ShuffleReaderExec {
fn as_any(&self) -> &dyn Any {
self
@@ -154,18 +168,6 @@ impl ExecutionPlan for ShuffleReaderExec {
Ok(Box::pin(result))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "ShuffleReaderExec: partitions={}",
self.partition.len())
- }
- }
- }
-
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics.clone_inner())
}
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index 742f0d4e..24869b2c 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -47,7 +47,8 @@ use datafusion::physical_plan::metrics::{
};
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
+ Statistics,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
@@ -294,6 +295,24 @@ impl ShuffleWriterExec {
}
}
+impl DisplayAs for ShuffleWriterExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ShuffleWriterExec: {:?}",
+ self.shuffle_output_partitioning
+ )
+ }
+ }
+ }
+}
+
impl ExecutionPlan for ShuffleWriterExec {
fn as_any(&self) -> &dyn Any {
self
@@ -399,22 +418,6 @@ impl ExecutionPlan for ShuffleWriterExec {
Some(self.metrics.clone_inner())
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(
- f,
- "ShuffleWriterExec: {:?}",
- self.shuffle_output_partitioning
- )
- }
- }
- }
-
fn statistics(&self) -> Statistics {
self.plan.statistics()
}
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index 7c799741..0557529f 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,7 +23,8 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
+ Statistics,
};
/// UnresolvedShuffleExec represents a dependency on the results of a
ShuffleWriterExec node which hasn't computed yet.
@@ -57,6 +58,20 @@ impl UnresolvedShuffleExec {
}
}
+impl DisplayAs for UnresolvedShuffleExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "UnresolvedShuffleExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for UnresolvedShuffleExec {
fn as_any(&self) -> &dyn Any {
self
@@ -100,18 +115,6 @@ impl ExecutionPlan for UnresolvedShuffleExec {
))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "UnresolvedShuffleExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
// The full statistics are computed in the `ShuffleReaderExec` node
// that replaces this one once the previous stage is completed.
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs
b/ballista/core/src/serde/scheduler/from_proto.rs
index 545896d8..a38578cd 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -18,7 +18,7 @@
use chrono::{TimeZone, Utc};
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::execution::runtime_env::RuntimeEnv;
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion::physical_plan::metrics::{
Count, Gauge, MetricValue, MetricsSet, Time, Timestamp,
};
@@ -279,6 +279,7 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
runtime: Arc<RuntimeEnv>,
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ window_functions: HashMap<String, Arc<WindowUDF>>,
codec: BallistaCodec<T, U>,
) -> Result<TaskDefinition, BallistaError> {
let mut props = HashMap::new();
@@ -289,6 +290,7 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
let mut task_scalar_functions = HashMap::new();
let mut task_aggregate_functions = HashMap::new();
+ let mut task_window_functions = HashMap::new();
// TODO combine the functions from Executor's functions and
TaskDefinition's function resources
for scalar_func in scalar_functions {
task_scalar_functions.insert(scalar_func.0, scalar_func.1);
@@ -296,9 +298,13 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
for agg_func in aggregate_functions {
task_aggregate_functions.insert(agg_func.0, agg_func.1);
}
+ for agg_func in window_functions {
+ task_window_functions.insert(agg_func.0, agg_func.1);
+ }
let function_registry = Arc::new(SimpleFunctionRegistry {
scalar_functions: task_scalar_functions,
aggregate_functions: task_aggregate_functions,
+ window_functions: task_window_functions,
});
let encoded_plan = task.plan.as_slice();
@@ -342,6 +348,7 @@ pub fn get_task_definition_vec<
runtime: Arc<RuntimeEnv>,
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ window_functions: HashMap<String, Arc<WindowUDF>>,
codec: BallistaCodec<T, U>,
) -> Result<Vec<TaskDefinition>, BallistaError> {
let mut props = HashMap::new();
@@ -352,6 +359,7 @@ pub fn get_task_definition_vec<
let mut task_scalar_functions = HashMap::new();
let mut task_aggregate_functions = HashMap::new();
+ let mut task_window_functions = HashMap::new();
// TODO combine the functions from Executor's functions and
TaskDefinition's function resources
for scalar_func in scalar_functions {
task_scalar_functions.insert(scalar_func.0, scalar_func.1);
@@ -359,9 +367,13 @@ pub fn get_task_definition_vec<
for agg_func in aggregate_functions {
task_aggregate_functions.insert(agg_func.0, agg_func.1);
}
+ for agg_func in window_functions {
+ task_window_functions.insert(agg_func.0, agg_func.1);
+ }
let function_registry = Arc::new(SimpleFunctionRegistry {
scalar_functions: task_scalar_functions,
aggregate_functions: task_aggregate_functions,
+ window_functions: task_window_functions,
});
let encoded_plan = multi_task.plan.as_slice();
diff --git a/ballista/core/src/serde/scheduler/mod.rs
b/ballista/core/src/serde/scheduler/mod.rs
index 96c4e0fa..0ced200e 100644
--- a/ballista/core/src/serde/scheduler/mod.rs
+++ b/ballista/core/src/serde/scheduler/mod.rs
@@ -25,7 +25,7 @@ use datafusion::arrow::array::{
use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::common::DataFusionError;
use datafusion::execution::FunctionRegistry;
-use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
+use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use serde::Serialize;
@@ -295,6 +295,7 @@ pub struct TaskDefinition {
pub struct SimpleFunctionRegistry {
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ pub window_functions: HashMap<String, Arc<WindowUDF>>,
}
impl FunctionRegistry for SimpleFunctionRegistry {
@@ -321,4 +322,14 @@ impl FunctionRegistry for SimpleFunctionRegistry {
))
})
}
+
+ fn udwf(&self, name: &str) -> datafusion::common::Result<Arc<WindowUDF>> {
+ let result = self.window_functions.get(name);
+
+ result.cloned().ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "There is no UDWF named \"{name}\" in the TaskContext"
+ ))
+ })
+ }
}
diff --git a/ballista/core/src/utils.rs b/ballista/core/src/utils.rs
index 2d19889c..5504e1c4 100644
--- a/ballista/core/src/utils.rs
+++ b/ballista/core/src/utils.rs
@@ -27,6 +27,7 @@ use datafusion::arrow::{ipc::writer::FileWriter,
record_batch::RecordBatch};
use datafusion::datasource::object_store::{
DefaultObjectStoreRegistry, ObjectStoreRegistry,
};
+use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::error::DataFusionError;
use datafusion::execution::context::{
QueryPlanner, SessionConfig, SessionContext, SessionState,
@@ -38,7 +39,6 @@ use
datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::common::batch_byte_size;
use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::metrics::MetricsSet;
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 65e1c51c..8dbccc32 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -27,7 +27,8 @@ use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
+ Statistics,
};
use datafusion::{error::Result, physical_plan::RecordBatchStream};
use futures::stream::SelectAll;
@@ -46,6 +47,20 @@ impl CollectExec {
}
}
+impl DisplayAs for CollectExec {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "CollectExec")
+ }
+ }
+ }
+}
+
impl ExecutionPlan for CollectExec {
fn as_any(&self) -> &dyn Any {
self
@@ -93,18 +108,6 @@ impl ExecutionPlan for CollectExec {
}))
}
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(f, "CollectExec")
- }
- }
- }
-
fn statistics(&self) -> Statistics {
self.plan.statistics()
}
diff --git a/ballista/executor/src/execution_loop.rs
b/ballista/executor/src/execution_loop.rs
index 47593ebd..6d7b4aec 100644
--- a/ballista/executor/src/execution_loop.rs
+++ b/ballista/executor/src/execution_loop.rs
@@ -181,6 +181,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
let mut task_scalar_functions = HashMap::new();
let mut task_aggregate_functions = HashMap::new();
+ let mut task_window_functions = HashMap::new();
// TODO combine the functions from Executor's functions and
TaskDefintion's function resources
for scalar_func in executor.scalar_functions.clone() {
task_scalar_functions.insert(scalar_func.0.clone(), scalar_func.1);
@@ -188,6 +189,9 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
for agg_func in executor.aggregate_functions.clone() {
task_aggregate_functions.insert(agg_func.0, agg_func.1);
}
+ for window_func in executor.window_functions.clone() {
+ task_window_functions.insert(window_func.0, window_func.1);
+ }
let runtime = executor.runtime.clone();
let session_id = task.session_id.clone();
let task_context = Arc::new(TaskContext::new(
@@ -196,6 +200,7 @@ async fn run_received_task<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
session_config,
task_scalar_functions,
task_aggregate_functions,
+ task_window_functions,
runtime.clone(),
));
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 90d960d0..f5828bbb 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -28,6 +28,7 @@ use ballista_core::serde::scheduler::PartitionId;
use dashmap::DashMap;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::logical_expr::WindowUDF;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use futures::future::AbortHandle;
@@ -68,6 +69,9 @@ pub struct Executor {
/// Aggregate functions registered in the Executor
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
+ /// Window functions registered in the Executor
+ pub window_functions: HashMap<String, Arc<WindowUDF>>,
+
/// Runtime environment for Executor
pub runtime: Arc<RuntimeEnv>,
@@ -101,6 +105,7 @@ impl Executor {
// TODO add logic to dynamically load UDF/UDAFs libs from files
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
+ window_functions: HashMap::new(),
runtime,
metrics_collector,
concurrent_tasks,
@@ -189,8 +194,8 @@ mod test {
use datafusion::error::DataFusionError;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::{
- ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
+ SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::SessionContext;
use futures::Stream;
@@ -225,6 +230,20 @@ mod test {
#[derive(Debug)]
pub struct NeverendingOperator;
+ impl DisplayAs for NeverendingOperator {
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(f, "NeverendingOperator")
+ }
+ }
+ }
+ }
+
impl ExecutionPlan for NeverendingOperator {
fn as_any(&self) -> &dyn Any {
self
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index 2892cb0b..6341a38d 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -351,6 +351,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
session_config,
function_registry.scalar_functions.clone(),
function_registry.aggregate_functions.clone(),
+ function_registry.window_functions.clone(),
runtime,
))
};
@@ -634,6 +635,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorGrpc
self.executor.runtime.clone(),
self.executor.scalar_functions.clone(),
self.executor.aggregate_functions.clone(),
+ self.executor.window_functions.clone(),
self.codec.clone(),
)
.map_err(|e| Status::invalid_argument(format!("{e}")))?,
@@ -661,6 +663,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorGrpc
self.executor.runtime.clone(),
self.executor.scalar_functions.clone(),
self.executor.aggregate_functions.clone(),
+ self.executor.window_functions.clone(),
self.codec.clone(),
)
.map_err(|e| Status::invalid_argument(format!("{e}")))?;
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index db13b518..a5329680 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -19,13 +19,17 @@ use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::sql::server::FlightSqlService;
use arrow_flight::sql::{
+ ActionBeginSavepointRequest, ActionBeginSavepointResult,
+ ActionBeginTransactionRequest, ActionBeginTransactionResult,
+ ActionCancelQueryRequest, ActionCancelQueryResult,
ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
- ActionCreatePreparedStatementResult, CommandGetCatalogs,
CommandGetCrossReference,
- CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys,
- CommandGetPrimaryKeys, CommandGetSqlInfo, CommandGetTableTypes,
CommandGetTables,
- CommandGetXdbcTypeInfo, CommandPreparedStatementQuery,
- CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementUpdate,
- SqlInfo, TicketStatementQuery,
+ ActionCreatePreparedStatementResult,
ActionCreatePreparedSubstraitPlanRequest,
+ ActionEndSavepointRequest, ActionEndTransactionRequest, CommandGetCatalogs,
+ CommandGetCrossReference, CommandGetDbSchemas, CommandGetExportedKeys,
+ CommandGetImportedKeys, CommandGetPrimaryKeys, CommandGetSqlInfo,
+ CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
+ CommandPreparedStatementQuery, CommandPreparedStatementUpdate,
CommandStatementQuery,
+ CommandStatementSubstraitPlan, CommandStatementUpdate, SqlInfo,
TicketStatementQuery,
};
use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest,
@@ -401,6 +405,7 @@ impl FlightSqlServiceImpl {
endpoint: fieps,
total_records: num_rows,
total_bytes: num_bytes,
+ ordered: false,
};
Response::new(info)
}
@@ -895,17 +900,104 @@ impl FlightSqlService for FlightSqlServiceImpl {
&self,
handle: ActionClosePreparedStatementRequest,
_request: Request<Action>,
- ) {
+ ) -> Result<(), Status> {
debug!("do_action_close_prepared_statement");
- let handle =
Uuid::from_slice(handle.prepared_statement_handle.as_ref());
- let handle = if let Ok(handle) = handle {
- debug!("Closing {}", handle);
- handle
- } else {
- return;
- };
- let _ = self.remove_plan(handle);
+ let handle =
Uuid::from_slice(handle.prepared_statement_handle.as_ref())
+ .map(|id| {
+ debug!("Closing {}", id);
+ id
+ })
+ .map_err(|e| Status::internal(format!("Failed to parse handle:
{e:?}")))?;
+
+ self.remove_plan(handle)
+ }
+
+ /// Get a FlightInfo for executing a substrait plan.
+ async fn get_flight_info_substrait_plan(
+ &self,
+ _query: CommandStatementSubstraitPlan,
+ _request: Request<FlightDescriptor>,
+ ) -> Result<Response<FlightInfo>, Status> {
+ debug!("get_flight_info_substrait_plan");
+ Err(Status::unimplemented(
+ "Implement get_flight_info_substrait_plan",
+ ))
+ }
+
+ /// Execute a substrait plan
+ async fn do_put_substrait_plan(
+ &self,
+ _query: CommandStatementSubstraitPlan,
+ _request: Request<Streaming<FlightData>>,
+ ) -> Result<i64, Status> {
+ debug!("do_put_substrait_plan");
+ Err(Status::unimplemented("Implement do_put_substrait_plan"))
+ }
+
+ /// Create a prepared substrait plan.
+ async fn do_action_create_prepared_substrait_plan(
+ &self,
+ _query: ActionCreatePreparedSubstraitPlanRequest,
+ _request: Request<Action>,
+ ) -> Result<ActionCreatePreparedStatementResult, Status> {
+ debug!("do_action_create_prepared_substrait_plan");
+ Err(Status::unimplemented(
+ "Implement do_action_create_prepared_substrait_plan",
+ ))
+ }
+
+ /// Begin a transaction
+ async fn do_action_begin_transaction(
+ &self,
+ _query: ActionBeginTransactionRequest,
+ _request: Request<Action>,
+ ) -> Result<ActionBeginTransactionResult, Status> {
+ debug!("do_action_begin_transaction");
+ Err(Status::unimplemented(
+ "Implement do_action_begin_transaction",
+ ))
+ }
+
+ /// End a transaction
+ async fn do_action_end_transaction(
+ &self,
+ _query: ActionEndTransactionRequest,
+ _request: Request<Action>,
+ ) -> Result<(), Status> {
+ debug!("do_action_end_transaction");
+ Err(Status::unimplemented("Implement do_action_end_transaction"))
+ }
+
+ /// Begin a savepoint
+ async fn do_action_begin_savepoint(
+ &self,
+ _query: ActionBeginSavepointRequest,
+ _request: Request<Action>,
+ ) -> Result<ActionBeginSavepointResult, Status> {
+ debug!("do_action_begin_savepoint");
+ Err(Status::unimplemented("Implement do_action_begin_savepoint"))
+ }
+
+ /// End a savepoint
+ async fn do_action_end_savepoint(
+ &self,
+ _query: ActionEndSavepointRequest,
+ _request: Request<Action>,
+ ) -> Result<(), Status> {
+ debug!("do_action_end_savepoint");
+ Err(Status::unimplemented("Implement do_action_end_savepoint"))
+ }
+
+ /// Cancel a query
+ async fn do_action_cancel_query(
+ &self,
+ _query: ActionCancelQueryRequest,
+ _request: Request<Action>,
+ ) -> Result<ActionCancelQueryResult, Status> {
+ debug!("do_action_cancel_query");
+ Err(Status::unimplemented("Implement do_action_cancel_query"))
}
+ /// Register a new SqlInfo result, making it available when calling
GetSqlInfo.
async fn register_sql_info(&self, _id: i32, _result: &SqlInfo) {}
}
diff --git a/ballista/scheduler/src/planner.rs
b/ballista/scheduler/src/planner.rs
index 763a7ef2..3e8bdc6c 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -345,7 +345,7 @@ mod test {
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
- println!("{}", displayable(stage.as_ref()).indent());
+ println!("{}", displayable(stage.as_ref()).indent(false));
}
/* Expected result:
@@ -451,7 +451,7 @@ order by
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
for stage in &stages {
- println!("{}", displayable(stage.as_ref()).indent());
+ println!("{}", displayable(stage.as_ref()).indent(false));
}
/* Expected result:
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index dde1268c..f5ea22d0 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -1622,7 +1622,7 @@ pub struct TaskDescription {
impl Debug for TaskDescription {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
"TaskDescription[session_id: {},job: {}, stage: {}.{}, partition:
{} task_id {}, task attempt {}]\n{}",
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index a4cdf238..fcac54d5 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -413,7 +413,7 @@ impl UnresolvedStage {
impl Debug for UnresolvedStage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
@@ -529,7 +529,7 @@ impl ResolvedStage {
impl Debug for ResolvedStage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
@@ -860,7 +860,7 @@ impl RunningStage {
impl Debug for RunningStage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
@@ -1147,7 +1147,7 @@ impl FailedStage {
impl Debug for FailedStage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let plan = DisplayableExecutionPlan::new(self.plan.as_ref()).indent();
+ let plan =
DisplayableExecutionPlan::new(self.plan.as_ref()).indent(false);
write!(
f,
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 254b6072..ce672f26 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -22,12 +22,12 @@ use ballista_core::execution_plans::{
ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::physical_plan::{
+ AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
+};
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::file_format::{
- AvroExec, CsvExec, FileScanConfig, NdJsonExec, ParquetExec,
-};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::CrossJoinExec;
use datafusion::physical_plan::joins::HashJoinExec;
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index 957bbcfe..ba8647ee 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -405,7 +405,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
"Physical plan: {}",
- DisplayableExecutionPlan::new(plan.as_ref()).indent()
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
);
self.task_manager
diff --git a/ballista/scheduler/src/test_utils.rs
b/ballista/scheduler/src/test_utils.rs
index fe274688..051b41ad 100644
--- a/ballista/scheduler/src/test_utils.rs
+++ b/ballista/scheduler/src/test_utils.rs
@@ -813,7 +813,10 @@ pub async fn test_aggregation_plan_with_job_id(
.await
.unwrap();
- println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+ println!(
+ "{}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+ );
ExecutionGraph::new("localhost:50050", job_id, "", "session", plan,
0).unwrap()
}
@@ -845,7 +848,10 @@ pub async fn test_two_aggregations_plan(partition: usize)
-> ExecutionGraph {
.await
.unwrap();
- println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+ println!(
+ "{}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+ );
ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap()
}
@@ -917,7 +923,10 @@ pub async fn test_join_plan(partition: usize) ->
ExecutionGraph {
.await
.unwrap();
- println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+ println!(
+ "{}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+ );
let graph =
ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap();
@@ -946,7 +955,10 @@ pub async fn test_union_all_plan(partition: usize) ->
ExecutionGraph {
.await
.unwrap();
- println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+ println!(
+ "{}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+ );
let graph =
ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap();
@@ -975,7 +987,10 @@ pub async fn test_union_plan(partition: usize) ->
ExecutionGraph {
.await
.unwrap();
- println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent());
+ println!(
+ "{}",
+ DisplayableExecutionPlan::new(plan.as_ref()).indent(false)
+ );
let graph =
ExecutionGraph::new("localhost:50050", "job", "", "session", plan,
0).unwrap();
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 1a58714f..b2ffb1ba 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -712,7 +712,7 @@ async fn execute_query(
if debug {
println!(
"=== Physical plan ===\n{}\n",
- displayable(physical_plan.as_ref()).indent()
+ displayable(physical_plan.as_ref()).indent(false)
);
}
let task_ctx = ctx.task_ctx();
@@ -720,7 +720,7 @@ async fn execute_query(
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
-
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent()
+
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent(false)
);
if !result.is_empty() {
pretty::print_batches(&result)?;
@@ -1696,8 +1696,8 @@ mod ballista_round_trip {
)
.unwrap();
assert_eq!(
- format!("{}", displayable(physical_plan.as_ref()).indent()),
- format!("{}", displayable(round_trip.as_ref()).indent()),
+ format!("{}",
displayable(physical_plan.as_ref()).indent(false)),
+ format!("{}", displayable(round_trip.as_ref()).indent(false)),
"physical plan round trip failed"
);
}