This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 7db9aa67 chore: Move remaining expressions to spark-expr crate + some
minor refactoring (#1165)
7db9aa67 is described below
commit 7db9aa67ed0049bb25e783bc4ba991e53df34bbf
Author: Andy Grove <[email protected]>
AuthorDate: Thu Dec 12 14:51:19 2024 -0700
chore: Move remaining expressions to spark-expr crate + some minor
refactoring (#1165)
* move CheckOverflow to spark-expr crate
* move NegativeExpr to spark-expr crate
* move UnboundColumn to spark-expr crate
* move ExpandExec from execution::datafusion::operators to execution::operators
* refactoring to remove datafusion subpackage
* update imports in benches
* fix
* fix
---
native/Cargo.lock | 1 +
native/core/benches/bloom_filter_agg.rs | 2 +-
native/core/benches/shuffle_writer.rs | 2 +-
.../src/execution/datafusion/expressions/mod.rs | 35 ----------------------
.../core/src/execution/datafusion/operators/mod.rs | 18 -----------
.../expressions/bloom_filter_agg.rs | 4 +--
.../expressions/bloom_filter_might_contain.rs | 4 +--
.../execution/{datafusion => expressions}/mod.rs | 13 ++++----
.../{datafusion => }/expressions/subquery.rs | 0
native/core/src/execution/jni_api.rs | 6 ++--
native/core/src/execution/metrics/utils.rs | 2 +-
native/core/src/execution/mod.rs | 5 +++-
.../execution/{datafusion => }/operators/expand.rs | 10 +++----
native/core/src/execution/operators/mod.rs | 2 ++
native/core/src/execution/operators/scan.rs | 3 +-
.../core/src/execution/{datafusion => }/planner.rs | 32 +++++++++-----------
native/core/src/execution/shuffle/mod.rs | 2 ++
native/core/src/execution/shuffle/row.rs | 2 +-
.../{datafusion => shuffle}/shuffle_writer.rs | 0
.../src/execution/{datafusion => }/spark_plan.rs | 0
.../src/execution/{datafusion => }/util/mod.rs | 0
.../{datafusion => }/util/spark_bit_array.rs | 0
.../{datafusion => }/util/spark_bloom_filter.rs | 4 +--
native/spark-expr/Cargo.toml | 1 +
.../src}/checkoverflow.rs | 0
native/spark-expr/src/lib.rs | 12 ++++++++
.../expressions => spark-expr/src}/negative.rs | 7 ++---
.../expressions => spark-expr/src}/unbound.rs | 0
28 files changed, 63 insertions(+), 104 deletions(-)
diff --git a/native/Cargo.lock b/native/Cargo.lock
index 9a8eab83..7966bb80 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -931,6 +931,7 @@ version = "0.5.0"
dependencies = [
"arrow",
"arrow-array",
+ "arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
diff --git a/native/core/benches/bloom_filter_agg.rs
b/native/core/benches/bloom_filter_agg.rs
index 25d27d17..b83ff3fa 100644
--- a/native/core/benches/bloom_filter_agg.rs
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::builder::Int64Builder;
use arrow_array::{ArrayRef, RecordBatch};
use arrow_schema::SchemaRef;
-use comet::execution::datafusion::expressions::bloom_filter_agg::BloomFilterAgg;
+use comet::execution::expressions::bloom_filter_agg::BloomFilterAgg;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
diff --git a/native/core/benches/shuffle_writer.rs
b/native/core/benches/shuffle_writer.rs
index 6f287186..27288723 100644
--- a/native/core/benches/shuffle_writer.rs
+++ b/native/core/benches/shuffle_writer.rs
@@ -17,7 +17,7 @@
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
-use comet::execution::datafusion::shuffle_writer::ShuffleWriterExec;
+use comet::execution::shuffle::ShuffleWriterExec;
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion::{
physical_plan::{common::collect, memory::MemoryExec, ExecutionPlan},
diff --git a/native/core/src/execution/datafusion/expressions/mod.rs
b/native/core/src/execution/datafusion/expressions/mod.rs
deleted file mode 100644
index 5f9f322b..00000000
--- a/native/core/src/execution/datafusion/expressions/mod.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Native DataFusion expressions
-
-pub mod checkoverflow;
-
-use crate::errors::CometError;
-pub mod bloom_filter_agg;
-pub mod bloom_filter_might_contain;
-pub mod negative;
-pub mod subquery;
-pub mod unbound;
-
-pub use datafusion_comet_spark_expr::{EvalMode, SparkError};
-
-fn arithmetic_overflow_error(from_type: &str) -> CometError {
- CometError::Spark(SparkError::ArithmeticOverflow {
- from_type: from_type.to_string(),
- })
-}
diff --git a/native/core/src/execution/datafusion/operators/mod.rs
b/native/core/src/execution/datafusion/operators/mod.rs
deleted file mode 100644
index 3d28a266..00000000
--- a/native/core/src/execution/datafusion/operators/mod.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-pub mod expand;
diff --git
a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
b/native/core/src/execution/expressions/bloom_filter_agg.rs
similarity index 97%
rename from native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
rename to native/core/src/execution/expressions/bloom_filter_agg.rs
index 1300e08c..ea8bb364 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/expressions/bloom_filter_agg.rs
@@ -19,8 +19,8 @@ use arrow_schema::Field;
use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
use std::{any::Any, sync::Arc};
-use crate::execution::datafusion::util::spark_bloom_filter;
-use crate::execution::datafusion::util::spark_bloom_filter::SparkBloomFilter;
+use crate::execution::util::spark_bloom_filter;
+use crate::execution::util::spark_bloom_filter::SparkBloomFilter;
use arrow::array::ArrayRef;
use arrow_array::BinaryArray;
use datafusion::error::Result;
diff --git
a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
similarity index 97%
rename from
native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
rename to native/core/src/execution/expressions/bloom_filter_might_contain.rs
index de922d83..af6a5a47 100644
---
a/native/core/src/execution/datafusion/expressions/bloom_filter_might_contain.rs
+++ b/native/core/src/execution/expressions/bloom_filter_might_contain.rs
@@ -15,9 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::{
- execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes,
-};
+use crate::{execution::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes};
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
diff --git a/native/core/src/execution/datafusion/mod.rs
b/native/core/src/execution/expressions/mod.rs
similarity index 83%
rename from native/core/src/execution/datafusion/mod.rs
rename to native/core/src/execution/expressions/mod.rs
index ca41fa0a..e2f811fa 100644
--- a/native/core/src/execution/datafusion/mod.rs
+++ b/native/core/src/execution/expressions/mod.rs
@@ -15,11 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-//! Native execution through DataFusion
+//! Native DataFusion expressions
-pub mod expressions;
-mod operators;
-pub mod planner;
-pub mod shuffle_writer;
-pub(crate) mod spark_plan;
-mod util;
+pub mod bloom_filter_agg;
+pub mod bloom_filter_might_contain;
+pub mod subquery;
+
+pub use datafusion_comet_spark_expr::EvalMode;
diff --git a/native/core/src/execution/datafusion/expressions/subquery.rs
b/native/core/src/execution/expressions/subquery.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/subquery.rs
rename to native/core/src/execution/expressions/subquery.rs
diff --git a/native/core/src/execution/jni_api.rs
b/native/core/src/execution/jni_api.rs
index 5103f5ce..491b389c 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -45,8 +45,8 @@ use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
use crate::{
errors::{try_unwrap_or_throw, CometError, CometResult},
execution::{
- datafusion::planner::PhysicalPlanner, metrics::utils::update_comet_metric,
- serde::to_arrow_datatype, shuffle::row::process_sorted_row_partition, sort::RdxSort,
+ metrics::utils::update_comet_metric, planner::PhysicalPlanner, serde::to_arrow_datatype,
+ shuffle::row::process_sorted_row_partition, sort::RdxSort,
},
jvm_bridge::{jni_new_global_ref, JVMClasses},
};
@@ -59,8 +59,8 @@ use jni::{
};
use tokio::runtime::Runtime;
-use crate::execution::datafusion::spark_plan::SparkPlan;
use crate::execution::operators::ScanExec;
+use crate::execution::spark_plan::SparkPlan;
use log::info;
/// Comet native execution context. Kept alive across JNI calls.
diff --git a/native/core/src/execution/metrics/utils.rs
b/native/core/src/execution/metrics/utils.rs
index 4bb1c447..0eb4b631 100644
--- a/native/core/src/execution/metrics/utils.rs
+++ b/native/core/src/execution/metrics/utils.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
use crate::jvm_bridge::jni_new_global_ref;
use crate::{
errors::CometError,
diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs
index 3dba747f..a74ec301 100644
--- a/native/core/src/execution/mod.rs
+++ b/native/core/src/execution/mod.rs
@@ -16,13 +16,16 @@
// under the License.
//! PoC of vectorization execution through JNI to Rust.
-pub mod datafusion;
+pub mod expressions;
pub mod jni_api;
mod metrics;
pub mod operators;
+pub(crate) mod planner;
pub mod serde;
pub mod shuffle;
pub(crate) mod sort;
+pub(crate) mod spark_plan;
+pub(crate) mod util;
pub use datafusion_comet_spark_expr::timezone;
pub(crate) mod utils;
diff --git a/native/core/src/execution/datafusion/operators/expand.rs
b/native/core/src/execution/operators/expand.rs
similarity index 97%
rename from native/core/src/execution/datafusion/operators/expand.rs
rename to native/core/src/execution/operators/expand.rs
index a3dd0650..fb43a6e4 100644
--- a/native/core/src/execution/datafusion/operators/expand.rs
+++ b/native/core/src/execution/operators/expand.rs
@@ -37,14 +37,14 @@ use std::{
/// A Comet native operator that expands a single row into multiple rows. This behaves as same as
/// Spark Expand operator.
#[derive(Debug)]
-pub struct CometExpandExec {
+pub struct ExpandExec {
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
child: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
cache: PlanProperties,
}
-impl CometExpandExec {
+impl ExpandExec {
/// Create a new ExpandExec
pub fn new(
projections: Vec<Vec<Arc<dyn PhysicalExpr>>>,
@@ -66,7 +66,7 @@ impl CometExpandExec {
}
}
-impl DisplayAs for CometExpandExec {
+impl DisplayAs for ExpandExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) ->
std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
@@ -87,7 +87,7 @@ impl DisplayAs for CometExpandExec {
}
}
-impl ExecutionPlan for CometExpandExec {
+impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
}
@@ -104,7 +104,7 @@ impl ExecutionPlan for CometExpandExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
- let new_expand = CometExpandExec::new(
+ let new_expand = ExpandExec::new(
self.projections.clone(),
Arc::clone(&children[0]),
Arc::clone(&self.schema),
diff --git a/native/core/src/execution/operators/mod.rs
b/native/core/src/execution/operators/mod.rs
index bdc233e9..4e15e434 100644
--- a/native/core/src/execution/operators/mod.rs
+++ b/native/core/src/execution/operators/mod.rs
@@ -27,6 +27,8 @@ pub use filter::FilterExec;
pub use scan::*;
mod copy;
+mod expand;
+pub use expand::ExpandExec;
mod filter;
mod scan;
diff --git a/native/core/src/execution/operators/scan.rs
b/native/core/src/execution/operators/scan.rs
index 0d35859d..a297f87c 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -18,8 +18,7 @@
use crate::{
errors::CometError,
execution::{
- datafusion::planner::TEST_EXEC_CONTEXT_ID, operators::ExecutionError,
- utils::SparkArrowConvert,
+ operators::ExecutionError, planner::TEST_EXEC_CONTEXT_ID,
utils::SparkArrowConvert,
},
jvm_bridge::{jni_call, JVMClasses},
};
diff --git a/native/core/src/execution/datafusion/planner.rs
b/native/core/src/execution/planner.rs
similarity index 98%
rename from native/core/src/execution/datafusion/planner.rs
rename to native/core/src/execution/planner.rs
index 0e64ed6a..3ac830c0 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -22,17 +22,13 @@ use crate::execution::operators::{CopyMode, FilterExec};
use crate::{
errors::ExpressionError,
execution::{
- datafusion::{
- expressions::{
- bloom_filter_agg::BloomFilterAgg,
- bloom_filter_might_contain::BloomFilterMightContain,
checkoverflow::CheckOverflow,
- negative, subquery::Subquery, unbound::UnboundColumn,
- },
- operators::expand::CometExpandExec,
- shuffle_writer::ShuffleWriterExec,
+ expressions::{
+ bloom_filter_agg::BloomFilterAgg,
bloom_filter_might_contain::BloomFilterMightContain,
+ subquery::Subquery,
},
- operators::{CopyExec, ExecutionError, ScanExec},
+ operators::{CopyExec, ExecutionError, ExpandExec, ScanExec},
serde::to_arrow_datatype,
+ shuffle::ShuffleWriterExec,
},
};
use arrow::compute::CastOptions;
@@ -68,11 +64,11 @@ use datafusion::{
},
prelude::SessionContext,
};
-use datafusion_comet_spark_expr::create_comet_physical_fun;
+use datafusion_comet_spark_expr::{create_comet_physical_fun,
create_negate_expr};
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_physical_expr::aggregate::{AggregateExprBuilder,
AggregateFunctionExpr};
-use crate::execution::datafusion::spark_plan::SparkPlan;
+use crate::execution::spark_plan::SparkPlan;
use datafusion_comet_proto::{
spark_expression::{
self, agg_expr::ExprStruct as AggExprStruct, expr::ExprStruct,
literal::Value, AggExpr,
@@ -86,11 +82,11 @@ use datafusion_comet_proto::{
spark_partitioning::{partitioning::PartitioningStruct, Partitioning as
SparkPartitioning},
};
use datafusion_comet_spark_expr::{
- ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, Contains, Correlation,
Covariance,
- CreateNamedStruct, DateTruncExpr, EndsWith, GetArrayStructFields,
GetStructField, HourExpr,
- IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero, RLike,
SecondExpr,
+ ArrayInsert, Avg, AvgDecimal, BitwiseNotExpr, Cast, CheckOverflow,
Contains, Correlation,
+ Covariance, CreateNamedStruct, DateTruncExpr, EndsWith,
GetArrayStructFields, GetStructField,
+ HourExpr, IfExpr, Like, ListExtract, MinuteExpr, NormalizeNaNAndZero,
RLike, SecondExpr,
SparkCastOptions, StartsWith, Stddev, StringSpaceExpr, SubstringExpr,
SumDecimal,
- TimestampTruncExpr, ToJson, Variance,
+ TimestampTruncExpr, ToJson, UnboundColumn, Variance,
};
use datafusion_common::scalar::ScalarStructBuilder;
use datafusion_common::{
@@ -611,7 +607,7 @@ impl PhysicalPlanner {
ExprStruct::UnaryMinus(expr) => {
let child: Arc<dyn PhysicalExpr> =
self.create_expr(expr.child.as_ref().unwrap(),
Arc::clone(&input_schema))?;
- let result = negative::create_negate_expr(child,
expr.fail_on_error);
+ let result = create_negate_expr(child, expr.fail_on_error);
result.map_err(|e| ExecutionError::GeneralError(e.to_string()))
}
ExprStruct::NormalizeNanAndZero(expr) => {
@@ -1118,7 +1114,7 @@ impl PhysicalPlanner {
} else {
Arc::clone(&child.native_plan)
};
- let expand = Arc::new(CometExpandExec::new(projections, input,
schema));
+ let expand = Arc::new(ExpandExec::new(projections, input,
schema));
Ok((
scans,
Arc::new(SparkPlan::new(spark_plan.plan_id, expand,
vec![child])),
@@ -2270,7 +2266,7 @@ mod tests {
use datafusion::{physical_plan::common::collect, prelude::SessionContext};
use tokio::sync::mpsc;
- use crate::execution::{datafusion::planner::PhysicalPlanner,
operators::InputBatch};
+ use crate::execution::{operators::InputBatch, planner::PhysicalPlanner};
use crate::execution::operators::ExecutionError;
use datafusion_comet_proto::{
diff --git a/native/core/src/execution/shuffle/mod.rs
b/native/core/src/execution/shuffle/mod.rs
index b052df29..8721ead7 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -18,3 +18,5 @@
mod list;
mod map;
pub mod row;
+mod shuffle_writer;
+pub use shuffle_writer::ShuffleWriterExec;
diff --git a/native/core/src/execution/shuffle/row.rs
b/native/core/src/execution/shuffle/row.rs
index 17b180e9..ce752e68 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -20,10 +20,10 @@
use crate::{
errors::CometError,
execution::{
- datafusion::shuffle_writer::{write_ipc_compressed, Checksum},
shuffle::{
list::{append_list_element, SparkUnsafeArray},
map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
+ shuffle_writer::{write_ipc_compressed, Checksum},
},
utils::bytes_to_i128,
},
diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs
b/native/core/src/execution/shuffle/shuffle_writer.rs
similarity index 100%
rename from native/core/src/execution/datafusion/shuffle_writer.rs
rename to native/core/src/execution/shuffle/shuffle_writer.rs
diff --git a/native/core/src/execution/datafusion/spark_plan.rs
b/native/core/src/execution/spark_plan.rs
similarity index 100%
rename from native/core/src/execution/datafusion/spark_plan.rs
rename to native/core/src/execution/spark_plan.rs
diff --git a/native/core/src/execution/datafusion/util/mod.rs
b/native/core/src/execution/util/mod.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/mod.rs
rename to native/core/src/execution/util/mod.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs
b/native/core/src/execution/util/spark_bit_array.rs
similarity index 100%
rename from native/core/src/execution/datafusion/util/spark_bit_array.rs
rename to native/core/src/execution/util/spark_bit_array.rs
diff --git a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
b/native/core/src/execution/util/spark_bloom_filter.rs
similarity index 98%
rename from native/core/src/execution/datafusion/util/spark_bloom_filter.rs
rename to native/core/src/execution/util/spark_bloom_filter.rs
index 35fa23b4..2c3af169 100644
--- a/native/core/src/execution/datafusion/util/spark_bloom_filter.rs
+++ b/native/core/src/execution/util/spark_bloom_filter.rs
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use crate::execution::datafusion::util::spark_bit_array;
-use crate::execution::datafusion::util::spark_bit_array::SparkBitArray;
+use crate::execution::util::spark_bit_array;
+use crate::execution::util::spark_bit_array::SparkBitArray;
use arrow_array::{ArrowNativeTypeOp, BooleanArray, Int64Array};
use arrow_buffer::ToByteSlice;
use datafusion_comet_spark_expr::spark_hash::spark_compatible_murmur3_hash;
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 65517431..d0bc2fd9 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
[dependencies]
arrow = { workspace = true }
arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
diff --git a/native/core/src/execution/datafusion/expressions/checkoverflow.rs
b/native/spark-expr/src/checkoverflow.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/checkoverflow.rs
rename to native/spark-expr/src/checkoverflow.rs
diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs
index 5dff6e0b..8a574805 100644
--- a/native/spark-expr/src/lib.rs
+++ b/native/spark-expr/src/lib.rs
@@ -29,6 +29,8 @@ mod bitwise_not;
pub use bitwise_not::{bitwise_not, BitwiseNotExpr};
mod avg_decimal;
pub use avg_decimal::AvgDecimal;
+mod checkoverflow;
+pub use checkoverflow::CheckOverflow;
mod correlation;
pub use correlation::Correlation;
mod covariance;
@@ -45,10 +47,14 @@ pub use stddev::Stddev;
mod structs;
mod sum_decimal;
pub use sum_decimal::SumDecimal;
+mod negative;
+pub use negative::{create_negate_expr, NegativeExpr};
mod normalize_nan;
mod temporal;
pub mod timezone;
mod to_json;
+mod unbound;
+pub use unbound::UnboundColumn;
pub mod utils;
pub use normalize_nan::NormalizeNaNAndZero;
@@ -83,3 +89,9 @@ pub enum EvalMode {
/// failing the entire query.
Try,
}
+
+pub(crate) fn arithmetic_overflow_error(from_type: &str) -> SparkError {
+ SparkError::ArithmeticOverflow {
+ from_type: from_type.to_string(),
+ }
+}
diff --git a/native/core/src/execution/datafusion/expressions/negative.rs
b/native/spark-expr/src/negative.rs
similarity index 98%
rename from native/core/src/execution/datafusion/expressions/negative.rs
rename to native/spark-expr/src/negative.rs
index 8dfe7174..3d9063e7 100644
--- a/native/core/src/execution/datafusion/expressions/negative.rs
+++ b/native/spark-expr/src/negative.rs
@@ -16,7 +16,7 @@
// under the License.
use super::arithmetic_overflow_error;
-use crate::errors::CometError;
+use crate::SparkError;
use arrow::{compute::kernels::numeric::neg_wrapping,
datatypes::IntervalDayTimeType};
use arrow_array::RecordBatch;
use arrow_buffer::IntervalDayTime;
@@ -26,8 +26,7 @@ use datafusion::{
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
physical_expr::PhysicalExpr,
};
-use datafusion_comet_spark_expr::SparkError;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::sort_properties::ExprProperties;
use std::{
any::Any,
@@ -38,7 +37,7 @@ use std::{
pub fn create_negate_expr(
expr: Arc<dyn PhysicalExpr>,
fail_on_error: bool,
-) -> Result<Arc<dyn PhysicalExpr>, CometError> {
+) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
Ok(Arc::new(NegativeExpr::new(expr, fail_on_error)))
}
diff --git a/native/core/src/execution/datafusion/expressions/unbound.rs
b/native/spark-expr/src/unbound.rs
similarity index 100%
rename from native/core/src/execution/datafusion/expressions/unbound.rs
rename to native/spark-expr/src/unbound.rs
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]