This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 4d44512946 `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494)
4d44512946 is described below
commit 4d4451294629940d340160cdd06be273139728b4
Author: Chao Sun <[email protected]>
AuthorDate: Thu Sep 7 13:58:22 2023 -0700
`PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494)
---
datafusion/physical-expr/src/aggregate/average.rs | 12 ++---
.../src/aggregate/groups_accumulator/prim_op.rs | 8 ++--
datafusion/physical-expr/src/aggregate/min_max.rs | 24 +++++++++-
datafusion/physical-expr/src/aggregate/utils.rs | 24 ----------
datafusion/physical-expr/src/expressions/mod.rs | 54 +++++++++++++++++++++-
5 files changed, 83 insertions(+), 39 deletions(-)
diff --git a/datafusion/physical-expr/src/aggregate/average.rs
b/datafusion/physical-expr/src/aggregate/average.rs
index ccadb2c9b8..92c806f76f 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -41,7 +41,7 @@ use
datafusion_expr::type_coercion::aggregates::avg_return_type;
use datafusion_expr::Accumulator;
use super::groups_accumulator::EmitTo;
-use super::utils::{adjust_output_array, Decimal128Averager};
+use super::utils::Decimal128Averager;
/// AVG aggregate expression
#[derive(Debug, Clone)]
@@ -488,12 +488,10 @@ where
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), Some(nulls)) // no copy
+ .with_data_type(self.return_data_type.clone())
};
- // fix up decimal precision and scale for decimals
- let array = adjust_output_array(&self.return_data_type,
Arc::new(array))?;
-
- Ok(array)
+ Ok(Arc::new(array))
}
// return arrays for sums and counts
@@ -505,8 +503,8 @@ where
let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero
copy
let sums = emit_to.take_needed(&mut self.sums);
- let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
- let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
+ let sums = PrimitiveArray::<T>::new(sums.into(), nulls) // zero copy
+ .with_data_type(self.sum_data_type.clone());
Ok(vec![
Arc::new(counts) as ArrayRef,
diff --git
a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
index adeaea712c..130d562712 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -22,7 +22,7 @@ use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
use arrow_schema::DataType;
use datafusion_common::Result;
-use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator};
+use crate::GroupsAccumulator;
use super::{accumulate::NullState, EmitTo};
@@ -115,9 +115,9 @@ where
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
let values = emit_to.take_needed(&mut self.values);
let nulls = self.null_state.build(emit_to);
- let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); //
no copy
-
- adjust_output_array(&self.data_type, Arc::new(values))
+ let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) //
no copy
+ .with_data_type(self.data_type.clone());
+ Ok(Arc::new(values))
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs
b/datafusion/physical-expr/src/aggregate/min_max.rs
index 14e515861d..5c4c48b158 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -1075,8 +1075,8 @@ impl Accumulator for SlidingMinAccumulator {
mod tests {
use super::*;
use crate::expressions::col;
- use crate::expressions::tests::aggregate;
- use crate::generic_test_op;
+ use crate::expressions::tests::{aggregate, aggregate_new};
+ use crate::{generic_test_op, generic_test_op_new};
use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
@@ -1494,6 +1494,26 @@ mod tests {
)
}
+ #[test]
+ fn max_new_timestamp_micro() -> Result<()> {
+ let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
+ let actual = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5])
+ .with_data_type(dt.clone());
+ let expected: ArrayRef =
+
Arc::new(TimestampMicrosecondArray::from(vec![5]).with_data_type(dt.clone()));
+ generic_test_op_new!(Arc::new(actual), dt.clone(), Max, &expected)
+ }
+
+ #[test]
+ fn max_new_timestamp_micro_with_tz() -> Result<()> {
+ let dt = DataType::Timestamp(TimeUnit::Microsecond,
Some("UTC".into()));
+ let actual = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5])
+ .with_data_type(dt.clone());
+ let expected: ArrayRef =
+
Arc::new(TimestampMicrosecondArray::from(vec![5]).with_data_type(dt.clone()));
+ generic_test_op_new!(Arc::new(actual), dt.clone(), Max, &expected)
+ }
+
#[test]
fn max_bool() -> Result<()> {
let a: ArrayRef = Arc::new(BooleanArray::from(vec![false, false]));
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs
b/datafusion/physical-expr/src/aggregate/utils.rs
index 3ed7905e29..343fe6dae6 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -20,8 +20,6 @@
use crate::{AggregateExpr, PhysicalSortExpr};
use arrow::array::ArrayRef;
use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION,
MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_array::cast::AsArray;
-use arrow_array::types::Decimal128Type;
use arrow_schema::{DataType, Field};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr::Accumulator;
@@ -111,28 +109,6 @@ impl Decimal128Averager {
}
}
-/// Adjust array type metadata if needed
-///
-/// Since `Decimal128Arrays` created from `Vec<NativeType>` have
-/// default precision and scale, this function adjusts the output to
-/// match `data_type`, if necessary
-pub fn adjust_output_array(
- data_type: &DataType,
- array: ArrayRef,
-) -> Result<ArrayRef, DataFusionError> {
- let array = match data_type {
- DataType::Decimal128(p, s) => Arc::new(
- array
- .as_primitive::<Decimal128Type>()
- .clone()
- .with_precision_and_scale(*p, *s)?,
- ),
- // no adjustment needed for other arrays
- _ => array,
- };
- Ok(array)
-}
-
/// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
/// and return the inner trait object as [`Any`](std::any::Any) so
/// that it can be downcast to a specific implementation.
diff --git a/datafusion/physical-expr/src/expressions/mod.rs
b/datafusion/physical-expr/src/expressions/mod.rs
index bce1240e50..fbc609611e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -101,7 +101,7 @@ pub use crate::PhysicalSortExpr;
#[cfg(test)]
pub(crate) mod tests {
use crate::expressions::{col, create_aggregate_expr, try_cast};
- use crate::AggregateExpr;
+ use crate::{AggregateExpr, EmitTo};
use arrow::record_batch::RecordBatch;
use arrow_array::ArrayRef;
use arrow_schema::{Field, Schema};
@@ -111,7 +111,8 @@ pub(crate) mod tests {
use datafusion_expr::AggregateFunction;
use std::sync::Arc;
- /// macro to perform an aggregation and verify the result.
+ /// macro to perform an aggregation using [`datafusion_expr::Accumulator`]
and verify the
+ /// result.
#[macro_export]
macro_rules! generic_test_op {
($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr) => {
@@ -136,6 +137,39 @@ pub(crate) mod tests {
}};
}
+ /// macro to perform an aggregation using [`crate::GroupsAccumulator`] and
verify the result.
+ ///
+ /// The difference between this and the above `generic_test_op` is that
the former checks
+ /// the old slow-path [`datafusion_expr::Accumulator`] implementation,
while this checks
+ /// the new [`crate::GroupsAccumulator`] implementation.
+ #[macro_export]
+ macro_rules! generic_test_op_new {
+ ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr) => {
+ generic_test_op_new!(
+ $ARRAY,
+ $DATATYPE,
+ $OP,
+ $EXPECTED,
+ $EXPECTED.data_type().clone()
+ )
+ };
+ ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr,
$EXPECTED_DATATYPE:expr) => {{
+ let schema = Schema::new(vec![Field::new("a", $DATATYPE, true)]);
+
+ let batch = RecordBatch::try_new(Arc::new(schema.clone()),
vec![$ARRAY])?;
+
+ let agg = Arc::new(<$OP>::new(
+ col("a", &schema)?,
+ "bla".to_string(),
+ $EXPECTED_DATATYPE,
+ ));
+ let actual = aggregate_new(&batch, agg)?;
+ assert_eq!($EXPECTED, &actual);
+
+ Ok(()) as Result<(), DataFusionError>
+ }};
+ }
+
/// Assert `function(array) == expected` performing any necessary type
coercion
pub fn assert_aggregate(
array: ArrayRef,
@@ -218,4 +252,20 @@ pub(crate) mod tests {
accum.update_batch(&values)?;
accum.evaluate()
}
+
+ pub fn aggregate_new(
+ batch: &RecordBatch,
+ agg: Arc<dyn AggregateExpr>,
+ ) -> Result<ArrayRef> {
+ let mut accum = agg.create_groups_accumulator()?;
+ let expr = agg.expressions();
+ let values = expr
+ .iter()
+ .map(|e| e.evaluate(batch))
+ .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+ .collect::<Result<Vec<_>>>()?;
+ let indices = vec![0; batch.num_rows()];
+ accum.update_batch(&values, &indices, None, 1)?;
+ accum.evaluate(EmitTo::All)
+ }
}