This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 15e7baa5 feat: Upgrade to DataFusion 40.0.0-rc1 (#644)
15e7baa5 is described below
commit 15e7baa5d1f76a1a0720e4f848cd01a14c09e147
Author: Andy Grove <[email protected]>
AuthorDate: Tue Jul 9 15:07:01 2024 -0600
feat: Upgrade to DataFusion 40.0.0-rc1 (#644)
* Partial upgrade to DataFusion 40.0.0
* fix
* implement more udaf
* update bitwise agg
* add func names
* remove unused imports
* remove arrow-string dep
* fix copy and paste error
* use 40.0.0-rc1 and temporarily ignore failing test
* clippy
* fall back to Spark for count windows aggregate
* address feedback
---
native/Cargo.lock | 128 ++++++++++++---------
native/core/Cargo.toml | 24 ++--
.../src/execution/datafusion/expressions/abs.rs | 2 +-
.../src/execution/datafusion/expressions/avg.rs | 2 +-
.../src/execution/datafusion/operators/expand.rs | 4 +
native/core/src/execution/datafusion/planner.rs | 119 +++++++++++++++----
.../src/execution/datafusion/shuffle_writer.rs | 4 +
native/core/src/execution/operators/copy.rs | 4 +
native/core/src/execution/operators/scan.rs | 4 +
.../org/apache/comet/serde/QueryPlanSerde.scala | 5 +-
.../org/apache/comet/exec/CometExecSuite.scala | 2 +-
11 files changed, 202 insertions(+), 96 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 6136e033..df1828ee 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -114,8 +114,9 @@ checksum =
"96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "arrow"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6127ea5e585a12ec9f742232442828ebaf264dfa5eefdd71282376c599562b77"
dependencies = [
"arrow-arith",
"arrow-array",
@@ -134,8 +135,9 @@ dependencies = [
[[package]]
name = "arrow-arith"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7add7f39210b7d726e2a8efc0083e7bf06e8f2d15bdb4896b564dce4410fbf5d"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -148,8 +150,9 @@ dependencies = [
[[package]]
name = "arrow-array"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "81c16ec702d3898c2f5cfdc148443c6cd7dbe5bac28399859eb0a3d38f072827"
dependencies = [
"ahash",
"arrow-buffer",
@@ -164,8 +167,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cae6970bab043c4fbc10aee1660ceb5b306d0c42c8cc5f6ae564efcd9759b663"
dependencies = [
"bytes",
"half",
@@ -174,8 +178,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c7ef44f26ef4f8edc392a048324ed5d757ad09135eff6d5509e6450d39e0398"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -194,8 +199,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5f843490bd258c5182b66e888161bb6f198f49f3792f7c7f98198b924ae0f564"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -212,8 +218,9 @@ dependencies = [
[[package]]
name = "arrow-data"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a769666ffac256dd301006faca1ca553d0ae7cffcf4cd07095f73f95eb226514"
dependencies = [
"arrow-buffer",
"arrow-schema",
@@ -223,8 +230,9 @@ dependencies = [
[[package]]
name = "arrow-ipc"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dbf9c3fb57390a1af0b7bb3b5558c1ee1f63905f3eccf49ae7676a8d1e6e5a72"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -237,8 +245,9 @@ dependencies = [
[[package]]
name = "arrow-json"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "654e7f3724176b66ddfacba31af397c48e106fbe4d281c8144e7d237df5acfd7"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -256,8 +265,9 @@ dependencies = [
[[package]]
name = "arrow-ord"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e8008370e624e8e3c68174faaf793540287106cfda8ad1da862fdc53d8e096b4"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -270,8 +280,9 @@ dependencies = [
[[package]]
name = "arrow-row"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ca5e3a6b7fda8d9fe03f3b18a2d946354ea7f3c8e4076dbdb502ad50d9d44824"
dependencies = [
"ahash",
"arrow-array",
@@ -284,16 +295,18 @@ dependencies = [
[[package]]
name = "arrow-schema"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dab1c12b40e29d9f3b699e0203c2a73ba558444c05e388a4377208f8f9c97eee"
dependencies = [
"bitflags 2.6.0",
]
[[package]]
name = "arrow-select"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e80159088ffe8c48965cb9b1a7c968b2729f29f37363df7eca177fc3281fe7c3"
dependencies = [
"ahash",
"arrow-array",
@@ -305,8 +318,9 @@ dependencies = [
[[package]]
name = "arrow-string"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fd04a6ea7de183648edbcb7a6dd925bbd04c210895f6384c780e27a9b54afcd"
dependencies = [
"arrow-array",
"arrow-buffer",
@@ -790,8 +804,8 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -890,8 +904,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -910,16 +924,16 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"tokio",
]
[[package]]
name = "datafusion-execution"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"arrow",
"chrono",
@@ -938,8 +952,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -956,8 +970,8 @@ dependencies = [
[[package]]
name = "datafusion-functions"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"arrow",
"base64",
@@ -967,7 +981,6 @@ dependencies = [
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
- "datafusion-physical-expr",
"hashbrown",
"hex",
"itertools 0.12.1",
@@ -982,8 +995,8 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -999,8 +1012,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"arrow",
"async-trait",
@@ -1012,13 +1025,14 @@ dependencies = [
"indexmap",
"itertools 0.12.1",
"log",
+ "paste",
"regex-syntax",
]
[[package]]
name = "datafusion-physical-expr"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -1032,7 +1046,6 @@ dependencies = [
"datafusion-common",
"datafusion-execution",
"datafusion-expr",
- "datafusion-functions-aggregate",
"datafusion-physical-expr-common",
"half",
"hashbrown",
@@ -1047,19 +1060,21 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr-common"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
+ "ahash",
"arrow",
"datafusion-common",
"datafusion-expr",
+ "hashbrown",
"rand",
]
[[package]]
name = "datafusion-physical-plan"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"ahash",
"arrow",
@@ -1091,8 +1106,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "39.0.0"
-source =
"git+https://github.com/viirya/arrow-datafusion.git?rev=17446b1#17446b1886d2872be482efa4225d2b35e5d96569"
+version = "40.0.0"
+source =
"git+https://github.com/apache/datafusion.git?rev=40.0.0-rc1#4cae81363e29f011c6602a7a7a54e1aaee841046"
dependencies = [
"arrow",
"arrow-array",
@@ -2015,8 +2030,9 @@ dependencies = [
[[package]]
name = "parquet"
-version = "52.0.0"
-source =
"git+https://github.com/apache/arrow-rs.git?rev=0a4d8a1#0a4d8a14b58e45ef92e31541f0b51a5b25de5f10"
+version = "52.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f22ba0d95db56dde8685e3fadcb915cdaadda31ab8abbe3ff7f0ad1ef333267"
dependencies = [
"ahash",
"bytes",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index bd0a3d5e..160db294 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -32,12 +32,12 @@ include = [
]
[dependencies]
-arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1",
features = ["prettyprint", "ffi", "chrono-tz"] }
-arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev =
"0a4d8a1" }
-arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev =
"0a4d8a1" }
-arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1"
}
-arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev =
"0a4d8a1" }
-parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "0a4d8a1",
default-features = false, features = ["experimental"] }
+arrow = { version = "52.1.0", features = ["prettyprint", "ffi", "chrono-tz"] }
+arrow-array = { version = "52.1.0" }
+arrow-buffer = { version = "52.1.0" }
+arrow-data = { version = "52.1.0" }
+arrow-schema = { version = "52.1.0" }
+parquet = { version = "52.1.0", default-features = false, features =
["experimental"] }
half = { version = "2.4.1", default-features = false }
futures = "0.3.28"
mimalloc = { version = "*", default-features = false, optional = true }
@@ -64,12 +64,12 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
-datafusion-common = { git = "https://github.com/viirya/arrow-datafusion.git",
rev = "17446b1" }
-datafusion = { default-features = false, git =
"https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features =
["unicode_expressions", "crypto_expressions"] }
-datafusion-functions = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "17446b1", features =
["crypto_expressions"] }
-datafusion-expr = { git = "https://github.com/viirya/arrow-datafusion.git",
rev = "17446b1", default-features = false }
-datafusion-physical-expr-common = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "17446b1",
default-features = false }
-datafusion-physical-expr = { git =
"https://github.com/viirya/arrow-datafusion.git", rev = "17446b1",
default-features = false }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", rev =
"40.0.0-rc1" }
+datafusion = { default-features = false, git =
"https://github.com/apache/datafusion.git", rev = "40.0.0-rc1", features =
["unicode_expressions", "crypto_expressions"] }
+datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev
= "40.0.0-rc1", features = ["crypto_expressions"] }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev =
"40.0.0-rc1", default-features = false }
+datafusion-physical-expr-common = { git =
"https://github.com/apache/datafusion.git", rev = "40.0.0-rc1",
default-features = false }
+datafusion-physical-expr = { git = "https://github.com/apache/datafusion.git",
rev = "40.0.0-rc1", default-features = false }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
diff --git a/native/core/src/execution/datafusion/expressions/abs.rs
b/native/core/src/execution/datafusion/expressions/abs.rs
index 4eb8c7c1..a037e5cb 100644
--- a/native/core/src/execution/datafusion/expressions/abs.rs
+++ b/native/core/src/execution/datafusion/expressions/abs.rs
@@ -37,7 +37,7 @@ impl CometAbsFunc {
pub fn new(eval_mode: EvalMode, data_type_name: String) -> Result<Self,
ExecutionError> {
if let EvalMode::Legacy | EvalMode::Ansi = eval_mode {
Ok(Self {
- inner_abs_func: math::abs().inner(),
+ inner_abs_func: math::abs().inner().clone(),
eval_mode,
data_type_name,
})
diff --git a/native/core/src/execution/datafusion/expressions/avg.rs
b/native/core/src/execution/datafusion/expressions/avg.rs
index 1ff276e5..3c8865bd 100644
--- a/native/core/src/execution/datafusion/expressions/avg.rs
+++ b/native/core/src/execution/datafusion/expressions/avg.rs
@@ -47,7 +47,7 @@ pub struct Avg {
impl Avg {
/// Create a new AVG aggregate function
pub fn new(expr: Arc<dyn PhysicalExpr>, name: impl Into<String>,
data_type: DataType) -> Self {
- let result_data_type = avg_return_type(&data_type).unwrap();
+ let result_data_type = avg_return_type("avg", &data_type).unwrap();
Self {
name: name.into(),
diff --git a/native/core/src/execution/datafusion/operators/expand.rs
b/native/core/src/execution/datafusion/operators/expand.rs
index 5285dfb4..67171212 100644
--- a/native/core/src/execution/datafusion/operators/expand.rs
+++ b/native/core/src/execution/datafusion/operators/expand.rs
@@ -126,6 +126,10 @@ impl ExecutionPlan for CometExpandExec {
fn properties(&self) -> &PlanProperties {
&self.cache
}
+
+ fn name(&self) -> &str {
+ "CometExpandExec"
+ }
}
pub struct ExpandStream {
diff --git a/native/core/src/execution/datafusion/planner.rs
b/native/core/src/execution/datafusion/planner.rs
index 40515c0c..36038040 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -20,19 +20,22 @@
use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf,
bit_or_udaf, bit_xor_udaf};
+use datafusion::functions_aggregate::count::count_udaf;
+use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::physical_plan::windows::BoundedWindowAggExec;
use datafusion::physical_plan::InputOrderMode;
use datafusion::{
arrow::{compute::SortOptions, datatypes::SchemaRef},
common::DataFusionError,
execution::FunctionRegistry,
+ functions_aggregate::first_last::{FirstValue, LastValue},
logical_expr::Operator as DataFusionOperator,
physical_expr::{
execution_props::ExecutionProps,
expressions::{
- in_list, BinaryExpr, BitAnd, BitOr, BitXor, CaseExpr, CastExpr,
Column, Count,
- FirstValue, IsNotNullExpr, IsNullExpr, LastValue, Literal as
DataFusionLiteral, Max,
- Min, NotExpr, Sum,
+ in_list, BinaryExpr, CaseExpr, CastExpr, Column, IsNotNullExpr,
IsNullExpr,
+ Literal as DataFusionLiteral, Max, Min, NotExpr,
},
AggregateExpr, PhysicalExpr, PhysicalSortExpr, ScalarFunctionExpr,
},
@@ -647,7 +650,7 @@ impl PhysicalPlanner {
let left = self.create_expr(left, input_schema.clone())?;
let right = self.create_expr(right, input_schema.clone())?;
match (
- op,
+ &op,
left.data_type(&input_schema),
right.data_type(&input_schema),
) {
@@ -1208,11 +1211,19 @@ impl PhysicalPlanner {
.iter()
.map(|child| self.create_expr(child, schema.clone()))
.collect::<Result<Vec<_>, _>>()?;
- Ok(Arc::new(Count::new_with_multiple_exprs(
- children,
+
+ create_aggregate_expr(
+ &count_udaf(),
+ &children,
+ &[],
+ &[],
+ &[],
+ schema.as_ref(),
"count",
- DataType::Int64,
- )))
+ false,
+ false,
+ )
+ .map_err(|e| ExecutionError::DataFusionError(e.to_string()))
}
AggExprStruct::Min(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(),
schema)?;
@@ -1236,7 +1247,18 @@ impl PhysicalPlanner {
// cast to the result data type of SUM if necessary,
we should not expect
// a cast failure since it should have already been
checked at Spark side
let child = Arc::new(CastExpr::new(child,
datatype.clone(), None));
- Ok(Arc::new(Sum::new(child, "sum", datatype)))
+ create_aggregate_expr(
+ &sum_udaf(),
+ &[child],
+ &[],
+ &[],
+ &[],
+ schema.as_ref(),
+ "sum",
+ false,
+ false,
+ )
+ .map_err(|e|
ExecutionError::DataFusionError(e.to_string()))
}
}
}
@@ -1263,31 +1285,79 @@ impl PhysicalPlanner {
AggExprStruct::First(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(),
schema.clone())?;
let func =
datafusion_expr::AggregateUDF::new_from_impl(FirstValue::new());
-
- create_aggregate_expr(&func, &[child], &[], &[], &schema,
"first", false, false)
- .map_err(|e| e.into())
+ create_aggregate_expr(
+ &func,
+ &[child],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "first",
+ false,
+ false,
+ )
+ .map_err(|e| e.into())
}
AggExprStruct::Last(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(),
schema.clone())?;
let func =
datafusion_expr::AggregateUDF::new_from_impl(LastValue::new());
-
- create_aggregate_expr(&func, &[child], &[], &[], &schema,
"last", false, false)
- .map_err(|e| e.into())
+ create_aggregate_expr(
+ &func,
+ &[child],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "last",
+ false,
+ false,
+ )
+ .map_err(|e| e.into())
}
AggExprStruct::BitAndAgg(expr) => {
- let child = self.create_expr(expr.child.as_ref().unwrap(),
schema)?;
- let datatype =
to_arrow_datatype(expr.datatype.as_ref().unwrap());
- Ok(Arc::new(BitAnd::new(child, "bit_and", datatype)))
+ let child = self.create_expr(expr.child.as_ref().unwrap(),
schema.clone())?;
+ create_aggregate_expr(
+ &bit_and_udaf(),
+ &[child],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "bit_and",
+ false,
+ false,
+ )
+ .map_err(|e| e.into())
}
AggExprStruct::BitOrAgg(expr) => {
- let child = self.create_expr(expr.child.as_ref().unwrap(),
schema)?;
- let datatype =
to_arrow_datatype(expr.datatype.as_ref().unwrap());
- Ok(Arc::new(BitOr::new(child, "bit_or", datatype)))
+ let child = self.create_expr(expr.child.as_ref().unwrap(),
schema.clone())?;
+ create_aggregate_expr(
+ &bit_or_udaf(),
+ &[child],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "bit_or",
+ false,
+ false,
+ )
+ .map_err(|e| e.into())
}
AggExprStruct::BitXorAgg(expr) => {
- let child = self.create_expr(expr.child.as_ref().unwrap(),
schema)?;
- let datatype =
to_arrow_datatype(expr.datatype.as_ref().unwrap());
- Ok(Arc::new(BitXor::new(child, "bit_xor", datatype)))
+ let child = self.create_expr(expr.child.as_ref().unwrap(),
schema.clone())?;
+ create_aggregate_expr(
+ &bit_xor_udaf(),
+ &[child],
+ &[],
+ &[],
+ &[],
+ &schema,
+ "bit_xor",
+ false,
+ false,
+ )
+ .map_err(|e| e.into())
}
AggExprStruct::Covariance(expr) => {
let child1 = self.create_expr(expr.child1.as_ref().unwrap(),
schema.clone())?;
@@ -1483,6 +1553,7 @@ impl PhysicalPlanner {
&window_func,
window_func_name,
&window_args,
+ &[],
partition_by,
sort_exprs,
window_frame.into(),
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs
b/native/core/src/execution/datafusion/shuffle_writer.rs
index 5afc9a53..6e59ce53 100644
--- a/native/core/src/execution/datafusion/shuffle_writer.rs
+++ b/native/core/src/execution/datafusion/shuffle_writer.rs
@@ -160,6 +160,10 @@ impl ExecutionPlan for ShuffleWriterExec {
fn properties(&self) -> &PlanProperties {
&self.cache
}
+
+ fn name(&self) -> &str {
+ "ShuffleWriterExec"
+ }
}
impl ShuffleWriterExec {
diff --git a/native/core/src/execution/operators/copy.rs
b/native/core/src/execution/operators/copy.rs
index d011b3cb..68c91aaf 100644
--- a/native/core/src/execution/operators/copy.rs
+++ b/native/core/src/execution/operators/copy.rs
@@ -126,6 +126,10 @@ impl ExecutionPlan for CopyExec {
fn properties(&self) -> &PlanProperties {
&self.cache
}
+
+ fn name(&self) -> &str {
+ "CopyExec"
+ }
}
struct CopyStream {
diff --git a/native/core/src/execution/operators/scan.rs
b/native/core/src/execution/operators/scan.rs
index de532821..68dd773c 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -270,6 +270,10 @@ impl ExecutionPlan for ScanExec {
fn properties(&self) -> &PlanProperties {
&self.cache
}
+
+ fn name(&self) -> &str {
+ "ScanExec"
+ }
}
impl DisplayAs for ScanExec {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 65de37c8..da534b02 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -208,7 +208,10 @@ object QueryPlanSerde extends Logging with
ShimQueryPlanSerde with CometExprShim
expr match {
case agg: AggregateExpression =>
agg.aggregateFunction match {
- case _: Min | _: Max | _: Count =>
+ // TODO add support for Count (this was removed when upgrading
+ // to DataFusion 40 because it is no longer a built-in window
function)
+ // https://github.com/apache/datafusion-comet/issues/645
+ case _: Min | _: Max =>
Some(agg)
case _ =>
withInfo(windowExpr, "Unsupported aggregate", expr)
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
index 9cc4e7f7..e657af9b 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -1438,7 +1438,7 @@ class CometExecSuite extends CometTestBase {
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> aqeEnabled) {
withParquetTable((0 until 10).map(i => (i, 10 - i)), "t1") { // TODO:
test nulls
val aggregateFunctions =
- List("COUNT(_1)", "MAX(_1)", "MIN(_1)") // TODO: Test all the
aggregates
+ List("MAX(_1)", "MIN(_1)") // TODO: Test all the aggregates
aggregateFunctions.foreach { function =>
val queries = Seq(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]