This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new a9ecd3a0 Upgrade DataFusion to 22.0.0 (#740)
a9ecd3a0 is described below
commit a9ecd3a065077bec6c5d271e890d091c594746fa
Author: r.4ntix <[email protected]>
AuthorDate: Thu Apr 13 22:30:23 2023 +0800
Upgrade DataFusion to 22.0.0 (#740)
* Upgrade DataFusion to 22.0.0
* fix clippy
---
Cargo.toml | 8 ++--
ballista-cli/Cargo.toml | 2 +-
ballista/scheduler/src/planner.rs | 12 +++---
ballista/scheduler/src/state/mod.rs | 80 +++++++++++++++++--------------------
benchmarks/src/bin/tpch.rs | 6 +--
5 files changed, 51 insertions(+), 57 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c2646cd3..bbab677c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,10 +20,10 @@ members = ["ballista-cli", "ballista/client",
"ballista/core", "ballista/executo
exclude = ["python"]
[workspace.dependencies]
-arrow = { version = "34.0.0" }
-arrow-flight = { version = "34.0.0", features = ["flight-sql-experimental"] }
-datafusion = "21.0.0"
-datafusion-proto = "21.0.0"
+arrow = { version = "36.0.0" }
+arrow-flight = { version = "36.0.0", features = ["flight-sql-experimental"] }
+datafusion = "22.0.0"
+datafusion-proto = "22.0.0"
# cargo build --profile release-lto
[profile.release-lto]
diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml
index 075989f2..8f94db42 100644
--- a/ballista-cli/Cargo.toml
+++ b/ballista-cli/Cargo.toml
@@ -32,7 +32,7 @@ readme = "README.md"
ballista = { path = "../ballista/client", version = "0.11.0", features = ["standalone"] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { workspace = true }
-datafusion-cli = "21.0.0"
+datafusion-cli = "22.0.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
diff --git a/ballista/scheduler/src/planner.rs b/ballista/scheduler/src/planner.rs
index ab8741b4..87c523cb 100644
--- a/ballista/scheduler/src/planner.rs
+++ b/ballista/scheduler/src/planner.rs
@@ -109,7 +109,8 @@ impl DistributedPlanner {
let unresolved_shuffle =
create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
- with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
+ with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?
+ .into(),
stages,
))
} else if let Some(_sort_preserving_merge) = execution_plan
@@ -125,7 +126,8 @@ impl DistributedPlanner {
let unresolved_shuffle =
create_unresolved_shuffle(&shuffle_writer);
stages.push(shuffle_writer);
Ok((
- with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
+ with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?
+ .into(),
stages,
))
} else if let Some(repart) =
@@ -156,7 +158,7 @@ impl DistributedPlanner {
)))
} else {
Ok((
- with_new_children_if_necessary(execution_plan, children)?,
+ with_new_children_if_necessary(execution_plan, children)?.into(),
stages,
))
}
@@ -251,7 +253,7 @@ pub fn remove_unresolved_shuffles(
new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
}
}
- Ok(with_new_children_if_necessary(stage, new_children)?)
+ Ok(with_new_children_if_necessary(stage, new_children)?.into())
}
/// Rollback the ShuffleReaderExec to UnresolvedShuffleExec.
@@ -279,7 +281,7 @@ pub fn rollback_resolved_shuffles(
new_children.push(rollback_resolved_shuffles(child)?);
}
}
- Ok(with_new_children_if_necessary(stage, new_children)?)
+ Ok(with_new_children_if_necessary(stage, new_children)?.into())
}
fn create_shuffle_writer(
diff --git a/ballista/scheduler/src/state/mod.rs b/ballista/scheduler/src/state/mod.rs
index 0c3b8bf0..31675cbe 100644
--- a/ballista/scheduler/src/state/mod.rs
+++ b/ballista/scheduler/src/state/mod.rs
@@ -15,9 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::datasource::listing::{ListingTable, ListingTableUrl};
use datafusion::datasource::source_as_provider;
-use datafusion::logical_expr::PlanVisitor;
+use datafusion::error::DataFusionError;
use std::any::type_name;
use std::collections::HashMap;
use std::sync::Arc;
@@ -328,53 +329,44 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
debug!("Optimized plan: {}", optimized_plan.display_indent());
}
- struct VerifyPathsExist {}
- impl PlanVisitor for VerifyPathsExist {
- type Error = BallistaError;
-
- fn pre_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if let LogicalPlan::TableScan(scan) = plan {
- let provider = source_as_provider(&scan.source)?;
- if let Some(table) = provider.as_any().downcast_ref::<ListingTable>()
- {
- let local_paths: Vec<&ListingTableUrl> = table
- .table_paths()
- .iter()
- .filter(|url| url.as_str().starts_with("file:///"))
- .collect();
- if !local_paths.is_empty() {
- // These are local files rather than remote object stores, so we
- // need to check that they are accessible on the scheduler (the client
- // may not be on the same host, or the data path may not be correctly
- // mounted in the container). There could be thousands of files so we
- // just check the first one.
- let url = &local_paths[0].as_str();
- // the unwraps are safe here because we checked that the url starts with file:///
- // we need to check both versions here to support Linux & Windows
- ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
- .or_else(|_| {
- ListingTableUrl::parse(
- url.strip_prefix("file:///").unwrap(),
- )
- })
- .map_err(|e| {
- BallistaError::General(format!(
+ plan.apply(&mut |plan| {
+ if let LogicalPlan::TableScan(scan) = plan {
+ let provider = source_as_provider(&scan.source)?;
+ if let Some(table) = provider.as_any().downcast_ref::<ListingTable>() {
+ let local_paths: Vec<&ListingTableUrl> = table
+ .table_paths()
+ .iter()
+ .filter(|url| url.as_str().starts_with("file:///"))
+ .collect();
+ if !local_paths.is_empty() {
+ // These are local files rather than remote object stores, so we
+ // need to check that they are accessible on the scheduler (the client
+ // may not be on the same host, or the data path may not be correctly
+ // mounted in the container). There could be thousands of files so we
+ // just check the first one.
+ let url = &local_paths[0].as_str();
+ // the unwraps are safe here because we checked that the url starts with file:///
+ // we need to check both versions here to support Linux & Windows
+ ListingTableUrl::parse(url.strip_prefix("file://").unwrap())
+ .or_else(|_| {
+ ListingTableUrl::parse(
+ url.strip_prefix("file:///").unwrap(),
+ )
+ })
+ .map_err(|e| {
+ DataFusionError::External(
+ format!(
"logical plan refers to path on local file system \
- that is not accessible in the scheduler: {url}: {e:?}"
- ))
- })?;
- }
+ that is not accessible in the scheduler: {url}: {e:?}"
+ )
+ .into(),
+ )
+ })?;
}
}
- Ok(true)
}
- }
-
- let mut verify_paths_exist = VerifyPathsExist {};
- plan.accept(&mut verify_paths_exist)?;
+ Ok(VisitRecursion::Continue)
+ })?;
let plan = session_ctx.state().create_physical_plan(plan).await?;
debug!(
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b4314359..061a2da1 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -769,11 +769,11 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
let compression = match opt.compression.as_str() {
"none" => Compression::UNCOMPRESSED,
"snappy" => Compression::SNAPPY,
- "brotli" => Compression::BROTLI,
- "gzip" => Compression::GZIP,
+ "brotli" => Compression::BROTLI(Default::default()),
+ "gzip" => Compression::GZIP(Default::default()),
"lz4" => Compression::LZ4,
"lz0" => Compression::LZO,
- "zstd" => Compression::ZSTD,
+ "zstd" => Compression::ZSTD(Default::default()),
other => {
return Err(DataFusionError::NotImplemented(format!(
"Invalid compression format: {other}"