This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new a9ecd3a0 Upgrade DataFusion to 22.0.0 (#740)
a9ecd3a0 is described below

commit a9ecd3a065077bec6c5d271e890d091c594746fa
Author: r.4ntix <[email protected]>
AuthorDate: Thu Apr 13 22:30:23 2023 +0800

    Upgrade DataFusion to 22.0.0 (#740)
    
    * Upgrade DataFusion to 22.0.0
    
    * fix clippy
---
 Cargo.toml                          |  8 ++--
 ballista-cli/Cargo.toml             |  2 +-
 ballista/scheduler/src/planner.rs   | 12 +++---
 ballista/scheduler/src/state/mod.rs | 80 +++++++++++++++++--------------------
 benchmarks/src/bin/tpch.rs          |  6 +--
 5 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c2646cd3..bbab677c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,10 +20,10 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 exclude = ["python"]
 
 [workspace.dependencies]
-arrow = { version = "34.0.0" }
-arrow-flight = { version = "34.0.0", features = ["flight-sql-experimental"] }
-datafusion = "21.0.0"
-datafusion-proto = "21.0.0"
+arrow = { version = "36.0.0" }
+arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] }
+datafusion = "22.0.0"
+datafusion-proto = "22.0.0"
 
 # cargo build --profile release-lto
 [profile.release-lto]
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 075989f2..8f94db42 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,7 +32,7 @@ readme = "README.md"
 ballista = { path = "../ballista/client", version = "0.11.0", features = ["standalone"] }
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { workspace = true }
-datafusion-cli = "21.0.0"
+datafusion-cli = "22.0.0"
 dirs = "4.0.0"
 env_logger = "0.10"
 mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index ab8741b4..87c523cb 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -109,7 +109,8 @@ impl DistributedPlanner {
             let unresolved_shuffle = 
create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
-                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?,
+                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?
+                    .into(),
                 stages,
             ))
         } else if let Some(_sort_preserving_merge) = execution_plan
@@ -125,7 +126,8 @@ impl DistributedPlanner {
             let unresolved_shuffle = 
create_unresolved_shuffle(&shuffle_writer);
             stages.push(shuffle_writer);
             Ok((
-                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?,
+                with_new_children_if_necessary(execution_plan, 
vec![unresolved_shuffle])?
+                    .into(),
                 stages,
             ))
         } else if let Some(repart) =
@@ -156,7 +158,7 @@ impl DistributedPlanner {
             )))
         } else {
             Ok((
-                with_new_children_if_necessary(execution_plan, children)?,
+                with_new_children_if_necessary(execution_plan, 
children)?.into(),
                 stages,
             ))
         }
@@ -251,7 +253,7 @@ pub fn remove_unresolved_shuffles(
             new_children.push(remove_unresolved_shuffles(child, 
partition_locations)?);
         }
     }
-    Ok(with_new_children_if_necessary(stage, new_children)?)
+    Ok(with_new_children_if_necessary(stage, new_children)?.into())
 }
 
 /// Rollback the ShuffleReaderExec to UnresolvedShuffleExec.
@@ -279,7 +281,7 @@ pub fn rollback_resolved_shuffles(
             new_children.push(rollback_resolved_shuffles(child)?);
         }
     }
-    Ok(with_new_children_if_necessary(stage, new_children)?)
+    Ok(with_new_children_if_necessary(stage, new_children)?.into())
 }
 
 fn create_shuffle_writer(
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 0c3b8bf0..31675cbe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::common::tree_node::{TreeNode, VisitRecursion};
 use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
 use datafusion::datasource::source_as_provider;
-use datafusion::logical_expr::PlanVisitor;
+use datafusion::error::DataFusionError;
 use std::any::type_name;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -328,53 +329,44 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
             debug!("Optimized plan: {}", optimized_plan.display_indent());
         }
 
-        struct VerifyPathsExist {}
-        impl PlanVisitor for VerifyPathsExist {
-            type Error = BallistaError;
-
-            fn pre_visit(
-                &mut self,
-                plan: &LogicalPlan,
-            ) -> std::result::Result<bool, Self::Error> {
-                if let LogicalPlan::TableScan(scan) = plan {
-                    let provider = source_as_provider(&scan.source)?;
-                    if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>()
-                    {
-                        let local_paths: Vec<&ListingTableUrl> = table
-                            .table_paths()
-                            .iter()
-                            .filter(|url| url.as_str().starts_with("file:///"))
-                            .collect();
-                        if !local_paths.is_empty() {
-                            // These are local files rather than remote object 
stores, so we
-                            // need to check that they are accessible on the 
scheduler (the client
-                            // may not be on the same host, or the data path 
may not be correctly
-                            // mounted in the container). There could be 
thousands of files so we
-                            // just check the first one.
-                            let url = &local_paths[0].as_str();
-                            // the unwraps are safe here because we checked 
that the url starts with file:///
-                            // we need to check both versions here to support 
Linux & Windows
-                            
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
-                                .or_else(|_| {
-                                    ListingTableUrl::parse(
-                                        url.strip_prefix("file:///").unwrap(),
-                                    )
-                                })
-                                .map_err(|e| {
-                                    BallistaError::General(format!(
+        plan.apply(&mut |plan| {
+            if let LogicalPlan::TableScan(scan) = plan {
+                let provider = source_as_provider(&scan.source)?;
+                if let Some(table) = 
provider.as_any().downcast_ref::<ListingTable>() {
+                    let local_paths: Vec<&ListingTableUrl> = table
+                        .table_paths()
+                        .iter()
+                        .filter(|url| url.as_str().starts_with("file:///"))
+                        .collect();
+                    if !local_paths.is_empty() {
+                        // These are local files rather than remote object 
stores, so we
+                        // need to check that they are accessible on the 
scheduler (the client
+                        // may not be on the same host, or the data path may 
not be correctly
+                        // mounted in the container). There could be thousands 
of files so we
+                        // just check the first one.
+                        let url = &local_paths[0].as_str();
+                        // the unwraps are safe here because we checked that 
the url starts with file:///
+                        // we need to check both versions here to support 
Linux & Windows
+                        
ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
+                            .or_else(|_| {
+                                ListingTableUrl::parse(
+                                    url.strip_prefix("file:///").unwrap(),
+                                )
+                            })
+                            .map_err(|e| {
+                                DataFusionError::External(
+                                    format!(
                                     "logical plan refers to path on local file 
system \
-                                    that is not accessible in the scheduler: 
{url}: {e:?}"
-                                ))
-                                })?;
-                        }
+                                that is not accessible in the scheduler: 
{url}: {e:?}"
+                                )
+                                    .into(),
+                                )
+                            })?;
                     }
                 }
-                Ok(true)
             }
-        }
-
-        let mut verify_paths_exist = VerifyPathsExist {};
-        plan.accept(&mut verify_paths_exist)?;
+            Ok(VisitRecursion::Continue)
+        })?;
 
         let plan = session_ctx.state().create_physical_plan(plan).await?;
         debug!(
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b4314359..061a2da1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -769,11 +769,11 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
                 let compression = match opt.compression.as_str() {
                     "none" => Compression::UNCOMPRESSED,
                     "snappy" => Compression::SNAPPY,
-                    "brotli" => Compression::BROTLI,
-                    "gzip" => Compression::GZIP,
+                    "brotli" => Compression::BROTLI(Default::default()),
+                    "gzip" => Compression::GZIP(Default::default()),
                     "lz4" => Compression::LZ4,
                     "lz0" => Compression::LZO,
-                    "zstd" => Compression::ZSTD,
+                    "zstd" => Compression::ZSTD(Default::default()),
                     other => {
                         return Err(DataFusionError::NotImplemented(format!(
                             "Invalid compression format: {other}"

Reply via email to the project's commits mailing list.