This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new d1aff3ea Make Ballista compatible with Datafusion 37.0.0 (from 36.0.0) 
(#1031)
d1aff3ea is described below

commit d1aff3eaef0d7d892336781a2a684ce822ede2e3
Author: Raphaël Marinier <[email protected]>
AuthorDate: Thu Jun 27 22:36:12 2024 +0200

    Make Ballista compatible with Datafusion 37.0.0 (from 36.0.0) (#1031)
    
    * Upgrading Ballista to datafusion 37.0.0.
    
    * Better test debugging information in planner.rs
    
    * Updated test logic in planner.
    
    Since datafusion's https://github.com/apache/datafusion/pull/9236,
    HashJoinExec can also project.
    
    * cargo fmt
    
    * cargo fix
    
    * Removed leftover comment
    
    * Make cargo clippy happy
    
    * lint
    
    * Cargo fmt
    
    * Fix tpch build
    
    * Fix comment spelling
    
    * cargo fmt
---
 Cargo.toml                                         | 18 +++---
 ballista/client/src/context.rs                     |  5 +-
 ballista/core/src/client.rs                        |  4 +-
 .../core/src/execution_plans/distributed_query.rs  | 32 +++++++---
 .../core/src/execution_plans/shuffle_reader.rs     | 50 +++++++--------
 .../core/src/execution_plans/shuffle_writer.rs     | 34 +++++-----
 .../core/src/execution_plans/unresolved_shuffle.rs | 25 ++++----
 ballista/core/src/serde/mod.rs                     | 13 ++--
 ballista/core/src/serde/scheduler/from_proto.rs    |  5 +-
 ballista/core/src/serde/scheduler/to_proto.rs      |  4 +-
 ballista/executor/src/collect.rs                   | 27 ++++----
 ballista/executor/src/executor.rs                  | 39 +++++++-----
 ballista/executor/src/flight_service.rs            |  9 ++-
 ballista/scheduler/src/cluster/mod.rs              |  6 +-
 ballista/scheduler/src/flight_sql.rs               | 43 +++++++------
 ballista/scheduler/src/planner.rs                  | 73 +++++++++++++---------
 ballista/scheduler/src/state/execution_graph.rs    |  2 +-
 .../src/state/execution_graph/execution_stage.rs   |  2 +-
 .../scheduler/src/state/execution_graph_dot.rs     | 12 ++--
 ballista/scheduler/src/state/mod.rs                |  4 +-
 benchmarks/src/bin/tpch.rs                         | 14 +++--
 21 files changed, 243 insertions(+), 178 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 13984c0e..e73f5ce7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,18 +30,18 @@ exclude = [ "python" ]
 resolver = "2"
 
 [workspace.dependencies]
-arrow = { version = "50.0.0", features=["ipc_compression"] }
-arrow-flight = { version = "50.0.0", features = ["flight-sql-experimental"] }
-arrow-schema = { version = "50.0.0", default-features = false }
+arrow = { version = "51.0.0", features=["ipc_compression"] }
+arrow-flight = { version = "51.0.0", features = ["flight-sql-experimental"] }
+arrow-schema = { version = "51.0.0", default-features = false }
 configure_me = { version = "0.4.0" }
 configure_me_codegen = { version = "0.4.4" }
-datafusion = "36.0.0"
-datafusion-cli = "36.0.0"
-datafusion-proto = "36.0.0"
+datafusion = "37.0.0"
+datafusion-cli = "37.0.0"
+datafusion-proto = "37.0.0"
 object_store = "0.9.0"
-sqlparser = "0.43.0"
-tonic = { version = "0.10" }
-tonic-build = { version = "0.10", default-features = false, features = [
+sqlparser = "0.44.0"
+tonic = { version = "0.11" }
+tonic-build = { version = "0.11", default-features = false, features = [
     "transport",
     "prost"
 ] }
diff --git a/ballista/client/src/context.rs b/ballista/client/src/context.rs
index 5ed77cd4..293c69e4 100644
--- a/ballista/client/src/context.rs
+++ b/ballista/client/src/context.rs
@@ -478,9 +478,9 @@ impl BallistaContext {
 #[cfg(feature = "standalone")]
 mod standalone_tests {
     use ballista_core::error::Result;
+    use datafusion::config::TableParquetOptions;
     use datafusion::dataframe::DataFrameWriteOptions;
     use datafusion::datasource::listing::ListingTableUrl;
-    use datafusion::parquet::file::properties::WriterProperties;
     use tempfile::TempDir;
 
     #[tokio::test]
@@ -507,7 +507,7 @@ mod standalone_tests {
         df.write_parquet(
             &file_path,
             DataFrameWriteOptions::default(),
-            Some(WriterProperties::default()),
+            Some(TableParquetOptions::default()),
         )
         .await?;
         Ok(())
@@ -662,7 +662,6 @@ mod standalone_tests {
                         collect_stat: x.collect_stat,
                         target_partitions: x.target_partitions,
                         file_sort_order: vec![],
-                        file_type_write_options: None,
                     };
 
                     let table_paths = listing_table
diff --git a/ballista/core/src/client.rs b/ballista/core/src/client.rs
index d19e6766..2c99569b 100644
--- a/ballista/core/src/client.rs
+++ b/ballista/core/src/client.rs
@@ -28,6 +28,7 @@ use std::{
 use crate::error::{BallistaError, Result};
 use crate::serde::scheduler::{Action, PartitionId};
 
+use arrow_flight;
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::Ticket;
 use arrow_flight::{flight_service_client::FlightServiceClient, FlightData};
@@ -137,7 +138,6 @@ impl BallistaClient {
                 ticket: buf.clone().into(),
             });
             let result = self.flight_client.do_get(request).await;
-
             let res = match result {
                 Ok(res) => res,
                 Err(ref err) => {
@@ -156,11 +156,11 @@ impl BallistaClient {
             };
 
             let mut stream = res.into_inner();
+
             match stream.message().await {
                 Ok(res) => {
                     return match res {
                         Some(flight_data) => {
-                            // convert FlightData to a stream
                             let schema = 
Arc::new(Schema::try_from(&flight_data)?);
 
                             // all the remaining stream messages should be 
dictionary and record batches
diff --git a/ballista/core/src/execution_plans/distributed_query.rs 
b/ballista/core/src/execution_plans/distributed_query.rs
index afde8e34..2f12a674 100644
--- a/ballista/core/src/execution_plans/distributed_query.rs
+++ b/ballista/core/src/execution_plans/distributed_query.rs
@@ -30,11 +30,11 @@ use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::logical_expr::LogicalPlan;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
+use datafusion::physical_expr::EquivalenceProperties;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use datafusion_proto::logical_plan::{
     AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -65,6 +65,7 @@ pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
     plan_repr: PhantomData<T>,
     /// Session id
     session_id: String,
+    properties: PlanProperties,
 }
 
 impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
@@ -74,6 +75,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         plan: LogicalPlan,
         session_id: String,
     ) -> Self {
+        let properties = 
Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
@@ -81,6 +83,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
             extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
             plan_repr: PhantomData,
             session_id,
+            properties,
         }
     }
 
@@ -91,6 +94,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         extension_codec: Arc<dyn LogicalExtensionCodec>,
         session_id: String,
     ) -> Self {
+        let properties = 
Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
@@ -98,6 +102,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
             extension_codec,
             plan_repr: PhantomData,
             session_id,
+            properties,
         }
     }
 
@@ -109,6 +114,7 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
         plan_repr: PhantomData<T>,
         session_id: String,
     ) -> Self {
+        let properties = 
Self::compute_properties(plan.schema().as_ref().clone().into());
         Self {
             scheduler_url,
             config,
@@ -116,8 +122,17 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
             extension_codec,
             plan_repr,
             session_id,
+            properties,
         }
     }
+
+    fn compute_properties(schema: SchemaRef) -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        )
+    }
 }
 
 impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
@@ -147,12 +162,8 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
         self.plan.schema().as_ref().clone().into()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -170,6 +181,9 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for 
DistributedQueryExec<T> {
             extension_codec: self.extension_codec.clone(),
             plan_repr: self.plan_repr,
             session_id: self.session_id.clone(),
+            properties: Self::compute_properties(
+                self.plan.schema().as_ref().clone().into(),
+            ),
         }))
     }
 
diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs 
b/ballista/core/src/execution_plans/shuffle_reader.rs
index 491e4d05..babd8f64 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -34,26 +34,24 @@ use crate::serde::scheduler::{PartitionLocation, 
PartitionStats};
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::error::ArrowError;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::common::runtime::SpawnedTask;
 
 use datafusion::error::Result;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::{
     ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
-    RecordBatchStream, SendableRecordBatchStream, Statistics,
+    PlanProperties, RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use futures::{Stream, StreamExt, TryStreamExt};
 
 use crate::error::BallistaError;
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::common::AbortOnDropMany;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
 use log::{error, info};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
-use tokio::task::JoinHandle;
 use tokio_stream::wrappers::ReceiverStream;
 
 /// ShuffleReaderExec reads partitions that have already been materialized by 
a ShuffleWriterExec
@@ -67,6 +65,7 @@ pub struct ShuffleReaderExec {
     pub partition: Vec<Vec<PartitionLocation>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    properties: PlanProperties,
 }
 
 impl ShuffleReaderExec {
@@ -76,11 +75,19 @@ impl ShuffleReaderExec {
         partition: Vec<Vec<PartitionLocation>>,
         schema: SchemaRef,
     ) -> Result<Self> {
+        let properties = PlanProperties::new(
+            
datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
+            // TODO partitioning may be known and could be populated here
+            // see https://github.com/apache/arrow-datafusion/issues/758
+            Partitioning::UnknownPartitioning(partition.len()),
+            datafusion::physical_plan::ExecutionMode::Bounded,
+        );
         Ok(Self {
             stage_id,
             schema,
             partition,
             metrics: ExecutionPlanMetricsSet::new(),
+            properties,
         })
     }
 }
@@ -108,16 +115,9 @@ impl ExecutionPlan for ShuffleReaderExec {
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        // TODO partitioning may be known and could be populated here
-        // see https://github.com/apache/arrow-datafusion/issues/758
-        Partitioning::UnknownPartitioning(self.partition.len())
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
-
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
         vec![]
     }
@@ -244,7 +244,7 @@ struct AbortableReceiverStream {
     inner: ReceiverStream<result::Result<SendableRecordBatchStream, 
BallistaError>>,
 
     #[allow(dead_code)]
-    drop_helper: AbortOnDropMany<()>,
+    drop_helper: Vec<SpawnedTask<()>>,
 }
 
 impl AbortableReceiverStream {
@@ -253,12 +253,12 @@ impl AbortableReceiverStream {
         rx: tokio::sync::mpsc::Receiver<
             result::Result<SendableRecordBatchStream, BallistaError>,
         >,
-        join_handles: Vec<JoinHandle<()>>,
+        spawned_tasks: Vec<SpawnedTask<()>>,
     ) -> AbortableReceiverStream {
         let inner = ReceiverStream::new(rx);
         Self {
             inner,
-            drop_helper: AbortOnDropMany(join_handles),
+            drop_helper: spawned_tasks,
         }
     }
 }
@@ -282,7 +282,7 @@ fn send_fetch_partitions(
 ) -> AbortableReceiverStream {
     let (response_sender, response_receiver) = mpsc::channel(max_request_num);
     let semaphore = Arc::new(Semaphore::new(max_request_num));
-    let mut join_handles = vec![];
+    let mut spawned_tasks: Vec<SpawnedTask<()>> = vec![];
     let (local_locations, remote_locations): (Vec<_>, Vec<_>) = 
partition_locations
         .into_iter()
         .partition(check_is_local_location);
@@ -295,34 +295,32 @@ fn send_fetch_partitions(
 
     // keep local shuffle files reading in serial order for memory control.
     let response_sender_c = response_sender.clone();
-    let join_handle = tokio::spawn(async move {
+    spawned_tasks.push(SpawnedTask::spawn(async move {
         for p in local_locations {
             let r = PartitionReaderEnum::Local.fetch_partition(&p).await;
             if let Err(e) = response_sender_c.send(r).await {
                 error!("Fail to send response event to the channel due to {}", 
e);
             }
         }
-    });
-    join_handles.push(join_handle);
+    }));
 
     for p in remote_locations.into_iter() {
         let semaphore = semaphore.clone();
         let response_sender = response_sender.clone();
-        let join_handle = tokio::spawn(async move {
-            // Block if exceeds max request number
+        spawned_tasks.push(SpawnedTask::spawn(async move {
+            // Block if exceeds max request number.
             let permit = semaphore.acquire_owned().await.unwrap();
             let r = 
PartitionReaderEnum::FlightRemote.fetch_partition(&p).await;
-            // Block if the channel buffer is ful
+            // Block if the channel buffer is full.
             if let Err(e) = response_sender.send(r).await {
                 error!("Fail to send response event to the channel due to {}", 
e);
             }
             // Increase semaphore by dropping existing permits.
             drop(permit);
-        });
-        join_handles.push(join_handle);
+        }));
     }
 
-    AbortableReceiverStream::create(response_receiver, join_handles)
+    AbortableReceiverStream::create(response_receiver, spawned_tasks)
 }
 
 fn check_is_local_location(location: &PartitionLocation) -> bool {
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs 
b/ballista/core/src/execution_plans/shuffle_writer.rs
index b9e25ed5..8c034194 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -22,7 +22,6 @@
 
 use datafusion::arrow::ipc::writer::IpcWriteOptions;
 use datafusion::arrow::ipc::CompressionType;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 
 use datafusion::arrow::ipc::writer::StreamWriter;
 use std::any::Any;
@@ -51,8 +50,8 @@ use datafusion::physical_plan::metrics::{
 };
 
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
 
@@ -81,6 +80,7 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    properties: PlanProperties,
 }
 
 pub struct WriteTracker {
@@ -127,6 +127,16 @@ impl ShuffleWriterExec {
         work_dir: String,
         shuffle_output_partitioning: Option<Partitioning>,
     ) -> Result<Self> {
+        // If [`shuffle_output_partitioning`] is none, then there's no need to 
do repartitioning.
+        // Therefore, the partition is the same as its input plan's.
+        let partitioning = shuffle_output_partitioning
+            .clone()
+            .unwrap_or_else(|| 
plan.properties().output_partitioning().clone());
+        let properties = PlanProperties::new(
+            
datafusion::physical_expr::EquivalenceProperties::new(plan.schema()),
+            partitioning,
+            datafusion::physical_plan::ExecutionMode::Bounded,
+        );
         Ok(Self {
             job_id,
             stage_id,
@@ -134,6 +144,7 @@ impl ShuffleWriterExec {
             work_dir,
             shuffle_output_partitioning,
             metrics: ExecutionPlanMetricsSet::new(),
+            properties,
         })
     }
 
@@ -149,7 +160,10 @@ impl ShuffleWriterExec {
 
     /// Get the input partition count
     pub fn input_partition_count(&self) -> usize {
-        self.plan.output_partitioning().partition_count()
+        self.plan
+            .properties()
+            .output_partitioning()
+            .partition_count()
     }
 
     /// Get the true output partitioning
@@ -349,16 +363,8 @@ impl ExecutionPlan for ShuffleWriterExec {
         self.plan.schema()
     }
 
-    /// If [`shuffle_output_partitioning`] is none, then there's no need to do 
repartitioning.
-    /// Therefore, the partition is the same as its input plan's.
-    fn output_partitioning(&self) -> Partitioning {
-        self.shuffle_output_partitioning
-            .clone()
-            .unwrap_or_else(|| self.plan.output_partitioning())
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/execution_plans/unresolved_shuffle.rs 
b/ballista/core/src/execution_plans/unresolved_shuffle.rs
index fe3610c4..bb097017 100644
--- a/ballista/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/core/src/execution_plans/unresolved_shuffle.rs
@@ -21,10 +21,9 @@ use std::sync::Arc;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 
 /// UnresolvedShuffleExec represents a dependency on the results of a 
ShuffleWriterExec node which hasn't computed yet.
@@ -41,6 +40,8 @@ pub struct UnresolvedShuffleExec {
 
     // The partition count this node will have once it is replaced with a 
ShuffleReaderExec
     pub output_partition_count: usize,
+
+    properties: PlanProperties,
 }
 
 impl UnresolvedShuffleExec {
@@ -50,10 +51,18 @@ impl UnresolvedShuffleExec {
         schema: SchemaRef,
         output_partition_count: usize,
     ) -> Self {
+        let properties = PlanProperties::new(
+            
datafusion::physical_expr::EquivalenceProperties::new(schema.clone()),
+            // TODO the output partition is known and should be populated here!
+            // see https://github.com/apache/arrow-datafusion/issues/758
+            Partitioning::UnknownPartitioning(output_partition_count),
+            datafusion::physical_plan::ExecutionMode::Bounded,
+        );
         Self {
             stage_id,
             schema,
             output_partition_count,
+            properties,
         }
     }
 }
@@ -81,14 +90,8 @@ impl ExecutionPlan for UnresolvedShuffleExec {
         self.schema.clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        // TODO the output partition is known and should be populated here!
-        // see https://github.com/apache/arrow-datafusion/issues/758
-        Partitioning::UnknownPartitioning(self.output_partition_count)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/ballista/core/src/serde/mod.rs b/ballista/core/src/serde/mod.rs
index e12dec8a..64edec41 100644
--- a/ballista/core/src/serde/mod.rs
+++ b/ballista/core/src/serde/mod.rs
@@ -181,12 +181,11 @@ impl PhysicalExtensionCodec for 
BallistaPhysicalExtensionCodec {
             }
             PhysicalPlanType::UnresolvedShuffle(unresolved_shuffle) => {
                 let schema = 
Arc::new(convert_required!(unresolved_shuffle.schema)?);
-                Ok(Arc::new(UnresolvedShuffleExec {
-                    stage_id: unresolved_shuffle.stage_id as usize,
+                Ok(Arc::new(UnresolvedShuffleExec::new(
+                    unresolved_shuffle.stage_id as usize,
                     schema,
-                    output_partition_count: 
unresolved_shuffle.output_partition_count
-                        as usize,
-                }))
+                    unresolved_shuffle.output_partition_count as usize,
+                )))
             }
         }
     }
@@ -201,10 +200,12 @@ impl PhysicalExtensionCodec for 
BallistaPhysicalExtensionCodec {
             // to get the true output partitioning
             let output_partitioning = match exec.shuffle_output_partitioning() 
{
                 Some(Partitioning::Hash(exprs, partition_count)) => {
+                    let default_codec =
+                        
datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec {};
                     Some(datafusion_proto::protobuf::PhysicalHashRepartition {
                         hash_expr: exprs
                             .iter()
-                            .map(|expr| expr.clone().try_into())
+                            
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
 &default_codec))
                             .collect::<Result<Vec<_>, DataFusionError>>()?,
                         partition_count: *partition_count as u64,
                     })
diff --git a/ballista/core/src/serde/scheduler/from_proto.rs 
b/ballista/core/src/serde/scheduler/from_proto.rs
index a38578cd..65991980 100644
--- a/ballista/core/src/serde/scheduler/from_proto.rs
+++ b/ballista/core/src/serde/scheduler/from_proto.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use chrono::{TimeZone, Utc};
-use datafusion::common::tree_node::{Transformed, TreeNode};
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion::physical_plan::metrics::{
@@ -417,7 +417,8 @@ fn reset_metrics_for_execution_plan(
 ) -> Result<Arc<dyn ExecutionPlan>, BallistaError> {
     plan.transform(&|plan| {
         let children = plan.children().clone();
-        plan.with_new_children(children).map(Transformed::Yes)
+        plan.with_new_children(children).map(Transformed::yes)
     })
+    .data()
     .map_err(BallistaError::DataFusionError)
 }
diff --git a/ballista/core/src/serde/scheduler/to_proto.rs 
b/ballista/core/src/serde/scheduler/to_proto.rs
index 82977e3b..8159aab6 100644
--- a/ballista/core/src/serde/scheduler/to_proto.rs
+++ b/ballista/core/src/serde/scheduler/to_proto.rs
@@ -101,10 +101,12 @@ pub fn hash_partitioning_to_proto(
 ) -> Result<Option<datafusion_protobuf::PhysicalHashRepartition>, 
BallistaError> {
     match output_partitioning {
         Some(Partitioning::Hash(exprs, partition_count)) => {
+            let default_codec =
+                datafusion_proto::physical_plan::DefaultPhysicalExtensionCodec 
{};
             Ok(Some(datafusion_protobuf::PhysicalHashRepartition {
                 hash_expr: exprs
                     .iter()
-                    .map(|expr| expr.clone().try_into())
+                    
.map(|expr|datafusion_proto::physical_plan::to_proto::serialize_physical_expr(expr.clone(),
 &default_codec))
                     .collect::<Result<Vec<_>, DataFusionError>>()?,
                 partition_count: *partition_count as u64,
             }))
diff --git a/ballista/executor/src/collect.rs b/ballista/executor/src/collect.rs
index 22567bca..4456e9c5 100644
--- a/ballista/executor/src/collect.rs
+++ b/ballista/executor/src/collect.rs
@@ -25,10 +25,9 @@ use std::{any::Any, pin::Pin};
 use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::TaskContext;
-use datafusion::physical_plan::expressions::PhysicalSortExpr;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
SendableRecordBatchStream,
-    Statistics,
+    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+    SendableRecordBatchStream, Statistics,
 };
 use datafusion::{error::Result, physical_plan::RecordBatchStream};
 use futures::stream::SelectAll;
@@ -39,11 +38,17 @@ use futures::Stream;
 #[derive(Debug, Clone)]
 pub struct CollectExec {
     plan: Arc<dyn ExecutionPlan>,
+    properties: PlanProperties,
 }
 
 impl CollectExec {
     pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
-        Self { plan }
+        let properties = PlanProperties::new(
+            
datafusion::physical_expr::EquivalenceProperties::new(plan.schema()),
+            Partitioning::UnknownPartitioning(1),
+            datafusion::physical_plan::ExecutionMode::Bounded,
+        );
+        Self { plan, properties }
     }
 }
 
@@ -70,12 +75,8 @@ impl ExecutionPlan for CollectExec {
         self.plan.schema()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        None
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -95,7 +96,11 @@ impl ExecutionPlan for CollectExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         assert_eq!(0, partition);
-        let num_partitions = self.plan.output_partitioning().partition_count();
+        let num_partitions = self
+            .plan
+            .properties()
+            .output_partitioning()
+            .partition_count();
 
         let streams = (0..num_partitions)
             .map(|i| self.plan.execute(i, context.clone()))
diff --git a/ballista/executor/src/executor.rs 
b/ballista/executor/src/executor.rs
index 60db9f00..83caf398 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -200,21 +200,20 @@ impl Executor {
 
 #[cfg(test)]
 mod test {
+    use crate::execution_engine::DefaultQueryStageExec;
     use crate::executor::Executor;
     use crate::metrics::LoggingMetricsCollector;
     use arrow::datatypes::{Schema, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use ballista_core::execution_plans::ShuffleWriterExec;
     use ballista_core::serde::protobuf::ExecutorRegistration;
-    use datafusion::execution::context::TaskContext;
-
-    use crate::execution_engine::DefaultQueryStageExec;
     use ballista_core::serde::scheduler::PartitionId;
     use datafusion::error::{DataFusionError, Result};
-    use datafusion::physical_expr::PhysicalSortExpr;
+    use datafusion::execution::context::TaskContext;
+
     use datafusion::physical_plan::{
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
RecordBatchStream,
-        SendableRecordBatchStream, Statistics,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, 
PlanProperties,
+        RecordBatchStream, SendableRecordBatchStream, Statistics,
     };
     use datafusion::prelude::SessionContext;
     use futures::Stream;
@@ -247,7 +246,23 @@ mod test {
 
     /// An ExecutionPlan which will never terminate
     #[derive(Debug)]
-    pub struct NeverendingOperator;
+    pub struct NeverendingOperator {
+        properties: PlanProperties,
+    }
+
+    impl NeverendingOperator {
+        fn new() -> Self {
+            NeverendingOperator {
+                properties: PlanProperties::new(
+                    
datafusion::physical_expr::EquivalenceProperties::new(Arc::new(
+                        Schema::empty(),
+                    )),
+                    Partitioning::UnknownPartitioning(1),
+                    datafusion::physical_plan::ExecutionMode::Bounded,
+                ),
+            }
+        }
+    }
 
     impl DisplayAs for NeverendingOperator {
         fn fmt_as(
@@ -272,12 +287,8 @@ mod test {
             Arc::new(Schema::empty())
         }
 
-        fn output_partitioning(&self) -> Partitioning {
-            Partitioning::UnknownPartitioning(1)
-        }
-
-        fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-            None
+        fn properties(&self) -> &PlanProperties {
+            &self.properties
         }
 
         fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -316,7 +327,7 @@ mod test {
         let shuffle_write = ShuffleWriterExec::try_new(
             "job-id".to_owned(),
             1,
-            Arc::new(NeverendingOperator),
+            Arc::new(NeverendingOperator::new()),
             work_dir.clone(),
             None,
         )
diff --git a/ballista/executor/src/flight_service.rs 
b/ballista/executor/src/flight_service.rs
index c4ffab4f..43387f09 100644
--- a/ballista/executor/src/flight_service.rs
+++ b/ballista/executor/src/flight_service.rs
@@ -33,7 +33,7 @@ use arrow::ipc::writer::IpcWriteOptions;
 use arrow_flight::{
     flight_service_server::FlightService, Action, ActionType, Criteria, Empty,
     FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, 
HandshakeResponse,
-    PutResult, SchemaResult, Ticket,
+    PollInfo, PutResult, SchemaResult, Ticket,
 };
 use datafusion::arrow::{error::ArrowError, record_batch::RecordBatch};
 use futures::{Stream, StreamExt, TryStreamExt};
@@ -203,6 +203,13 @@ impl FlightService for BallistaFlightService {
     ) -> Result<Response<Self::DoExchangeStream>, Status> {
         Err(Status::unimplemented("do_exchange"))
     }
+
+    async fn poll_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<PollInfo>, Status> {
+        Err(Status::unimplemented("poll_flight_info"))
+    }
 }
 
 fn read_partition<T>(
diff --git a/ballista/scheduler/src/cluster/mod.rs 
b/ballista/scheduler/src/cluster/mod.rs
index f7e1ca46..71e81c90 100644
--- a/ballista/scheduler/src/cluster/mod.rs
+++ b/ballista/scheduler/src/cluster/mod.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use clap::ArgEnum;
 use datafusion::common::tree_node::TreeNode;
-use datafusion::common::tree_node::VisitRecursion;
+use datafusion::common::tree_node::TreeNodeRecursion;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{AvroExec, CsvExec, NdJsonExec, 
ParquetExec};
 use datafusion::error::DataFusionError;
@@ -701,11 +701,11 @@ pub(crate) fn get_scan_files(
             } else if let Some(csv_exec) = plan_any.downcast_ref::<CsvExec>() {
                 csv_exec.base_config().file_groups.clone()
             } else {
-                return Ok(VisitRecursion::Continue);
+                return Ok(TreeNodeRecursion::Continue);
             };
 
         collector.push(file_groups);
-        Ok(VisitRecursion::Skip)
+        Ok(TreeNodeRecursion::Jump)
     })?;
     Ok(collector)
 }
diff --git a/ballista/scheduler/src/flight_sql.rs 
b/ballista/scheduler/src/flight_sql.rs
index 62c96966..24ede23b 100644
--- a/ballista/scheduler/src/flight_sql.rs
+++ b/ballista/scheduler/src/flight_sql.rs
@@ -33,7 +33,7 @@ use arrow_flight::sql::{
 };
 use arrow_flight::{
     Action, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, 
HandshakeRequest,
-    HandshakeResponse, Location, Ticket,
+    HandshakeResponse, Ticket,
 };
 use base64::Engine;
 use futures::Stream;
@@ -69,6 +69,7 @@ use datafusion::common::DFSchemaRef;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
+use prost::bytes::Bytes;
 use prost::Message;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::time::sleep;
@@ -93,7 +94,6 @@ impl FlightSqlServiceImpl {
         }
     }
 
-    #[allow(deprecated)]
     fn tables(&self, ctx: Arc<SessionContext>) -> Result<RecordBatch, 
ArrowError> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("catalog_name", DataType::Utf8, true),
@@ -101,9 +101,21 @@ impl FlightSqlServiceImpl {
             Field::new("table_name", DataType::Utf8, false),
             Field::new("table_type", DataType::Utf8, false),
         ]));
-        let tables = ctx.tables()?; // resolved in #501
-        let names: Vec<_> = tables.iter().map(|it| 
Some(it.as_str())).collect();
-        let types: Vec<_> = names.iter().map(|_| Some("TABLE")).collect();
+        let mut names: Vec<Option<String>> = vec![];
+        for catalog_name in ctx.catalog_names() {
+            let catalog = ctx
+                .catalog(&catalog_name)
+                .expect("catalog should have been found");
+            for schema_name in catalog.schema_names() {
+                let schema = catalog
+                    .schema(&schema_name)
+                    .expect("schema should have been found");
+                for table_name in schema.table_names() {
+                    names.push(Some(table_name));
+                }
+            }
+        }
+        let types: Vec<_> = names.iter().map(|_| 
Some("TABLE".to_string())).collect();
         let cats: Vec<_> = names.iter().map(|_| None).collect();
         let schemas: Vec<_> = names.iter().map(|_| None).collect();
         let rb = RecordBatch::try_new(
@@ -290,15 +302,11 @@ impl FlightSqlServiceImpl {
                 Err(Status::internal("Error getting stats".to_string()))?
             }
             let authority = format!("{}:{}", &host, &port);
-            let loc = Location {
-                uri: format!("grpc+tcp://{authority}"),
-            };
             let buf = fetch.as_any().encode_to_vec();
             let ticket = Ticket { ticket: buf.into() };
-            let fiep = FlightEndpoint {
-                ticket: Some(ticket),
-                location: vec![loc],
-            };
+            let fiep = FlightEndpoint::new()
+                .with_ticket(ticket)
+                .with_location(format!("grpc+tcp://{authority}"));
             fieps.push(fiep);
         }
         Ok(fieps)
@@ -319,15 +327,11 @@ impl FlightSqlServiceImpl {
             settings: vec![],
         };
         let authority = format!("{}:{}", &host, &port); // TODO: use advertise 
host
-        let loc = Location {
-            uri: format!("grpc+tcp://{authority}"),
-        };
         let buf = fetch.as_any().encode_to_vec();
         let ticket = Ticket { ticket: buf.into() };
-        let fiep = FlightEndpoint {
-            ticket: Some(ticket),
-            location: vec![loc],
-        };
+        let fiep = FlightEndpoint::new()
+            .with_ticket(ticket)
+            .with_location(format!("grpc+tcp://{authority}"));
         let fieps = vec![fiep];
         Ok(fieps)
     }
@@ -406,6 +410,7 @@ impl FlightSqlServiceImpl {
             total_records: num_rows,
             total_bytes: num_bytes,
             ordered: false,
+            app_metadata: Bytes::new(),
         };
         Response::new(info)
     }
diff --git a/ballista/scheduler/src/planner.rs 
b/ballista/scheduler/src/planner.rs
index 3e8bdc6c..dfaa00aa 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -109,8 +109,7 @@ impl DistributedPlanner {
             let unresolved_shuffle = 
create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
-                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?
-                    .into(),
+                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?,
                 stages,
             ))
         } else if let Some(_sort_preserving_merge) = execution_plan
@@ -126,14 +125,13 @@ impl DistributedPlanner {
             let unresolved_shuffle = 
create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
-                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?
-                    .into(),
+                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?,
                 stages,
             ))
         } else if let Some(repart) =
             execution_plan.as_any().downcast_ref::<RepartitionExec>()
         {
-            match repart.output_partitioning() {
+            match repart.properties().output_partitioning() {
                 Partitioning::Hash(_, _) => {
                     let shuffle_writer = create_shuffle_writer(
                         job_id,
@@ -158,7 +156,7 @@ impl DistributedPlanner {
             )))
         } else {
             Ok((
-                with_new_children_if_necessary(execution_plan, 
children)?.into(),
+                with_new_children_if_necessary(execution_plan, children)?,
                 stages,
             ))
         }
@@ -177,7 +175,10 @@ fn create_unresolved_shuffle(
     Arc::new(UnresolvedShuffleExec::new(
         shuffle_writer.stage_id(),
         shuffle_writer.schema(),
-        shuffle_writer.output_partitioning().partition_count(),
+        shuffle_writer
+            .properties()
+            .output_partitioning()
+            .partition_count(),
     ))
 }
 
@@ -250,7 +251,7 @@ pub fn remove_unresolved_shuffles(
             new_children.push(remove_unresolved_shuffles(child, 
partition_locations)?);
         }
     }
-    Ok(with_new_children_if_necessary(stage, new_children)?.into())
+    Ok(with_new_children_if_necessary(stage, new_children)?)
 }
 
 /// Rollback the ShuffleReaderExec to UnresolvedShuffleExec.
@@ -262,8 +263,10 @@ pub fn rollback_resolved_shuffles(
     let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
     for child in stage.children() {
         if let Some(shuffle_reader) = 
child.as_any().downcast_ref::<ShuffleReaderExec>() {
-            let output_partition_count =
-                shuffle_reader.output_partitioning().partition_count();
+            let output_partition_count = shuffle_reader
+                .properties()
+                .output_partitioning()
+                .partition_count();
             let stage_id = shuffle_reader.stage_id;
 
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
@@ -276,7 +279,7 @@ pub fn rollback_resolved_shuffles(
             new_children.push(rollback_resolved_shuffles(child)?);
         }
     }
-    Ok(with_new_children_if_necessary(stage, new_children)?.into())
+    Ok(with_new_children_if_necessary(stage, new_children)?)
 }
 
 fn create_shuffle_writer(
@@ -318,7 +321,11 @@ mod test {
 
     macro_rules! downcast_exec {
         ($exec: expr, $ty: ty) => {
-            $exec.as_any().downcast_ref::<$ty>().unwrap()
+            $exec.as_any().downcast_ref::<$ty>().expect(&format!(
+                "Downcast to {} failed. Got {:?}",
+                stringify!($ty),
+                $exec
+            ))
         };
     }
 
@@ -344,8 +351,8 @@ mod test {
         let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
-        for stage in &stages {
-            println!("{}", displayable(stage.as_ref()).indent(false));
+        for (i, stage) in stages.iter().enumerate() {
+            println!("Stage {i}:\n{}", 
displayable(stage.as_ref()).indent(false));
         }
 
         /* Expected result:
@@ -450,8 +457,8 @@ order by
         let mut planner = DistributedPlanner::new();
         let job_uuid = Uuid::new_v4();
         let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
-        for stage in &stages {
-            println!("{}", displayable(stage.as_ref()).indent(false));
+        for (i, stage) in stages.iter().enumerate() {
+            println!("Stage {i}:\n{}", 
displayable(stage.as_ref()).indent(false));
         }
 
         /* Expected result:
@@ -467,13 +474,12 @@ order by
 
         ShuffleWriterExec: Some(Hash([Column { name: "l_shipmode", index: 0 
}], 2))
           AggregateExec: mode=Partial, gby=[l_shipmode@0 as l_shipmode], 
aggr=[SUM(CASE WHEN orders.o_orderpriority = Utf8("1-URGENT") OR 
orders.o_orderpriority = Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END), 
SUM(CASE WHEN orders.o_orderpriority != Utf8("1-URGENT") AND 
orders.o_orderpriority != Utf8("2-HIGH") THEN Int64(1) ELSE Int64(0) END)]
-            ProjectionExec: expr=[l_shipmode@1 as l_shipmode, 
o_orderpriority@3 as o_orderpriority]
-              CoalesceBatchesExec: target_batch_size=8192
-                HashJoinExec: mode=Partitioned, join_type=Inner, on=[(Column { 
name: "l_orderkey", index: 0 }, Column { name: "o_orderkey", index: 0 })]
-                  CoalesceBatchesExec: target_batch_size=8192
-                    UnresolvedShuffleExec
-                  CoalesceBatchesExec: target_batch_size=8192
-                    UnresolvedShuffleExec
+            CoalesceBatchesExec: target_batch_size=8192
+              HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(l_orderkey@0, o_orderkey@0)], projection=[l_shipmode@1, o_orderpriority@3]
+                CoalesceBatchesExec: target_batch_size=8192
+                  UnresolvedShuffleExec
+                CoalesceBatchesExec: target_batch_size=8192
+                  UnresolvedShuffleExec
 
         ShuffleWriterExec: None
           SortExec: expr=[l_shipmode@0 ASC NULLS LAST]
@@ -495,6 +501,7 @@ order by
         assert_eq!(
             2,
             stages[0].children()[0]
+                .properties()
                 .output_partitioning()
                 .partition_count()
         );
@@ -502,7 +509,7 @@ order by
             2,
             stages[0]
                 .shuffle_output_partitioning()
-                .unwrap()
+                .expect("stage 0")
                 .partition_count()
         );
 
@@ -510,6 +517,7 @@ order by
         assert_eq!(
             1,
             stages[1].children()[0]
+                .properties()
                 .output_partitioning()
                 .partition_count()
         );
@@ -517,31 +525,32 @@ order by
             2,
             stages[1]
                 .shuffle_output_partitioning()
-                .unwrap()
+                .expect("stage 1")
                 .partition_count()
         );
 
         // join and partial hash aggregate
         let input = stages[2].children()[0].clone();
-        assert_eq!(2, input.output_partitioning().partition_count());
+        assert_eq!(
+            2,
+            input.properties().output_partitioning().partition_count()
+        );
         assert_eq!(
             2,
             stages[2]
                 .shuffle_output_partitioning()
-                .unwrap()
+                .expect("stage 2")
                 .partition_count()
         );
 
         let hash_agg = downcast_exec!(input, AggregateExec);
 
-        let projection = hash_agg.children()[0].clone();
-        let projection = downcast_exec!(projection, ProjectionExec);
-
-        let coalesce_batches = projection.children()[0].clone();
+        let coalesce_batches = hash_agg.children()[0].clone();
         let coalesce_batches = downcast_exec!(coalesce_batches, 
CoalesceBatchesExec);
 
         let join = coalesce_batches.children()[0].clone();
         let join = downcast_exec!(join, HashJoinExec);
+        assert!(join.contain_projection());
 
         let join_input_1 = join.children()[0].clone();
         // skip CoalesceBatches
@@ -561,6 +570,7 @@ order by
         assert_eq!(
             2,
             stages[3].children()[0]
+                .properties()
                 .output_partitioning()
                 .partition_count()
         );
@@ -570,6 +580,7 @@ order by
         assert_eq!(
             1,
             stages[4].children()[0]
+                .properties()
                 .output_partitioning()
                 .partition_count()
         );
diff --git a/ballista/scheduler/src/state/execution_graph.rs 
b/ballista/scheduler/src/state/execution_graph.rs
index 70b2659f..9ee95d67 100644
--- a/ballista/scheduler/src/state/execution_graph.rs
+++ b/ballista/scheduler/src/state/execution_graph.rs
@@ -151,7 +151,7 @@ impl ExecutionGraph {
     ) -> Result<Self> {
         let mut planner = DistributedPlanner::new();
 
-        let output_partitions = plan.output_partitioning().partition_count();
+        let output_partitions = 
plan.properties().output_partitioning().partition_count();
 
         let shuffle_stages = planner.plan_query_stages(job_id, plan)?;
 
diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs 
b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
index f082fe43..8aded3e0 100644
--- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs
+++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs
@@ -1176,7 +1176,7 @@ fn get_stage_partitions(plan: Arc<dyn ExecutionPlan>) -> 
usize {
     plan.as_any()
         .downcast_ref::<ShuffleWriterExec>()
         .map(|shuffle_writer| shuffle_writer.input_partition_count())
-        .unwrap_or_else(|| plan.output_partitioning().partition_count())
+        .unwrap_or_else(|| 
plan.properties().output_partitioning().partition_count())
 }
 
 /// This data structure collects the partition locations for an 
`ExecutionStage`.
diff --git a/ballista/scheduler/src/state/execution_graph_dot.rs 
b/ballista/scheduler/src/state/execution_graph_dot.rs
index 0cd6837f..3a6dce7a 100644
--- a/ballista/scheduler/src/state/execution_graph_dot.rs
+++ b/ballista/scheduler/src/state/execution_graph_dot.rs
@@ -281,12 +281,12 @@ aggr=[{}]",
     } else if let Some(exec) = 
plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
         format!(
             "CoalescePartitions [{}]",
-            format_partitioning(exec.output_partitioning())
+            
format_partitioning(exec.properties().output_partitioning().clone())
         )
     } else if let Some(exec) = plan.as_any().downcast_ref::<RepartitionExec>() 
{
         format!(
             "RepartitionExec [{}]",
-            format_partitioning(exec.output_partitioning())
+            
format_partitioning(exec.properties().output_partitioning().clone())
         )
     } else if let Some(exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
         let join_expr = exec
@@ -323,24 +323,24 @@ filter_expr={}",
     } else if plan.as_any().downcast_ref::<MemoryExec>().is_some() {
         "MemoryExec".to_string()
     } else if let Some(exec) = plan.as_any().downcast_ref::<CsvExec>() {
-        let parts = exec.output_partitioning().partition_count();
+        let parts = exec.properties().output_partitioning().partition_count();
         format!(
             "CSV: {} [{} partitions]",
             get_file_scan(exec.base_config()),
             parts
         )
     } else if let Some(exec) = plan.as_any().downcast_ref::<NdJsonExec>() {
-        let parts = exec.output_partitioning().partition_count();
+        let parts = exec.properties().output_partitioning().partition_count();
         format!("JSON [{parts} partitions]")
     } else if let Some(exec) = plan.as_any().downcast_ref::<AvroExec>() {
-        let parts = exec.output_partitioning().partition_count();
+        let parts = exec.properties().output_partitioning().partition_count();
         format!(
             "Avro: {} [{} partitions]",
             get_file_scan(exec.base_config()),
             parts
         )
     } else if let Some(exec) = plan.as_any().downcast_ref::<ParquetExec>() {
-        let parts = exec.output_partitioning().partition_count();
+        let parts = exec.properties().output_partitioning().partition_count();
         format!(
             "Parquet: {} [{} partitions]",
             get_file_scan(exec.base_config()),
diff --git a/ballista/scheduler/src/state/mod.rs 
b/ballista/scheduler/src/state/mod.rs
index ba8647ee..90c9f6f3 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::common::tree_node::{TreeNode, VisitRecursion};
+use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
 use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
 use datafusion::datasource::source_as_provider;
 use datafusion::error::DataFusionError;
@@ -399,7 +399,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerState<T,
                     }
                 }
             }
-            Ok(VisitRecursion::Continue)
+            Ok(TreeNodeRecursion::Continue)
         })?;
 
         let plan = session_ctx.state().create_physical_plan(plan).await?;
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index c0719175..01dcade1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -827,7 +827,7 @@ async fn get_table(
             }
             "parquet" => {
                 let path = format!("{path}/{table}");
-                let format = 
ParquetFormat::default().with_enable_pruning(Some(true));
+                let format = 
ParquetFormat::default().with_enable_pruning(true);
 
                 (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
             }
@@ -844,7 +844,6 @@ async fn get_table(
         collect_stat: true,
         table_partition_cols: vec![],
         file_sort_order: vec![],
-        file_type_write_options: None,
     };
 
     let url = ListingTableUrl::parse(path)?;
@@ -1042,7 +1041,7 @@ async fn get_expected_results(n: usize, path: &str) -> 
Result<Vec<RecordBatch>>
                         // there's no support for casting from Utf8 to 
Decimal, so
                         // we'll cast from Utf8 to Float64 to Decimal for 
Decimal types
                         let inner_cast = Box::new(Expr::Cast(Cast::new(
-                            Box::new(trim(col(Field::name(field)))),
+                            Box::new(trim(vec![col(Field::name(field))])),
                             DataType::Float64,
                         )));
                         Expr::Cast(Cast::new(
@@ -1052,7 +1051,7 @@ async fn get_expected_results(n: usize, path: &str) -> 
Result<Vec<RecordBatch>>
                         .alias(Field::name(field))
                     }
                     _ => Expr::Cast(Cast::new(
-                        Box::new(trim(col(Field::name(field)))),
+                        Box::new(trim(vec![col(Field::name(field))])),
                         Field::data_type(field).to_owned(),
                     ))
                     .alias(Field::name(field)),
@@ -1577,6 +1576,7 @@ mod tests {
 mod ballista_round_trip {
     use super::*;
     use ballista_core::serde::BallistaCodec;
+    use datafusion::config::TableOptions;
     use datafusion::datasource::listing::ListingTableUrl;
     use datafusion::execution::options::ReadOptions;
     use datafusion::physical_plan::ExecutionPlan;
@@ -1609,7 +1609,8 @@ mod ballista_round_trip {
                 .has_header(false)
                 .file_extension(".tbl");
             let cfg = SessionConfig::new();
-            let listing_options = options.to_listing_options(&cfg);
+            let listing_options =
+                options.to_listing_options(&cfg, TableOptions::default());
             let config = ListingTableConfig::new(path.clone())
                 .with_listing_options(listing_options)
                 .with_schema(Arc::new(schema));
@@ -1665,7 +1666,8 @@ mod ballista_round_trip {
                 .has_header(false)
                 .file_extension(".tbl");
             let cfg = SessionConfig::new();
-            let listing_options = options.to_listing_options(&cfg);
+            let listing_options =
+                options.to_listing_options(&cfg, TableOptions::default());
             let config = ListingTableConfig::new(path.clone())
                 .with_listing_options(listing_options)
                 .with_schema(Arc::new(schema));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to