This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new d1aff3ea Make Ballista compatible with Datafusion 37.0.0 (from 36.0.0)
(#1031)
d1aff3ea is described below
commit d1aff3eaef0d7d892336781a2a684ce822ede2e3
Author: Raphaël Marinier <[email protected]>
AuthorDate: Thu Jun 27 22:36:12 2024 +0200
Make Ballista compatible with Datafusion 37.0.0 (from 36.0.0) (#1031)
* Upgrading Ballista to datafusion 37.0.0.
* Better test debugging information in planner.rs
* Updated test logic in planner.
Since datafusion's https://github.com/apache/datafusion/pull/9236,
HashJoinExec can also project.
* cargo fmt
* cargo fix
* Removed leftover comment
* Make cargo clippy happy
* lint
* Cargo fmt
* Fix tpch build
* Fix comment spelling
* cargo fmt
---
Cargo.toml | 18 +++---
ballista/client/src/context.rs | 5 +-
ballista/core/src/client.rs | 4 +-
.../core/src/execution_plans/distributed_query.rs | 32 +++++++---
.../core/src/execution_plans/shuffle_reader.rs | 50 +++++++--------
.../core/src/execution_plans/shuffle_writer.rs | 34 +++++-----
.../core/src/execution_plans/unresolved_shuffle.rs | 25 ++++----
ballista/core/src/serde/mod.rs | 13 ++--
ballista/core/src/serde/scheduler/from_proto.rs | 5 +-
ballista/core/src/serde/scheduler/to_proto.rs | 4 +-
ballista/executor/src/collect.rs | 27 ++++----
ballista/executor/src/executor.rs | 39 +++++++-----
ballista/executor/src/flight_service.rs | 9 ++-
ballista/scheduler/src/cluster/mod.rs | 6 +-
ballista/scheduler/src/flight_sql.rs | 43 +++++++------
ballista/scheduler/src/planner.rs | 73 +++++++++++++---------
ballista/scheduler/src/state/execution_graph.rs | 2 +-
.../src/state/execution_graph/execution_stage.rs | 2 +-
.../scheduler/src/state/execution_graph_dot.rs | 12 ++--
ballista/scheduler/src/state/mod.rs | 4 +-
benchmarks/src/bin/tpch.rs | 14 +++--
21 files changed, 243 insertions(+), 178 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 13984c0e..e73f5ce7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,18 +30,18 @@ exclude = [ "python" ]
resolver = "2"
[workspace.dependencies]
-arrow = { version = "50.0.0", features=["ipc_compression"] }
-arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "50.0.0", default-features = false }
+arrow = { version = "51.0.0", features=["ipc_compression"] }
+arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "51.0.0", default-features = false }
configure_me = { version = "0.4.0" }
configure_me_codegen = { version = "0.4.4" }
-datafusion = "36.0.0"
-datafusion-cli = "36.0.0"
-datafusion-proto = "36.0.0"
+datafusion = "37.0.0"
+datafusion-cli = "37.0.0"
+datafusion-proto = "37.0.0"
object_store = "0.9.0"
-sqlparser = "0.43.0"
-tonic = { version = "0.10" }
-tonic-build = { version = "0.10", default-features = false, features = [
+sqlparser = "0.44.0"
+tonic = { version = "0.11" }
+tonic-build = { version = "0.11", default-features = false, features = [
"transport",
"prost"
] }
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 5ed77cd4..293c69e4 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -478,9 +478,9 @@ impl BallistaContext {
#[cfg(feature = "standalone")]
mod standalone_tests {
use ballista_core::error::Result;
+ use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::listing::ListingTableUrl;
- use datafusion::parquet::file::properties::WriterProperties;
use tempfile::TempDir;
#[tokio::test]
@@ -507,7 +507,7 @@ mod standalone_tests {
df.write_parquet(
&file_path,
DataFrameWriteOptions::default(),
- Some(WriterProperties::default()),
+ Some(TableParquetOptions::default()),
)
.await?;
Ok(())
@@ -662,7 +662,6 @@ mod standalone_tests {
collect_stat: x.collect_stat,
target_partitions: x.target_partitions,
file_sort_order: vec![],
- file_type_write_options: None,
};
let table_paths = listing_table
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d19e6766..2c99569b 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -28,6 +28,7 @@ use std::{
use crate::error::{BallistaError, Result};
use crate::serde::scheduler::{Action, PartitionId};
+use arrow_flight;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
@@ -137,7 +138,6 @@ impl BallistaClient {
ticket: buf.clone().into(),
});
let result = self.flight_client.do_get(request).await;
-
let res = match result {
Ok(res) => res,
Err(ref err) => {
@@ -156,11 +156,11 @@ impl BallistaClient {
};
let mut stream = res.into_inner();
+
match stream.message().await {
Ok(res) => {
return match res {
Some(flight_data) => {
- // convert FlightData to a stream
let schema =
Arc::new(Schema::try_from(&flight_data)?);
// all the remaining stream messages should be
dictionary and record batches
diff --git a/ballista/core/src/execution_plans/distributed_query.rs
b/ballista/core/src/execution_plans/distributed_query.rs
index afde8e34..2f12a674 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -30,11 +30,11 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::LogicalPlan;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+ PlanProperties, SendableRecordBatchStream, Statistics,
};
use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -65,6 +65,7 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
plan_repr: PhantomData<T>,
/// Session id
session_id: String,
+ properties: PlanProperties,
}
impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
@@ -74,6 +75,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
plan: LogicalPlan,
session_id: String,
) -> Self {
+ let properties =
Self::compute_properties(plan.schema().as_ref().clone().into());
Self {
scheduler_url,
config,
@@ -81,6 +83,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
plan_repr: PhantomData,
session_id,
+ properties,
}
}
@@ -91,6 +94,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
extension_codec: Arc<dyn LogicalExtensionCodec>,
session_id: String,
) -> Self {
+ let properties =
Self::compute_properties(plan.schema().as_ref().clone().into());
Self {
scheduler_url,
config,
@@ -98,6 +102,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
extension_codec,
plan_repr: PhantomData,
session_id,
+ properties,
}
}
@@ -109,6 +114,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
plan_repr: PhantomData<T>,
session_id: String,
) -> Self {
+ let properties =
Self::compute_properties(plan.schema().as_ref().clone().into());
Self {
scheduler_url,
config,
@@ -116,8 +122,17 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
extension_codec,
plan_repr,
session_id,
+ properties,
}
}
+
+ fn compute_properties(schema: SchemaRef) -> PlanProperties {
+ PlanProperties::new(
+ EquivalenceProperties::new(schema),
+ Partitioning::UnknownPartitioning(1),
+ ExecutionMode::Bounded,
+ )
+ }
}
impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
@@ -147,12 +162,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
self.plan.schema().as_ref().clone().into()
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -170,6 +181,9 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for
DistributedQueryExec<T> {
extension_codec: self.extension_codec.clone(),
plan_repr: self.plan_repr,
session_id: self.session_id.clone(),
+ properties: Self::compute_properties(
+ self.plan.schema().as_ref().clone().into(),
+ ),
}))
}
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 491e4d05..babd8f64 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -34,26 +34,24 @@ use crate::serde::scheduler::{PartitionLocation,
PartitionStats};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::runtime::SpawnedTask;
use datafusion::error::Result;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use futures::{Stream, StreamExt, TryStreamExt};
use crate::error::BallistaError;
use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::common::AbortOnDropMany;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use itertools::Itertools;
use log::{error, info};
use rand::prelude::SliceRandom;
use rand::thread_rng;
use tokio::sync::{mpsc, Semaphore};
-use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
/// ShuffleReaderExec reads partitions that have already been materialized by
a ShuffleWriterExec
@@ -67,6 +65,7 @@ pub struct ShuffleReaderExec {
pub partition: Vec<Vec<PartitionLocation>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ properties: PlanProperties,
}
impl ShuffleReaderExec {
@@ -76,11 +75,19 @@ impl ShuffleReaderExec {
partition: Vec<Vec<PartitionLocation>>,
schema: SchemaRef,
) -> Result<Self> {
+ let properties = PlanProperties::new(
+
datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
+ // TODO partitioning may be known and could be populated here
+ // see https://github.com/apache/arrow-datafusion/issues/758
+ Partitioning::UnknownPartitioning(partition.len()),
+ datafusion::physical_plan::ExecutionMode::Bounded,
+ );
Ok(Self {
stage_id,
schema,
partition,
metrics: ExecutionPlanMetricsSet::new(),
+ properties,
})
}
}
@@ -108,16 +115,9 @@ impl ExecutionPlan for ShuffleReaderExec {
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- // TODO partitioning may be known and could be populated here
- // see https://github.com/apache/arrow-datafusion/issues/758
- Partitioning::UnknownPartitioning(self.partition.len())
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
-
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
@@ -244,7 +244,7 @@ struct AbortableReceiverStream {
inner: ReceiverStream<result::Result<SendableRecordBatchStream,
BallistaError>>,
#[allow(dead_code)]
- drop_helper: AbortOnDropMany<()>,
+ drop_helper: Vec<SpawnedTask<()>>,
}
impl AbortableReceiverStream {
@@ -253,12 +253,12 @@ impl AbortableReceiverStream {
rx: tokio::sync::mpsc::Receiver<
result::Result<SendableRecordBatchStream, BallistaError>,
>,
- join_handles: Vec<JoinHandle<()>>,
+ spawned_tasks: Vec<SpawnedTask<()>>,
) -> AbortableReceiverStream {
let inner = ReceiverStream::new(rx);
Self {
inner,
- drop_helper: AbortOnDropMany(join_handles),
+ drop_helper: spawned_tasks,
}
}
}
@@ -282,7 +282,7 @@ fn send_fetch_partitions(
) -> AbortableReceiverStream {
let (response_sender, response_receiver) = mpsc::channel(max_request_num);
let semaphore = Arc::new(Semaphore::new(max_request_num));
- let mut join_handles = vec![];
+ let mut spawned_tasks: Vec<SpawnedTask<()>> = vec![];
let (local_locations, remote_locations): (Vec<_>, Vec<_>) =
partition_locations
.into_iter()
.partition(check_is_local_location);
@@ -295,34 +295,32 @@ fn send_fetch_partitions(
// keep local shuffle files reading in serial order for memory control.
let response_sender_c = response_sender.clone();
- let join_handle = tokio::spawn(async move {
+ spawned_tasks.push(SpawnedTask::spawn(async move {
for p in local_locations {
let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
if let Err(e) = response_sender_c.send(r).await {
error!("Fail to send response event to the channel due to {}",
e);
}
}
- });
- join_handles.push(join_handle);
+ }));
for p in remote_locations.into_iter() {
let semaphore = semaphore.clone();
let response_sender = response_sender.clone();
- let join_handle = tokio::spawn(async move {
- // Block if exceeds max request number
+ spawned_tasks.push(SpawnedTask::spawn(async move {
+ // Block if exceeds max request number.
let permit = semaphore.acquire_owned().await.unwrap();
let r =
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
- // Block if the channel buffer is ful
+ // Block if the channel buffer is full.
if let Err(e) = response_sender.send(r).await {
error!("Fail to send response event to the channel due to {}",
e);
}
// Increase semaphore by dropping existing permits.
drop(permit);
- });
- join_handles.push(join_handle);
+ }));
}
- AbortableReceiverStream::create(response_receiver, join_handles)
+ AbortableReceiverStream::create(response_receiver, spawned_tasks)
}
fn check_is_local_location(location: &PartitionLocation) -> bool {
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs
b/ballista/core/src/execution_plans/shuffle_writer.rs
index b9e25ed5..8c034194 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -22,7 +22,6 @@
use datafusion::arrow::ipc::writer::IpcWriteOptions;
use datafusion::arrow::ipc::CompressionType;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::arrow::ipc::writer::StreamWriter;
use std::any::Any;
@@ -51,8 +50,8 @@ use datafusion::physical_plan::metrics::{
};
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
@@ -81,6 +80,7 @@ pub struct ShuffleWriterExec {
shuffle_output_partitioning: Option<Partitioning>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ properties: PlanProperties,
}
pub struct WriteTracker {
@@ -127,6 +127,16 @@ impl ShuffleWriterExec {
work_dir: String,
shuffle_output_partitioning: Option<Partitioning>,
) -> Result<Self> {
+ // If [`shuffle_output_partitioning`] is none, then there's no need to
do repartitioning.
+ // Therefore, the partition is the same as its input plan's.
+ let partitioning = shuffle_output_partitioning
+ .clone()
+ .unwrap_or_else(||
plan.properties().output_partitioning().clone());
+ let properties = PlanProperties::new(
+
datafusion::physical_expr::EquivalenceProperties::new(plan.schema()),
+ partitioning,
+ datafusion::physical_plan::ExecutionMode::Bounded,
+ );
Ok(Self {
job_id,
stage_id,
@@ -134,6 +144,7 @@ impl ShuffleWriterExec {
work_dir,
shuffle_output_partitioning,
metrics: ExecutionPlanMetricsSet::new(),
+ properties,
})
}
@@ -149,7 +160,10 @@ impl ShuffleWriterExec {
/// Get the input partition count
pub fn input_partition_count(&self) -> usize {
- self.plan.output_partitioning().partition_count()
+ self.plan
+ .properties()
+ .output_partitioning()
+ .partition_count()
}
/// Get the true output partitioning
@@ -349,16 +363,8 @@ impl ExecutionPlan for ShuffleWriterExec {
self.plan.schema()
}
- /// If [`shuffle_output_partitioning`] is none, then there's no need to do
repartitioning.
- /// Therefore, the partition is the same as its input plan's.
- fn output_partitioning(&self) -> Partitioning {
- self.shuffle_output_partitioning
- .clone()
- .unwrap_or_else(|| self.plan.output_partitioning())
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index fe3610c4..bb097017 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -21,10 +21,9 @@ use std::sync::Arc;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
};
/// UnresolvedShuffleExec represents a dependency on the results of a
ShuffleWriterExec node which hasn't computed yet.
@@ -41,6 +40,8 @@ pub struct UnresolvedShuffleExec {
// The partition count this node will have once it is replaced with a
ShuffleReaderExec
pub output_partition_count: usize,
+
+ properties: PlanProperties,
}
impl UnresolvedShuffleExec {
@@ -50,10 +51,18 @@ impl UnresolvedShuffleExec {
schema: SchemaRef,
output_partition_count: usize,
) -> Self {
+ let properties = PlanProperties::new(
+
datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
+ // TODO the output partition is known and should be populated here!
+ // see https://github.com/apache/arrow-datafusion/issues/758
+ Partitioning::UnknownPartitioning(output_partition_count),
+ datafusion::physical_plan::ExecutionMode::Bounded,
+ );
Self {
stage_id,
schema,
output_partition_count,
+ properties,
}
}
}
@@ -81,14 +90,8 @@ impl ExecutionPlan for UnresolvedShuffleExec {
self.schema.clone()
}
- fn output_partitioning(&self) -> Partitioning {
- // TODO the output partition is known and should be populated here!
- // see https://github.com/apache/arrow-datafusion/issues/758
- Partitioning::UnknownPartitioning(self.output_partition_count)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index e12dec8a..64edec41 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -181,12 +181,11 @@ impl PhysicalExtensionCodec for
BallistaPhysicalExtensionCodec {
}
PhysicalPlanType::UnresolvedShuffle(unresolved_shuffle) => {
let schema =
Arc::new(convert_required!(unresolved_shuffle.schema)?);
- Ok(Arc::new(UnresolvedShuffleExec {
- stage_id: unresolved_shuffle.stage_id as usize,
+ Ok(Arc::new(UnresolvedShuffleExec::new(
+ unresolved_shuffle.stage_id as usize,
schema,
- output_partition_count:
unresolved_shuffle.output_partition_count
- as usize,
- }))
+ unresolved_shuffle.output_partition_count as usize,
+ )))
}
}
}
@@ -201,10 +200,12 @@ impl PhysicalExtensionCodec for
BallistaPhysicalExtensionCodec {
// to get the true output partitioning
let output_partitioning = match exec.shuffle_output_partitioning()
{
Some(Partitioning::Hash(exprs, partition_count)) => {
+ let default_codec =
+
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {};
Some(datafusion_proto::protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
- .map(|expr| expr.clone().try_into())
+
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
&default_codec))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
})
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs
b/ballista/core/src/serde/scheduler/from_proto.rs
index a38578cd..65991980 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -16,7 +16,7 @@
// under the License.
use chrono::{TimeZone, Utc};
-use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use datafusion::physical_plan::metrics::{
@@ -417,7 +417,8 @@ fn reset_metrics_for_execution_plan(
) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
plan.transform(&|plan| {
let children = plan.children().clone();
- plan.with_new_children(children).map(Transformed::Yes)
+ plan.with_new_children(children).map(Transformed::yes)
})
+ .data()
.map_err(BallistaError::DataFusionError)
}
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs
b/ballista/core/src/serde/scheduler/to_proto.rs
index 82977e3b..8159aab6 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -101,10 +101,12 @@ pub fn hash_partitioning_to_proto(
) -> Result<Option<datafusion_protobuf::PhysicalHashRepartition>,
BallistaError> {
match output_partitioning {
Some(Partitioning::Hash(exprs, partition_count)) => {
+ let default_codec =
+ datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec
{};
Ok(Some(datafusion_protobuf::PhysicalHashRepartition {
hash_expr: exprs
.iter()
- .map(|expr| expr.clone().try_into())
+
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
&default_codec))
.collect::<Result<Vec<_>, DataFusionError>>()?,
partition_count: *partition_count as u64,
}))
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 22567bca..4456e9c5 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -25,10 +25,9 @@ use std::{any::Any, pin::Pin};
use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
SendableRecordBatchStream,
- Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+ SendableRecordBatchStream, Statistics,
};
use datafusion::{error::Result, physical_plan::RecordBatchStream};
use futures::stream::SelectAll;
@@ -39,11 +38,17 @@ use futures::Stream;
#[derive(Debug, Clone)]
pub struct CollectExec {
plan: Arc<dyn ExecutionPlan>,
+ properties: PlanProperties,
}
impl CollectExec {
pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
- Self { plan }
+ let properties = PlanProperties::new(
+
datafusion::physical_expr::EquivalenceProperties::new(plan.schema()),
+ Partitioning::UnknownPartitioning(1),
+ datafusion::physical_plan::ExecutionMode::Bounded,
+ );
+ Self { plan, properties }
}
}
@@ -70,12 +75,8 @@ impl ExecutionPlan for CollectExec {
self.plan.schema()
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -95,7 +96,11 @@ impl ExecutionPlan for CollectExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
assert_eq!(0, partition);
- let num_partitions = self.plan.output_partitioning().partition_count();
+ let num_partitions = self
+ .plan
+ .properties()
+ .output_partitioning()
+ .partition_count();
let streams = (0..num_partitions)
.map(|i| self.plan.execute(i, context.clone()))
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index 60db9f00..83caf398 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -200,21 +200,20 @@ impl Executor {
#[cfg(test)]
mod test {
+ use crate::execution_engine::DefaultQueryStageExec;
use crate::executor::Executor;
use crate::metrics::LoggingMetricsCollector;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::ExecutorRegistration;
- use datafusion::execution::context::TaskContext;
-
- use crate::execution_engine::DefaultQueryStageExec;
use ballista_core::serde::scheduler::PartitionId;
use datafusion::error::{DataFusionError, Result};
- use datafusion::physical_expr::PhysicalSortExpr;
+ use datafusion::execution::context::TaskContext;
+
use datafusion::physical_plan::{
- DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
RecordBatchStream,
- SendableRecordBatchStream, Statistics,
+ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
PlanProperties,
+ RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::SessionContext;
use futures::Stream;
@@ -247,7 +246,23 @@ mod test {
/// An ExecutionPlan which will never terminate
#[derive(Debug)]
- pub struct NeverendingOperator;
+ pub struct NeverendingOperator {
+ properties: PlanProperties,
+ }
+
+ impl NeverendingOperator {
+ fn new() -> Self {
+ NeverendingOperator {
+ properties: PlanProperties::new(
+
datafusion::physical_expr::EquivalenceProperties::new(Arc::new(
+ Schema::empty(),
+ )),
+ Partitioning::UnknownPartitioning(1),
+ datafusion::physical_plan::ExecutionMode::Bounded,
+ ),
+ }
+ }
+ }
impl DisplayAs for NeverendingOperator {
fn fmt_as(
@@ -272,12 +287,8 @@ mod test {
Arc::new(Schema::empty())
}
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- None
+ fn properties(&self) -> &PlanProperties {
+ &self.properties
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -316,7 +327,7 @@ mod test {
let shuffle_write = ShuffleWriterExec::try_new(
"job-id".to_owned(),
1,
- Arc::new(NeverendingOperator),
+ Arc::new(NeverendingOperator::new()),
work_dir.clone(),
None,
)
diff --git a/ballista/executor/src/flight_service.rs
b/ballista/executor/src/flight_service.rs
index c4ffab4f..43387f09 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -33,7 +33,7 @@ use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse,
- PutResult, SchemaResult, Ticket,
+ PollInfo, PutResult, SchemaResult, Ticket,
};
use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
use futures::{Stream, StreamExt, TryStreamExt};
@@ -203,6 +203,13 @@ impl FlightService for BallistaFlightService {
) -> Result<Response<Self::DoExchangeStream>, Status> {
Err(Status::unimplemented("do_exchange"))
}
+
+ async fn poll_flight_info(
+ &self,
+ _request: Request<FlightDescriptor>,
+ ) -> Result<Response<PollInfo>, Status> {
+ Err(Status::unimplemented("poll_flight_info"))
+ }
}
fn read_partition<T>(
diff --git a/ballista/scheduler/src/cluster/mod.rs
b/ballista/scheduler/src/cluster/mod.rs
index f7e1ca46..71e81c90 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use clap::ArgEnum;
use datafusion::common::tree_node::TreeNode;
-use datafusion::common::tree_node::VisitRecursion;
+use datafusion::common::tree_node::TreeNodeRecursion;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{AvroExec, CsvExec, NdJsonExec,
ParquetExec};
use datafusion::error::DataFusionError;
@@ -701,11 +701,11 @@ pub(crate) fn get_scan_files(
} else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
csv_exec.base_config().file_groups.clone()
} else {
- return Ok(VisitRecursion::Continue);
+ return Ok(TreeNodeRecursion::Continue);
};
collector.push(file_groups);
- Ok(VisitRecursion::Skip)
+ Ok(TreeNodeRecursion::Jump)
})?;
Ok(collector)
}
diff --git a/ballista/scheduler/src/flight_sql.rs
b/ballista/scheduler/src/flight_sql.rs
index 62c96966..24ede23b 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -33,7 +33,7 @@ use arrow_flight::sql::{
};
use arrow_flight::{
Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
HandshakeRequest,
- HandshakeResponse, Location, Ticket,
+ HandshakeResponse, Ticket,
};
use base64::Engine;
use futures::Stream;
@@ -69,6 +69,7 @@ use datafusion::common::DFSchemaRef;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use prost::bytes::Bytes;
use prost::Message;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::sleep;
@@ -93,7 +94,6 @@ impl FlightSqlServiceImpl {
}
}
- #[allow(deprecated)]
fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch,
ArrowError> {
let schema = Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, true),
@@ -101,9 +101,21 @@ impl FlightSqlServiceImpl {
Field::new("table_name", DataType::Utf8, false),
Field::new("table_type", DataType::Utf8, false),
]));
- let tables = ctx.tables()?; // resolved in #501
- let names: Vec<_> = tables.iter().map(|it|
Some(it.as_str())).collect();
- let types: Vec<_> = names.iter().map(|_| Some("TABLE")).collect();
+ let mut names: Vec<Option<String>> = vec![];
+ for catalog_name in ctx.catalog_names() {
+ let catalog = ctx
+ .catalog(&catalog_name)
+ .expect("catalog should have been found");
+ for schema_name in catalog.schema_names() {
+ let schema = catalog
+ .schema(&schema_name)
+ .expect("schema should have been found");
+ for table_name in schema.table_names() {
+ names.push(Some(table_name));
+ }
+ }
+ }
+ let types: Vec<_> = names.iter().map(|_|
Some("TABLE".to_string())).collect();
let cats: Vec<_> = names.iter().map(|_| None).collect();
let schemas: Vec<_> = names.iter().map(|_| None).collect();
let rb = RecordBatch::try_new(
@@ -290,15 +302,11 @@ impl FlightSqlServiceImpl {
Err(Status::internal("Error getting stats".to_string()))?
}
let authority = format!("{}:{}", &host, &port);
- let loc = Location {
- uri: format!("grpc+tcp://{authority}"),
- };
let buf = fetch.as_any().encode_to_vec();
let ticket = Ticket { ticket: buf.into() };
- let fiep = FlightEndpoint {
- ticket: Some(ticket),
- location: vec![loc],
- };
+ let fiep = FlightEndpoint::new()
+ .with_ticket(ticket)
+ .with_location(format!("grpc+tcp://{authority}"));
fieps.push(fiep);
}
Ok(fieps)
@@ -319,15 +327,11 @@ impl FlightSqlServiceImpl {
settings: vec![],
};
let authority = format!("{}:{}", &host, &port); // TODO: use advertise
host
- let loc = Location {
- uri: format!("grpc+tcp://{authority}"),
- };
let buf = fetch.as_any().encode_to_vec();
let ticket = Ticket { ticket: buf.into() };
- let fiep = FlightEndpoint {
- ticket: Some(ticket),
- location: vec![loc],
- };
+ let fiep = FlightEndpoint::new()
+ .with_ticket(ticket)
+ .with_location(format!("grpc+tcp://{authority}"));
let fieps = vec![fiep];
Ok(fieps)
}
@@ -406,6 +410,7 @@ impl FlightSqlServiceImpl {
total_records: num_rows,
total_bytes: num_bytes,
ordered: false,
+ app_metadata: Bytes::new(),
};
Response::new(info)
}
diff --git a/ballista/scheduler/src/planner.rs
b/ballista/scheduler/src/planner.rs
index 3e8bdc6c..dfaa00aa 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -109,8 +109,7 @@ impl DistributedPlanner {
let unresolved_shuffle =
create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
- with_new_children_if_necessary(execution_plan,
vec![unresolved_shuffle])?
- .into(),
+ with_new_children_if_necessary(execution_plan,
vec![unresolved_shuffle])?,
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
@@ -126,14 +125,13 @@ impl DistributedPlanner {
let unresolved_shuffle =
create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
- with_new_children_if_necessary(execution_plan,
vec![unresolved_shuffle])?
- .into(),
+ with_new_children_if_necessary(execution_plan,
vec![unresolved_shuffle])?,
stages,
))
} else if let Some(repart) =
execution_plan.as_any().downcast_ref::<RepartitionExec>()
{
- match repart.output_partitioning() {
+ match repart.properties().output_partitioning() {
Partitioning::Hash(_, _) => {
let shuffle_writer = create_shuffle_writer(
job_id,
@@ -158,7 +156,7 @@ impl DistributedPlanner {
)))
} else {
Ok((
- with_new_children_if_necessary(execution_plan,
children)?.into(),
+ with_new_children_if_necessary(execution_plan, children)?,
stages,
))
}
@@ -177,7 +175,10 @@ fn create_unresolved_shuffle(
Arc::new(UnresolvedShuffleExec::new(
shuffle_writer.stage_id(),
shuffle_writer.schema(),
- shuffle_writer.output_partitioning().partition_count(),
+ shuffle_writer
+ .properties()
+ .output_partitioning()
+ .partition_count(),
))
}
@@ -250,7 +251,7 @@ pub fn remove_unresolved_shuffles(
new_children.push(remove_unresolved_shuffles(child,
partition_locations)?);
}
}
- Ok(with_new_children_if_necessary(stage, new_children)?.into())
+ Ok(with_new_children_if_necessary(stage, new_children)?)
}
/// Rollback the ShuffleReaderExec to UnresolvedShuffleExec.
@@ -262,8 +263,10 @@ pub fn rollback_resolved_shuffles(
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
for child in stage.children() {
if let Some(shuffle_reader) =
child.as_any().downcast_ref::<ShuffleReaderExec>() {
- let output_partition_count =
- shuffle_reader.output_partitioning().partition_count();
+ let output_partition_count = shuffle_reader
+ .properties()
+ .output_partitioning()
+ .partition_count();
let stage_id = shuffle_reader.stage_id;
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
@@ -276,7 +279,7 @@ pub fn rollback_resolved_shuffles(
new_children.push(rollback_resolved_shuffles(child)?);
}
}
- Ok(with_new_children_if_necessary(stage, new_children)?.into())
+ Ok(with_new_children_if_necessary(stage, new_children)?)
}
fn create_shuffle_writer(
@@ -318,7 +321,11 @@ mod test {
macro_rules! downcast_exec {
($exec: expr, $ty: ty) => {
- $exec.as_any().downcast_ref::<$ty>().unwrap()
+ $exec.as_any().downcast_ref::<$ty>().expect(&format!(
+ "Downcast to {} failed. Got {:?}",
+ stringify!($ty),
+ $exec
+ ))
};
}
@@ -344,8 +351,8 @@ mod test {
let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
- for stage in &stages {
- println!("{}", displayable(stage.as_ref()).indent(false));
+ for (i, stage) in stages.iter().enumerate() {
+ println!("Stage {i}:\n{}",
displayable(stage.as_ref()).indent(false));
}
/* Expected result:
@@ -450,8 +457,8 @@ order by
let mut planner = DistributedPlanner::new();
let job_uuid = Uuid::new_v4();
let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
- for stage in &stages {
- println!("{}", displayable(stage.as_ref()).indent(false));
+ for (i, stage) in stages.iter().enumerate() {
+ println!("Stage {i}:\n{}",
displayable(stage.as_ref()).indent(false));
}
/* Expected result:
@@ -467,13 +474,12 @@ order by
ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0
}], 2))
AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode],
aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR
orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END),
SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND
orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
- ProjectionExec: expr=[l_shipmode@1 as l_shipmode,
o_orderpriority@3 as o_orderpriority]
- CoalesceBatchesExec: target_batch_size=8192
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column {
name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
- CoalesceBatchesExec: target_batch_size=8192
- UnresolvedShuffleExec
- CoalesceBatchesExec: target_batch_size=8192
- UnresolvedShuffleExec
+ CoalesceBatchesExec: target_batch_size=8192
+ HashJoinExec: mode=Partitioned, join_type=Inner,
on=[(l_orderkey@0, o_orderkey@0)], projection=[l_shipmode@1, o_orderpriority@3]
+ CoalesceBatchesExec: target_batch_size=8192
+ UnresolvedShuffleExec
+ CoalesceBatchesExec: target_batch_size=8192
+ UnresolvedShuffleExec
ShuffleWriterExec: None
SortExec: expr=[l_shipmode@0 ASC NULLS LAST]
@@ -495,6 +501,7 @@ order by
assert_eq!(
2,
stages[0].children()[0]
+ .properties()
.output_partitioning()
.partition_count()
);
@@ -502,7 +509,7 @@ order by
2,
stages[0]
.shuffle_output_partitioning()
- .unwrap()
+ .expect("stage 0")
.partition_count()
);
@@ -510,6 +517,7 @@ order by
assert_eq!(
1,
stages[1].children()[0]
+ .properties()
.output_partitioning()
.partition_count()
);
@@ -517,31 +525,32 @@ order by
2,
stages[1]
.shuffle_output_partitioning()
- .unwrap()
+ .expect("stage 1")
.partition_count()
);
// join and partial hash aggregate
let input = stages[2].children()[0].clone();
- assert_eq!(2, input.output_partitioning().partition_count());
+ assert_eq!(
+ 2,
+ input.properties().output_partitioning().partition_count()
+ );
assert_eq!(
2,
stages[2]
.shuffle_output_partitioning()
- .unwrap()
+ .expect("stage 2")
.partition_count()
);
let hash_agg = downcast_exec!(input, AggregateExec);
- let projection = hash_agg.children()[0].clone();
- let projection = downcast_exec!(projection, ProjectionExec);
-
- let coalesce_batches = projection.children()[0].clone();
+ let coalesce_batches = hash_agg.children()[0].clone();
let coalesce_batches = downcast_exec!(coalesce_batches,
CoalesceBatchesExec);
let join = coalesce_batches.children()[0].clone();
let join = downcast_exec!(join, HashJoinExec);
+ assert!(join.contain_projection());
let join_input_1 = join.children()[0].clone();
// skip CoalesceBatches
@@ -561,6 +570,7 @@ order by
assert_eq!(
2,
stages[3].children()[0]
+ .properties()
.output_partitioning()
.partition_count()
);
@@ -570,6 +580,7 @@ order by
assert_eq!(
1,
stages[4].children()[0]
+ .properties()
.output_partitioning()
.partition_count()
);
diff --git a/ballista/scheduler/src/state/execution_graph.rs
b/ballista/scheduler/src/state/execution_graph.rs
index 70b2659f..9ee95d67 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -151,7 +151,7 @@ impl ExecutionGraph {
) -> Result<Self> {
let mut planner = DistributedPlanner::new();
- let output_partitions = plan.output_partitioning().partition_count();
+ let output_partitions =
plan.properties().output_partitioning().partition_count();
let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index f082fe43..8aded3e0 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -1176,7 +1176,7 @@ fn get_stage_partitions(plan: Arc<dyn ExecutionPlan>) ->
usize {
plan.as_any()
.downcast_ref::<ShuffleWriterExec>()
.map(|shuffle_writer| shuffle_writer.input_partition_count())
- .unwrap_or_else(|| plan.output_partitioning().partition_count())
+ .unwrap_or_else(||
plan.properties().output_partitioning().partition_count())
}
/// This data structure collects the partition locations for an
`ExecutionStage`.
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 0cd6837f..3a6dce7a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -281,12 +281,12 @@ aggr=[{}]",
} else if let Some(exec) =
plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
format!(
"CoalescePartitions [{}]",
- format_partitioning(exec.output_partitioning())
+
format_partitioning(exec.properties().output_partitioning().clone())
)
} else if let Some(exec) = plan.as_any().downcast_ref::<RepartitionExec>()
{
format!(
"RepartitionExec [{}]",
- format_partitioning(exec.output_partitioning())
+
format_partitioning(exec.properties().output_partitioning().clone())
)
} else if let Some(exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
let join_expr = exec
@@ -323,24 +323,24 @@ filter_expr={}",
} else if plan.as_any().downcast_ref::<MemoryExec>().is_some() {
"MemoryExec".to_string()
} else if let Some(exec) = plan.as_any().downcast_ref::<CsvExec>() {
- let parts = exec.output_partitioning().partition_count();
+ let parts = exec.properties().output_partitioning().partition_count();
format!(
"CSV: {} [{} partitions]",
get_file_scan(exec.base_config()),
parts
)
} else if let Some(exec) = plan.as_any().downcast_ref::<NdJsonExec>() {
- let parts = exec.output_partitioning().partition_count();
+ let parts = exec.properties().output_partitioning().partition_count();
format!("JSON [{parts} partitions]")
} else if let Some(exec) = plan.as_any().downcast_ref::<AvroExec>() {
- let parts = exec.output_partitioning().partition_count();
+ let parts = exec.properties().output_partitioning().partition_count();
format!(
"Avro: {} [{} partitions]",
get_file_scan(exec.base_config()),
parts
)
} else if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
- let parts = exec.output_partitioning().partition_count();
+ let parts = exec.properties().output_partitioning().partition_count();
format!(
"Parquet: {} [{} partitions]",
get_file_scan(exec.base_config()),
diff --git a/ballista/scheduler/src/state/mod.rs
b/ballista/scheduler/src/state/mod.rs
index ba8647ee..90c9f6f3 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::common::tree_node::{TreeNode, VisitRecursion};
+use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
use datafusion::datasource::source_as_provider;
use datafusion::error::DataFusionError;
@@ -399,7 +399,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerState<T,
}
}
}
- Ok(VisitRecursion::Continue)
+ Ok(TreeNodeRecursion::Continue)
})?;
let plan = session_ctx.state().create_physical_plan(plan).await?;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index c0719175..01dcade1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -827,7 +827,7 @@ async fn get_table(
}
"parquet" => {
let path = format!("{path}/{table}");
- let format =
ParquetFormat::default().with_enable_pruning(Some(true));
+ let format =
ParquetFormat::default().with_enable_pruning(true);
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
@@ -844,7 +844,6 @@ async fn get_table(
collect_stat: true,
table_partition_cols: vec![],
file_sort_order: vec![],
- file_type_write_options: None,
};
let url = ListingTableUrl::parse(path)?;
@@ -1042,7 +1041,7 @@ async fn get_expected_results(n: usize, path: &str) ->
Result<Vec<RecordBatch>>
// there's no support for casting from Utf8 to
Decimal, so
// we'll cast from Utf8 to Float64 to Decimal for
Decimal types
let inner_cast = Box::new(Expr::Cast(Cast::new(
- Box::new(trim(col(Field::name(field)))),
+ Box::new(trim(vec![col(Field::name(field))])),
DataType::Float64,
)));
Expr::Cast(Cast::new(
@@ -1052,7 +1051,7 @@ async fn get_expected_results(n: usize, path: &str) ->
Result<Vec<RecordBatch>>
.alias(Field::name(field))
}
_ => Expr::Cast(Cast::new(
- Box::new(trim(col(Field::name(field)))),
+ Box::new(trim(vec![col(Field::name(field))])),
Field::data_type(field).to_owned(),
))
.alias(Field::name(field)),
@@ -1577,6 +1576,7 @@ mod tests {
mod ballista_round_trip {
use super::*;
use ballista_core::serde::BallistaCodec;
+ use datafusion::config::TableOptions;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::execution::options::ReadOptions;
use datafusion::physical_plan::ExecutionPlan;
@@ -1609,7 +1609,8 @@ mod ballista_round_trip {
.has_header(false)
.file_extension(".tbl");
let cfg = SessionConfig::new();
- let listing_options = options.to_listing_options(&cfg);
+ let listing_options =
+ options.to_listing_options(&cfg, TableOptions::default());
let config = ListingTableConfig::new(path.clone())
.with_listing_options(listing_options)
.with_schema(Arc::new(schema));
@@ -1665,7 +1666,8 @@ mod ballista_round_trip {
.has_header(false)
.file_extension(".tbl");
let cfg = SessionConfig::new();
- let listing_options = options.to_listing_options(&cfg);
+ let listing_options =
+ options.to_listing_options(&cfg, TableOptions::default());
let config = ListingTableConfig::new(path.clone())
.with_listing_options(listing_options)
.with_schema(Arc::new(schema));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]