This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 7db9aa67 chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165)
7db9aa67 is described below

commit 7db9aa67ed0049bb25e783bc4ba991e53df34bbf
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 12 14:51:19 2024 -0700

    chore: Move remaining expressions to spark-expr crate + some minor refactoring (#1165)
    
    * move CheckOverflow to spark-expr crate
    
    * move NegativeExpr to spark-expr crate
    
    * move UnboundColumn to spark-expr crate
    
    * move ExpandExec from execution::datafusion::operators to execution::operators
    
    * refactoring to remove datafusion subpackage
    
    * update imports in benches
    
    * fix
    
    * fix
---
 native/Cargo.lock                                  |  1 +
 native/core/benches/bloom_filter_agg.rs            |  2 +-
 native/core/benches/shuffle_writer.rs              |  2 +-
 .../src/execution/datafusion/expressions/mod.rs    | 35 ----------------------
 .../core/src/execution/datafusion/operators/mod.rs | 18 -----------
 .../expressions/bloom_filter_agg.rs                |  4 +--
 .../expressions/bloom_filter_might_contain.rs      |  4 +--
 .../execution/{datafusion => expressions}/mod.rs   | 13 ++++----
 .../{datafusion => }/expressions/subquery.rs       |  0
 native/core/src/execution/jni_api.rs               |  6 ++--
 native/core/src/execution/metrics/utils.rs         |  2 +-
 native/core/src/execution/mod.rs                   |  5 +++-
 .../execution/{datafusion => }/operators/expand.rs | 10 +++----
 native/core/src/execution/operators/mod.rs         |  2 ++
 native/core/src/execution/operators/scan.rs        |  3 +-
 .../core/src/execution/{datafusion => }/planner.rs | 32 +++++++++-----------
 native/core/src/execution/shuffle/mod.rs           |  2 ++
 native/core/src/execution/shuffle/row.rs           |  2 +-
 .../{datafusion => shuffle}/shuffle_writer.rs      |  0
 .../src/execution/{datafusion => }/spark_plan.rs   |  0
 .../src/execution/{datafusion => }/util/mod.rs     |  0
 .../{datafusion => }/util/spark_bit_array.rs       |  0
 .../{datafusion => }/util/spark_bloom_filter.rs    |  4 +--
 native/spark-expr/Cargo.toml                       |  1 +
 .../src}/checkoverflow.rs                          |  0
 native/spark-expr/src/lib.rs                       | 12 ++++++++
 .../expressions => spark-expr/src}/negative.rs     |  7 ++---
 .../expressions => spark-expr/src}/unbound.rs      |  0
 28 files changed, 63 insertions(+), 104 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index 9a8eab83..7966bb80 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -931,6 +931,7 @@ version = "0.5.0"
 dependencies = [
  "arrow",
  "arrow-array",
+ "arrow-buffer",
  "arrow-data",
  "arrow-schema",
  "chrono",
diff --git a/native/core/benches/bloom_filter_agg.rs 
b/native/core/benches/bloom_filter_agg.rs
index 25d27d17..b83ff3fa 100644
--- a/native/core/benches/bloom_filter_agg.rs
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
 use arrow_array::builder::Int64Builder;
 use arrow_array::{ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
-use 
comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
+use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
 use criterion::{black_box, criterion_group, criterion_main, Criterion};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, 
PhysicalGroupBy};
diff --git a/native/core/benches/shuffle_writer.rs 
b/native/core/benches/shuffle_writer.rs
index 6f287186..27288723 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/core/benches/shuffle_writer.rs
@@ -17,7 +17,7 @@
 
 use arrow_array::{builder::StringBuilder, RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
+use comet::execution::shuffle::ShuffleWriterExec;
 use criterion::{criterion_group, criterion_main, Criterion};
 use datafusion::{
     physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs 
b/native/core/src/execution/datafusion/expressions/mod.rs
deleted file mode 100644
index 5f9f322b..00000000
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Native DataFusion expressions
-
-pub mod checkoverflow;
-
-use crate::errors::CometError;
-pub mod bloom_filter_agg;
-pub mod bloom_filter_might_contain;
-pub mod negative;
-pub mod subquery;
-pub mod unbound;
-
-pub use datafusion_comet_spark_expr::{EvalMode, SparkError};
-
-fn arithmetic_overflow_error(from_type: &str) -> CometError {
-    CometError::Spark(SparkError::ArithmeticOverflow {
-        from_type: from_type.to_string(),
-    })
-}
diff --git a/native/core/src/execution/datafusion/operators/mod.rs 
b/native/core/src/execution/datafusion/operators/mod.rs
deleted file mode 100644
index 3d28a266..00000000
--- a/native/core/src/execution/datafusion/operators/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod expand;
diff --git 
a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs 
b/native/core/src/execution/expressions/bloom_filter_agg.rs
similarity index 97%
rename from native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
rename to native/core/src/execution/expressions/bloom_filter_agg.rs
index 1300e08c..ea8bb364 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/expressions/bloom_filter_agg.rs
@@ -19,8 +19,8 @@ use arrow_schema::Field;
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::util::spark_bloom_filter;
-use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter;
+use crate::execution::util::spark_bloom_filter;
+use crate::execution::util::spark_bloom_filter::SparkBloomFilter;
 use arrow::array::ArrayRef;
 use arrow_array::BinaryArray;
 use datafusion::error::Result;
diff --git 
a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
 b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
similarity index 97%
rename from 
native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
rename to native/core/src/execution/expressions/bloom_filter_might_contain.rs
index de922d83..af6a5a47 100644
--- 
a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
+++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -15,9 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{
-    execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, 
parquet::data_type::AsBytes,
-};
+use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, 
parquet::data_type::AsBytes};
 use arrow::record_batch::RecordBatch;
 use arrow_array::cast::as_primitive_array;
 use arrow_schema::{DataType, Schema};
diff --git a/native/core/src/execution/datafusion/mod.rs 
b/native/core/src/execution/expressions/mod.rs
similarity index 83%
rename from native/core/src/execution/datafusion/mod.rs
rename to native/core/src/execution/expressions/mod.rs
index ca41fa0a..e2f811fa 100644
--- a/native/core/src/execution/datafusion/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Native execution through DataFusion
+//! Native DataFusion expressions
 
-pub mod expressions;
-mod operators;
-pub mod planner;
-pub mod shuffle_writer;
-pub(crate) mod spark_plan;
-mod util;
+pub mod bloom_filter_agg;
+pub mod bloom_filter_might_contain;
+pub mod subquery;
+
+pub use datafusion_comet_spark_expr::EvalMode;
diff --git a/native/core/src/execution/datafusion/expressions/subquery.rs 
b/native/core/src/execution/expressions/subquery.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/subquery.rs
rename to native/core/src/execution/expressions/subquery.rs
diff --git a/native/core/src/execution/jni_api.rs 
b/native/core/src/execution/jni_api.rs
index 5103f5ce..491b389c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
 use crate::{
     errors::{try_unwrap_or_throw, CometError, CometResult},
     execution::{
-        datafusion::planner::PhysicalPlanner, 
metrics::utils::update_comet_metric,
-        serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, 
sort::RdxSort,
+        metrics::utils::update_comet_metric, planner::PhysicalPlanner, 
serde::to_arrow_datatype,
+        shuffle::row::process_sorted_row_partition, sort::RdxSort,
     },
     jvm_bridge::{jni_new_global_ref, JVMClasses},
 };
@@ -59,8 +59,8 @@ use jni::{
 };
 use tokio::runtime::Runtime;
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
 use crate::execution::operators::ScanExec;
+use crate::execution::spark_plan::SparkPlan;
 use log::info;
 
 /// Comet native execution context. Kept alive across JNI calls.
diff --git a/native/core/src/execution/metrics/utils.rs 
b/native/core/src/execution/metrics/utils.rs
index 4bb1c447..0eb4b631 100644
--- a/native/core/src/execution/metrics/utils.rs
+++ b/native/core/src/execution/metrics/utils.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
 use crate::jvm_bridge::jni_new_global_ref;
 use crate::{
     errors::CometError,
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index 3dba747f..a74ec301 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -16,13 +16,16 @@
 // under the License.
 
 //! PoC of vectorization execution through JNI to Rust.
-pub mod datafusion;
+pub mod expressions;
 pub mod jni_api;
 mod metrics;
 pub mod operators;
+pub(crate) mod planner;
 pub mod serde;
 pub mod shuffle;
 pub(crate) mod sort;
+pub(crate) mod spark_plan;
+pub(crate) mod util;
 pub use datafusion_comet_spark_expr::timezone;
 pub(crate) mod utils;
 
diff --git a/native/core/src/execution/datafusion/operators/expand.rs 
b/native/core/src/execution/operators/expand.rs
similarity index 97%
rename from native/core/src/execution/datafusion/operators/expand.rs
rename to native/core/src/execution/operators/expand.rs
index a3dd0650..fb43a6e4 100644
--- a/native/core/src/execution/datafusion/operators/expand.rs
+++ b/native/core/src/execution/operators/expand.rs
@@ -37,14 +37,14 @@ use std::{
 /// A Comet native operator that expands a single row into multiple rows. This 
behaves as same as
 /// Spark Expand operator.
 #[derive(Debug)]
-pub struct CometExpandExec {
+pub struct ExpandExec {
     projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
     child: Arc<dyn ExecutionPlan>,
     schema: SchemaRef,
     cache: PlanProperties,
 }
 
-impl CometExpandExec {
+impl ExpandExec {
     /// Create a new ExpandExec
     pub fn new(
         projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
@@ -66,7 +66,7 @@ impl CometExpandExec {
     }
 }
 
-impl DisplayAs for CometExpandExec {
+impl DisplayAs for ExpandExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> 
std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
@@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec {
     }
 }
 
-impl ExecutionPlan for CometExpandExec {
+impl ExecutionPlan for ExpandExec {
     fn as_any(&self) -> &dyn Any {
         self
     }
@@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-        let new_expand = CometExpandExec::new(
+        let new_expand = ExpandExec::new(
             self.projections.clone(),
             Arc::clone(&children[0]),
             Arc::clone(&self.schema),
diff --git a/native/core/src/execution/operators/mod.rs 
b/native/core/src/execution/operators/mod.rs
index bdc233e9..4e15e434 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -27,6 +27,8 @@ pub use filter::FilterExec;
 pub use scan::*;
 
 mod copy;
+mod expand;
+pub use expand::ExpandExec;
 mod filter;
 mod scan;
 
diff --git a/native/core/src/execution/operators/scan.rs 
b/native/core/src/execution/operators/scan.rs
index 0d35859d..a297f87c 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -18,8 +18,7 @@
 use crate::{
     errors::CometError,
     execution::{
-        datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError,
-        utils::SparkArrowConvert,
+        operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID, 
utils::SparkArrowConvert,
     },
     jvm_bridge::{jni_call, JVMClasses},
 };
diff --git a/native/core/src/execution/datafusion/planner.rs 
b/native/core/src/execution/planner.rs
similarity index 98%
rename from native/core/src/execution/datafusion/planner.rs
rename to native/core/src/execution/planner.rs
index 0e64ed6a..3ac830c0 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -22,17 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec};
 use crate::{
     errors::ExpressionError,
     execution::{
-        datafusion::{
-            expressions::{
-                bloom_filter_agg::BloomFilterAgg,
-                bloom_filter_might_contain::BloomFilterMightContain, 
checkoverflow::CheckOverflow,
-                negative, subquery::Subquery, unbound::UnboundColumn,
-            },
-            operators::expand::CometExpandExec,
-            shuffle_writer::ShuffleWriterExec,
+        expressions::{
+            bloom_filter_agg::BloomFilterAgg, 
bloom_filter_might_contain::BloomFilterMightContain,
+            subquery::Subquery,
         },
-        operators::{CopyExec, ExecutionError, ScanExec},
+        operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
         serde::to_arrow_datatype,
+        shuffle::ShuffleWriterExec,
     },
 };
 use arrow::compute::CastOptions;
@@ -68,11 +64,11 @@ use datafusion::{
     },
     prelude::SessionContext,
 };
-use datafusion_comet_spark_expr::create_comet_physical_fun;
+use datafusion_comet_spark_expr::{create_comet_physical_fun, 
create_negate_expr};
 use datafusion_functions_nested::concat::ArrayAppend;
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, 
AggregateFunctionExpr};
 
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
 use datafusion_comet_proto::{
     spark_expression::{
         self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct, 
literal::Value, AggExpr,
@@ -86,11 +82,11 @@ use datafusion_comet_proto::{
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as 
SparkPartitioning},
 };
 use datafusion_comet_spark_expr::{
-    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation, 
Covariance,
-    CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields, 
GetStructField, HourExpr,
-    IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike, 
SecondExpr,
+    ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow, 
Contains, Correlation,
+    Covariance, CreateNamedStruct, DateTruncExpr, EndsWith, 
GetArrayStructFields, GetStructField,
+    HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, 
RLike, SecondExpr,
     SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr, 
SumDecimal,
-    TimestampTruncExpr, ToJson, Variance,
+    TimestampTruncExpr, ToJson, UnboundColumn, Variance,
 };
 use datafusion_common::scalar::ScalarStructBuilder;
 use datafusion_common::{
@@ -611,7 +607,7 @@ impl PhysicalPlanner {
             ExprStruct::UnaryMinus(expr) => {
                 let child: Arc<dyn PhysicalExpr> =
                     self.create_expr(expr.child.as_ref().unwrap(), 
Arc::clone(&input_schema))?;
-                let result = negative::create_negate_expr(child, 
expr.fail_on_error);
+                let result = create_negate_expr(child, expr.fail_on_error);
                 result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
             }
             ExprStruct::NormalizeNanAndZero(expr) => {
@@ -1118,7 +1114,7 @@ impl PhysicalPlanner {
                 } else {
                     Arc::clone(&child.native_plan)
                 };
-                let expand = Arc::new(CometExpandExec::new(projections, input, 
schema));
+                let expand = Arc::new(ExpandExec::new(projections, input, 
schema));
                 Ok((
                     scans,
                     Arc::new(SparkPlan::new(spark_plan.plan_id, expand, 
vec![child])),
@@ -2270,7 +2266,7 @@ mod tests {
     use datafusion::{physical_plan::common::collect, prelude::SessionContext};
     use tokio::sync::mpsc;
 
-    use crate::execution::{datafusion::planner::PhysicalPlanner, 
operators::InputBatch};
+    use crate::execution::{operators::InputBatch, planner::PhysicalPlanner};
 
     use crate::execution::operators::ExecutionError;
     use datafusion_comet_proto::{
diff --git a/native/core/src/execution/shuffle/mod.rs 
b/native/core/src/execution/shuffle/mod.rs
index b052df29..8721ead7 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -18,3 +18,5 @@
 mod list;
 mod map;
 pub mod row;
+mod shuffle_writer;
+pub use shuffle_writer::ShuffleWriterExec;
diff --git a/native/core/src/execution/shuffle/row.rs 
b/native/core/src/execution/shuffle/row.rs
index 17b180e9..ce752e68 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -20,10 +20,10 @@
 use crate::{
     errors::CometError,
     execution::{
-        datafusion::shuffle_writer::{write_ipc_compressed, Checksum},
         shuffle::{
             list::{append_list_element, SparkUnsafeArray},
             map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
+            shuffle_writer::{write_ipc_compressed, Checksum},
         },
         utils::bytes_to_i128,
     },
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs 
b/native/core/src/execution/shuffle/shuffle_writer.rs
similarity index 100%
rename from native/core/src/execution/datafusion/shuffle_writer.rs
rename to native/core/src/execution/shuffle/shuffle_writer.rs
diff --git a/native/core/src/execution/datafusion/spark_plan.rs 
b/native/core/src/execution/spark_plan.rs
similarity index 100%
rename from native/core/src/execution/datafusion/spark_plan.rs
rename to native/core/src/execution/spark_plan.rs
diff --git a/native/core/src/execution/datafusion/util/mod.rs 
b/native/core/src/execution/util/mod.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/mod.rs
rename to native/core/src/execution/util/mod.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs 
b/native/core/src/execution/util/spark_bit_array.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/spark_bit_array.rs
rename to native/core/src/execution/util/spark_bit_array.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs 
b/native/core/src/execution/util/spark_bloom_filter.rs
similarity index 98%
rename from native/core/src/execution/datafusion/util/spark_bloom_filter.rs
rename to native/core/src/execution/util/spark_bloom_filter.rs
index 35fa23b4..2c3af169 100644
--- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
+++ b/native/core/src/execution/util/spark_bloom_filter.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::datafusion::util::spark_bit_array;
-use crate::execution::datafusion::util::spark_bit_array::SparkBitArray;
+use crate::execution::util::spark_bit_array;
+use crate::execution::util::spark_bit_array::SparkBitArray;
 use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array};
 use arrow_buffer::ToByteSlice;
 use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 65517431..d0bc2fd9 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
 [dependencies]
 arrow = { workspace = true }
 arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
 arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 chrono = { workspace = true }
diff --git a/native/core/src/execution/datafusion/expressions/checkoverflow.rs 
b/native/spark-expr/src/checkoverflow.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/checkoverflow.rs
rename to native/spark-expr/src/checkoverflow.rs
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 5dff6e0b..8a574805 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -29,6 +29,8 @@ mod bitwise_not;
 pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
 mod avg_decimal;
 pub use avg_decimal::AvgDecimal;
+mod checkoverflow;
+pub use checkoverflow::CheckOverflow;
 mod correlation;
 pub use correlation::Correlation;
 mod covariance;
@@ -45,10 +47,14 @@ pub use stddev::Stddev;
 mod structs;
 mod sum_decimal;
 pub use sum_decimal::SumDecimal;
+mod negative;
+pub use negative::{create_negate_expr, NegativeExpr};
 mod normalize_nan;
 mod temporal;
 pub mod timezone;
 mod to_json;
+mod unbound;
+pub use unbound::UnboundColumn;
 pub mod utils;
 pub use normalize_nan::NormalizeNaNAndZero;
 
@@ -83,3 +89,9 @@ pub enum EvalMode {
     /// failing the entire query.
     Try,
 }
+
+pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
+    SparkError::ArithmeticOverflow {
+        from_type: from_type.to_string(),
+    }
+}
diff --git a/native/core/src/execution/datafusion/expressions/negative.rs 
b/native/spark-expr/src/negative.rs
similarity index 98%
rename from native/core/src/execution/datafusion/expressions/negative.rs
rename to native/spark-expr/src/negative.rs
index 8dfe7174..3d9063e7 100644
--- a/native/core/src/execution/datafusion/expressions/negative.rs
+++ b/native/spark-expr/src/negative.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use super::arithmetic_overflow_error;
-use crate::errors::CometError;
+use crate::SparkError;
 use arrow::{compute::kernels::numeric::neg_wrapping, 
datatypes::IntervalDayTimeType};
 use arrow_array::RecordBatch;
 use arrow_buffer::IntervalDayTime;
@@ -26,8 +26,7 @@ use datafusion::{
     logical_expr::{interval_arithmetic::Interval, ColumnarValue},
     physical_expr::PhysicalExpr,
 };
-use datafusion_comet_spark_expr::SparkError;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::sort_properties::ExprProperties;
 use std::{
     any::Any,
@@ -38,7 +37,7 @@ use std::{
 pub fn create_negate_expr(
     expr: Arc<dyn PhysicalExpr>,
     fail_on_error: bool,
-) -> Result<Arc<dyn PhysicalExpr>, CometError> {
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
     Ok(Arc::new(NegativeExpr::new(expr, fail_on_error)))
 }
 
diff --git a/native/core/src/execution/datafusion/expressions/unbound.rs 
b/native/spark-expr/src/unbound.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/unbound.rs
rename to native/spark-expr/src/unbound.rs


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to