This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d44512946 `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494)
4d44512946 is described below

commit 4d4451294629940d340160cdd06be273139728b4
Author: Chao Sun <[email protected]>
AuthorDate: Thu Sep 7 13:58:22 2023 -0700

    `PrimitiveGroupsAccumulator` should propagate timestamp timezone information properly (#7494)
---
 datafusion/physical-expr/src/aggregate/average.rs  | 12 ++---
 .../src/aggregate/groups_accumulator/prim_op.rs    |  8 ++--
 datafusion/physical-expr/src/aggregate/min_max.rs  | 24 +++++++++-
 datafusion/physical-expr/src/aggregate/utils.rs    | 24 ----------
 datafusion/physical-expr/src/expressions/mod.rs    | 54 +++++++++++++++++++++-
 5 files changed, 83 insertions(+), 39 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index ccadb2c9b8..92c806f76f 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -41,7 +41,7 @@ use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::Accumulator;
 
 use super::groups_accumulator::EmitTo;
-use super::utils::{adjust_output_array, Decimal128Averager};
+use super::utils::Decimal128Averager;
 
 /// AVG aggregate expression
 #[derive(Debug, Clone)]
@@ -488,12 +488,10 @@ where
                 .map(|(sum, count)| (self.avg_fn)(sum, count))
                 .collect::<Result<Vec<_>>>()?;
             PrimitiveArray::new(averages.into(), Some(nulls)) // no copy
+                .with_data_type(self.return_data_type.clone())
         };
 
-        // fix up decimal precision and scale for decimals
-        let array = adjust_output_array(&self.return_data_type, Arc::new(array))?;
-
-        Ok(array)
+        Ok(Arc::new(array))
     }
 
     // return arrays for sums and counts
@@ -505,8 +503,8 @@ where
         let counts = UInt64Array::new(counts.into(), nulls.clone()); // zero copy
 
         let sums = emit_to.take_needed(&mut self.sums);
-        let sums = PrimitiveArray::<T>::new(sums.into(), nulls); // zero copy
-        let sums = adjust_output_array(&self.sum_data_type, Arc::new(sums))?;
+        let sums = PrimitiveArray::<T>::new(sums.into(), nulls) // zero copy
+            .with_data_type(self.sum_data_type.clone());
 
         Ok(vec![
             Arc::new(counts) as ArrayRef,
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
index adeaea712c..130d562712 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/prim_op.rs
@@ -22,7 +22,7 @@ use arrow_array::{ArrayRef, BooleanArray, PrimitiveArray};
 use arrow_schema::DataType;
 use datafusion_common::Result;
 
-use crate::{aggregate::utils::adjust_output_array, GroupsAccumulator};
+use crate::GroupsAccumulator;
 
 use super::{accumulate::NullState, EmitTo};
 
@@ -115,9 +115,9 @@ where
     fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
         let values = emit_to.take_needed(&mut self.values);
         let nulls = self.null_state.build(emit_to);
-        let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)); // no copy
-
-        adjust_output_array(&self.data_type, Arc::new(values))
+        let values = PrimitiveArray::<T>::new(values.into(), Some(nulls)) // no copy
+            .with_data_type(self.data_type.clone());
+        Ok(Arc::new(values))
     }
 
     fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index 14e515861d..5c4c48b158 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -1075,8 +1075,8 @@ impl Accumulator for SlidingMinAccumulator {
 mod tests {
     use super::*;
     use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use crate::generic_test_op;
+    use crate::expressions::tests::{aggregate, aggregate_new};
+    use crate::{generic_test_op, generic_test_op_new};
     use arrow::datatypes::*;
     use arrow::record_batch::RecordBatch;
     use datafusion_common::Result;
@@ -1494,6 +1494,26 @@ mod tests {
         )
     }
 
+    #[test]
+    fn max_new_timestamp_micro() -> Result<()> {
+        let dt = DataType::Timestamp(TimeUnit::Microsecond, None);
+        let actual = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5])
+            .with_data_type(dt.clone());
+        let expected: ArrayRef =
+            Arc::new(TimestampMicrosecondArray::from(vec![5]).with_data_type(dt.clone()));
+        generic_test_op_new!(Arc::new(actual), dt.clone(), Max, &expected)
+    }
+
+    #[test]
+    fn max_new_timestamp_micro_with_tz() -> Result<()> {
+        let dt = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()));
+        let actual = TimestampMicrosecondArray::from(vec![1, 2, 3, 4, 5])
+            .with_data_type(dt.clone());
+        let expected: ArrayRef =
+            Arc::new(TimestampMicrosecondArray::from(vec![5]).with_data_type(dt.clone()));
+        generic_test_op_new!(Arc::new(actual), dt.clone(), Max, &expected)
+    }
+
     #[test]
     fn max_bool() -> Result<()> {
         let a: ArrayRef = Arc::new(BooleanArray::from(vec![false, false]));
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index 3ed7905e29..343fe6dae6 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -20,8 +20,6 @@
 use crate::{AggregateExpr, PhysicalSortExpr};
 use arrow::array::ArrayRef;
 use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_array::cast::AsArray;
-use arrow_array::types::Decimal128Type;
 use arrow_schema::{DataType, Field};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
@@ -111,28 +109,6 @@ impl Decimal128Averager {
     }
 }
 
-/// Adjust array type metadata if needed
-///
-/// Since `Decimal128Arrays` created from `Vec<NativeType>` have
-/// default precision and scale, this function adjusts the output to
-/// match `data_type`, if necessary
-pub fn adjust_output_array(
-    data_type: &DataType,
-    array: ArrayRef,
-) -> Result<ArrayRef, DataFusionError> {
-    let array = match data_type {
-        DataType::Decimal128(p, s) => Arc::new(
-            array
-                .as_primitive::<Decimal128Type>()
-                .clone()
-                .with_precision_and_scale(*p, *s)?,
-        ),
-        // no adjustment needed for other arrays
-        _ => array,
-    };
-    Ok(array)
-}
-
 /// Downcast a `Box<dyn AggregateExpr>` or `Arc<dyn AggregateExpr>`
 /// and return the inner trait object as [`Any`](std::any::Any) so
 /// that it can be downcast to a specific implementation.
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index bce1240e50..fbc609611e 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -101,7 +101,7 @@ pub use crate::PhysicalSortExpr;
 #[cfg(test)]
 pub(crate) mod tests {
     use crate::expressions::{col, create_aggregate_expr, try_cast};
-    use crate::AggregateExpr;
+    use crate::{AggregateExpr, EmitTo};
     use arrow::record_batch::RecordBatch;
     use arrow_array::ArrayRef;
     use arrow_schema::{Field, Schema};
@@ -111,7 +111,8 @@ pub(crate) mod tests {
     use datafusion_expr::AggregateFunction;
     use std::sync::Arc;
 
-    /// macro to perform an aggregation and verify the result.
+    /// macro to perform an aggregation using [`datafusion_expr::Accumulator`] and verify the
+    /// result.
     #[macro_export]
     macro_rules! generic_test_op {
         ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr) => {
@@ -136,6 +137,39 @@ pub(crate) mod tests {
         }};
     }
 
+    /// macro to perform an aggregation using [`crate::GroupsAccumulator`] and verify the result.
+    ///
+    /// The difference between this and the above `generic_test_op` is that the former checks
+    /// the old slow-path [`datafusion_expr::Accumulator`] implementation, while this checks
+    /// the new [`crate::GroupsAccumulator`] implementation.
+    #[macro_export]
+    macro_rules! generic_test_op_new {
+        ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr) => {
+            generic_test_op_new!(
+                $ARRAY,
+                $DATATYPE,
+                $OP,
+                $EXPECTED,
+                $EXPECTED.data_type().clone()
+            )
+        };
+        ($ARRAY:expr, $DATATYPE:expr, $OP:ident, $EXPECTED:expr, $EXPECTED_DATATYPE:expr) => {{
+            let schema = Schema::new(vec![Field::new("a", $DATATYPE, true)]);
+
+            let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![$ARRAY])?;
+
+            let agg = Arc::new(<$OP>::new(
+                col("a", &schema)?,
+                "bla".to_string(),
+                $EXPECTED_DATATYPE,
+            ));
+            let actual = aggregate_new(&batch, agg)?;
+            assert_eq!($EXPECTED, &actual);
+
+            Ok(()) as Result<(), DataFusionError>
+        }};
+    }
+
     /// Assert `function(array) == expected` performing any necessary type coercion
     pub fn assert_aggregate(
         array: ArrayRef,
@@ -218,4 +252,20 @@ pub(crate) mod tests {
         accum.update_batch(&values)?;
         accum.evaluate()
     }
+
+    pub fn aggregate_new(
+        batch: &RecordBatch,
+        agg: Arc<dyn AggregateExpr>,
+    ) -> Result<ArrayRef> {
+        let mut accum = agg.create_groups_accumulator()?;
+        let expr = agg.expressions();
+        let values = expr
+            .iter()
+            .map(|e| e.evaluate(batch))
+            .map(|r| r.map(|v| v.into_array(batch.num_rows())))
+            .collect::<Result<Vec<_>>>()?;
+        let indices = vec![0; batch.num_rows()];
+        accum.update_batch(&values, &indices, None, 1)?;
+        accum.evaluate(EmitTo::All)
+    }
 }

Reply via email to