This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 49cf0d7f chore: Move string kernels and expressions to spark-expr crate (#1164)
49cf0d7f is described below
commit 49cf0d7f32813b6cb5bd3999378c960bd1171fca
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 12 07:45:06 2024 -0700
chore: Move string kernels and expressions to spark-expr crate (#1164)
* Move string kernels and expressions to spark-expr crate
* remove unused hash kernel
* remove unused dependencies
---
native/Cargo.lock | 2 -
native/core/Cargo.toml | 6 -
native/core/benches/hash.rs | 137 ---------------
.../src/execution/datafusion/expressions/mod.rs | 1 -
native/core/src/execution/datafusion/planner.rs | 15 +-
native/core/src/execution/kernels/hash.rs | 187 ---------------------
native/core/src/execution/kernels/mod.rs | 23 ---
native/core/src/execution/mod.rs | 3 -
native/spark-expr/src/kernels/mod.rs | 1 +
.../src}/kernels/strings.rs | 7 +-
native/spark-expr/src/lib.rs | 2 +
.../expressions => spark-expr/src}/strings.rs | 2 +-
12 files changed, 13 insertions(+), 373 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 67d041a3..9a8eab83 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -871,7 +871,6 @@ dependencies = [
name = "datafusion-comet"
version = "0.5.0"
dependencies = [
- "ahash",
"arrow",
"arrow-array",
"arrow-buffer",
@@ -893,7 +892,6 @@ dependencies = [
"datafusion-physical-expr",
"flate2",
"futures",
- "half",
"hex",
"itertools 0.11.0",
"jni",
diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml
index 4b9753ec..489da46d 100644
--- a/native/core/Cargo.toml
+++ b/native/core/Cargo.toml
@@ -41,7 +41,6 @@ arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental"] }
-half = { version = "2.4.1", default-features = false }
futures = { workspace = true }
mimalloc = { version = "*", default-features = false, optional = true }
tokio = { version = "1", features = ["rt-multi-thread"] }
@@ -62,7 +61,6 @@ rand = { workspace = true}
num = { workspace = true }
bytes = "1.5.0"
tempfile = "3.8.0"
-ahash = { version = "0.8", default-features = false }
itertools = "0.11.0"
paste = "1.0.14"
datafusion-common = { workspace = true }
@@ -102,10 +100,6 @@ harness = false
name = "bit_util"
harness = false
-[[bench]]
-name = "hash"
-harness = false
-
[[bench]]
name = "row_columnar"
harness = false
diff --git a/native/core/benches/hash.rs b/native/core/benches/hash.rs
deleted file mode 100644
index 039a6d5d..00000000
--- a/native/core/benches/hash.rs
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[path = "common.rs"]
-mod common;
-
-use arrow_array::ArrayRef;
-use comet::execution::kernels::hash;
-use common::*;
-use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
-use datafusion_comet_spark_expr::scalar_funcs::{spark_murmur3_hash, spark_xxhash64};
-use datafusion_common::ScalarValue;
-use datafusion_expr::ColumnarValue;
-use std::sync::Arc;
-
-const BATCH_SIZE: usize = 1024 * 8;
-const NUM_ITER: usize = 10;
-const NULL_FRACTION: f32 = 0.1;
-
-fn criterion_benchmark(c: &mut Criterion) {
- let mut group = c.benchmark_group("hash");
-
- let a1: ArrayRef = Arc::new(create_int64_array(BATCH_SIZE, 0.0, 0, BATCH_SIZE as i64));
- let a2: ArrayRef = Arc::new(create_int64_array(BATCH_SIZE, 0.0, 0, BATCH_SIZE as i64));
- let a3: ArrayRef = Arc::new(create_int64_array(
- BATCH_SIZE,
- NULL_FRACTION,
- 0,
- BATCH_SIZE as i64,
- ));
- let a4: ArrayRef = Arc::new(create_int64_array(
- BATCH_SIZE,
- NULL_FRACTION,
- 0,
- BATCH_SIZE as i64,
- ));
-
- group.bench_function(
- BenchmarkId::new("hash_i64_single_nonnull", BATCH_SIZE),
- |b| {
- let input = vec![a1.clone()];
- let mut dst = vec![0; BATCH_SIZE];
-
- b.iter(|| {
- for _ in 0..NUM_ITER {
- hash(&input, &mut dst);
- }
- });
- },
- );
- group.bench_function(BenchmarkId::new("hash_i64_single_null", BATCH_SIZE), |b| {
- let input = vec![a3.clone()];
- let mut dst = vec![0; BATCH_SIZE];
-
- b.iter(|| {
- for _ in 0..NUM_ITER {
- hash(&input, &mut dst);
- }
- });
- });
- group.bench_function(
- BenchmarkId::new("hash_i64_multiple_nonnull", BATCH_SIZE),
- |b| {
- let input = vec![a1.clone(), a2.clone()];
- let mut dst = vec![0; BATCH_SIZE];
-
- b.iter(|| {
- for _ in 0..NUM_ITER {
- hash(&input, &mut dst);
- }
- });
- },
- );
- group.bench_function(
- BenchmarkId::new("hash_i64_multiple_null", BATCH_SIZE),
- |b| {
- let input = vec![a3.clone(), a4.clone()];
- let mut dst = vec![0; BATCH_SIZE];
-
- b.iter(|| {
- for _ in 0..NUM_ITER {
- hash(&input, &mut dst);
- }
- });
- },
- );
- group.bench_function(BenchmarkId::new("xxhash64", BATCH_SIZE), |b| {
- let inputs = &[
- ColumnarValue::Array(a3.clone()),
- ColumnarValue::Array(a4.clone()),
- ColumnarValue::Scalar(ScalarValue::Int64(Some(42i64))),
- ];
-
- b.iter(|| {
- for _ in 0..NUM_ITER {
- spark_xxhash64(inputs).unwrap();
- }
- });
- });
- group.bench_function(BenchmarkId::new("murmur3", BATCH_SIZE), |b| {
- let inputs = &[
- ColumnarValue::Array(a3.clone()),
- ColumnarValue::Array(a4.clone()),
- ColumnarValue::Scalar(ScalarValue::Int32(Some(42))),
- ];
- b.iter(|| {
- for _ in 0..NUM_ITER {
- spark_murmur3_hash(inputs).unwrap();
- }
- });
- });
-}
-
-fn config() -> Criterion {
- Criterion::default()
-}
-
-criterion_group! {
- name = benches;
- config = config();
- targets = criterion_benchmark
-}
-criterion_main!(benches);
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs
b/native/core/src/execution/datafusion/expressions/mod.rs
index 2bb14df3..5f9f322b 100644
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ b/native/core/src/execution/datafusion/expressions/mod.rs
@@ -23,7 +23,6 @@ use crate::errors::CometError;
pub mod bloom_filter_agg;
pub mod bloom_filter_might_contain;
pub mod negative;
-pub mod strings;
pub mod subquery;
pub mod unbound;
diff --git a/native/core/src/execution/datafusion/planner.rs
b/native/core/src/execution/datafusion/planner.rs
index 5e77b3f6..0e64ed6a 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -25,12 +25,8 @@ use crate::{
datafusion::{
expressions::{
bloom_filter_agg::BloomFilterAgg,
- bloom_filter_might_contain::BloomFilterMightContain,
- checkoverflow::CheckOverflow,
- negative,
- strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr},
- subquery::Subquery,
- unbound::UnboundColumn,
+ bloom_filter_might_contain::BloomFilterMightContain, checkoverflow::CheckOverflow,
+ negative, subquery::Subquery, unbound::UnboundColumn,
},
operators::expand::CometExpandExec,
shuffle_writer::ShuffleWriterExec,
@@ -90,9 +86,10 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
};
use datafusion_comet_spark_expr::{
- ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Correlation, Covariance, CreateNamedStruct,
- DateTruncExpr, GetArrayStructFields, GetStructField, HourExpr, IfExpr, ListExtract, MinuteExpr,
- NormalizeNaNAndZero, RLike, SecondExpr, SparkCastOptions, Stddev, SumDecimal,
+ ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, Covariance,
+ CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, GetStructField, HourExpr,
+ IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, SecondExpr,
+ SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, SumDecimal,
TimestampTruncExpr, ToJson, Variance,
};
use datafusion_common::scalar::ScalarStructBuilder;
diff --git a/native/core/src/execution/kernels/hash.rs
b/native/core/src/execution/kernels/hash.rs
deleted file mode 100644
index b39fd622..00000000
--- a/native/core/src/execution/kernels/hash.rs
+++ /dev/null
@@ -1,187 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use crate::common::bit;
-use ahash::RandomState;
-use arrow::datatypes::{i256, ArrowNativeType};
-use arrow_array::{
- downcast_dictionary_array, downcast_primitive_array, Array, ArrayAccessor, ArrayRef,
- ArrowPrimitiveType, PrimitiveArray,
-};
-use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
-use std::fmt::Debug;
-
-pub fn hash(src: &[ArrayRef], dst: &mut [u64]) {
- let state = RandomState::with_seed(42);
- src.iter().enumerate().for_each(|(idx, v)| {
- downcast_dictionary_array!(
- v => {
- let keys = v.keys();
- let values = v.values();
- downcast_primitive_array!(
- values => hash_dict_typed(&state, idx > 0, keys, values, dst),
- dt => panic!("Expected only primitive type but found {}", dt)
- )
- },
- dt => {
- downcast_primitive_array!(
- v => hash_typed(&state, idx > 0, v, dst),
- _ => panic!("Expected only primitive type but found {}", dt)
- )
- }
- )
- });
-}
-
-fn hash_typed<T>(state: &RandomState, mix: bool, array: T, dst: &mut [u64])
-where
- T: ArrayAccessor,
- T::Item: Hashable + Debug,
-{
- let nullable = array.null_count() > 0;
- let num_values = array.len();
- if nullable {
- for i in 0..num_values {
- if !array.is_null(i) {
- unsafe {
- let value = array.value_unchecked(i);
- hash1(state, mix, i, value, dst);
- }
- }
- }
- } else {
- for i in 0..num_values {
- unsafe {
- let value = array.value_unchecked(i);
- hash1(state, mix, i, value, dst);
- }
- }
- }
-}
-
-fn hash_dict_typed<K, V>(
- state: &RandomState,
- mix: bool,
- keys: &PrimitiveArray<K>,
- values: V,
- dst: &mut [u64],
-) where
- K: ArrowPrimitiveType,
- V: ArrayAccessor,
- V::Item: Hashable + Debug,
-{
- let nullable = keys.null_count() > 0;
- let num_keys = keys.len();
- let mut value_hashes = vec![0; values.len()];
-
- for (i, value_hash) in value_hashes.iter_mut().enumerate() {
- unsafe {
- *value_hash = values.value_unchecked(i).create_hash(state);
- }
- }
-
- if nullable {
- for i in 0..num_keys {
- if !keys.is_null(i) {
- unsafe {
- let idx = keys.value_unchecked(i);
- let hash = value_hashes[idx.as_usize()];
- hash1_helper(mix, i, hash, dst);
- }
- }
- }
- } else {
- for i in 0..num_keys {
- unsafe {
- let idx = keys.value_unchecked(i);
- let hash = value_hashes[idx.as_usize()];
- hash1_helper(mix, i, hash, dst);
- }
- }
- }
-}
-
-#[inline(always)]
-fn hash1<T: Hashable>(state: &RandomState, mix: bool, i: usize, value: T, dst: &mut [u64]) {
- let hash = value.create_hash(state);
- hash1_helper(mix, i, hash, dst);
-}
-
-#[inline(always)]
-fn hash1_helper(mix: bool, i: usize, hash: u64, dst: &mut [u64]) {
- dst[i] = if mix {
- bit::mix_hash(dst[i], hash)
- } else {
- hash
- }
-}
-
-pub(crate) trait Hashable {
- fn create_hash(&self, state: &RandomState) -> u64;
-}
-
-macro_rules! impl_hashable {
- ($($t:ty),+) => {
- $(impl Hashable for $t {
- #[inline]
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(self)
- }
- })+
- };
-}
-
-impl_hashable!(i8, i16, i32, u8, u16, u32, u64, i128, i256);
-
-impl Hashable for i64 {
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(self)
- }
-}
-
-impl Hashable for half::f16 {
- fn create_hash(&self, _: &RandomState) -> u64 {
- unimplemented!("hashing on f16 is not supported")
- }
-}
-
-impl Hashable for f32 {
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(u32::from_ne_bytes(self.to_ne_bytes()))
- }
-}
-
-impl Hashable for f64 {
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(u64::from_ne_bytes(self.to_ne_bytes()))
- }
-}
-
-impl Hashable for IntervalDayTime {
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(self.days);
- state.hash_one(self.milliseconds)
- }
-}
-
-impl Hashable for IntervalMonthDayNano {
- fn create_hash(&self, state: &RandomState) -> u64 {
- state.hash_one(self.months);
- state.hash_one(self.days);
- state.hash_one(self.nanoseconds)
- }
-}
diff --git a/native/core/src/execution/kernels/mod.rs
b/native/core/src/execution/kernels/mod.rs
deleted file mode 100644
index 675dcd48..00000000
--- a/native/core/src/execution/kernels/mod.rs
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Kernels
-
-mod hash;
-pub use hash::hash;
-
-pub(crate) mod strings;
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index f1793570..3dba747f 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -18,9 +18,6 @@
//! PoC of vectorization execution through JNI to Rust.
pub mod datafusion;
pub mod jni_api;
-
-pub mod kernels; // for benchmarking
-
mod metrics;
pub mod operators;
pub mod serde;
diff --git a/native/spark-expr/src/kernels/mod.rs
b/native/spark-expr/src/kernels/mod.rs
index 88aa34b1..3669ff13 100644
--- a/native/spark-expr/src/kernels/mod.rs
+++ b/native/spark-expr/src/kernels/mod.rs
@@ -17,4 +17,5 @@
//! Kernels
+pub mod strings;
pub(crate) mod temporal;
diff --git a/native/core/src/execution/kernels/strings.rs
b/native/spark-expr/src/kernels/strings.rs
similarity index 96%
rename from native/core/src/execution/kernels/strings.rs
rename to native/spark-expr/src/kernels/strings.rs
index d63b2c47..bb275fbb 100644
--- a/native/core/src/execution/kernels/strings.rs
+++ b/native/spark-expr/src/kernels/strings.rs
@@ -25,15 +25,14 @@ use arrow::{
compute::kernels::substring::{substring as arrow_substring, substring_by_char},
datatypes::{DataType, Int32Type},
};
-
-use crate::errors::ExpressionError;
+use datafusion_common::DataFusionError;
/// Returns an ArrayRef with a string consisting of `length` spaces.
///
/// # Preconditions
///
/// - elements in `length` must not be negative
-pub fn string_space(length: &dyn Array) -> Result<ArrayRef, ExpressionError> {
+pub fn string_space(length: &dyn Array) -> Result<ArrayRef, DataFusionError> {
match length.data_type() {
DataType::Int32 => {
let array = length.as_any().downcast_ref::<Int32Array>().unwrap();
@@ -52,7 +51,7 @@ pub fn string_space(length: &dyn Array) -> Result<ArrayRef, ExpressionError> {
}
}
-pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, ExpressionError> {
+pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef, DataFusionError> {
match array.data_type() {
DataType::LargeUtf8 => substring_by_char(
array
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 15f446ef..5dff6e0b 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -33,6 +33,8 @@ mod correlation;
pub use correlation::Correlation;
mod covariance;
pub use covariance::Covariance;
+mod strings;
+pub use strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr};
mod kernels;
mod list;
mod regexp;
diff --git a/native/core/src/execution/datafusion/expressions/strings.rs
b/native/spark-expr/src/strings.rs
similarity index 99%
rename from native/core/src/execution/datafusion/expressions/strings.rs
rename to native/spark-expr/src/strings.rs
index 200b4ec5..a8aab6ae 100644
--- a/native/core/src/execution/datafusion/expressions/strings.rs
+++ b/native/spark-expr/src/strings.rs
@@ -17,7 +17,7 @@
#![allow(deprecated)]
-use crate::execution::kernels::strings::{string_space, substring};
+use crate::kernels::strings::{string_space, substring};
use arrow::{
compute::{
contains_dyn, contains_utf8_scalar_dyn, ends_with_dyn, ends_with_utf8_scalar_dyn, like_dyn,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]