This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d2207664 Replace `unwrap` in `convert_to_ordered_float` and add `downcast_value` (#3347)
4d2207664 is described below

commit 4d22076643b3966e18e3d658bc818cca240ccd95
Author: Ian Alexander Joiner <iajoiner...@gmail.com>
AuthorDate: Sat Sep 10 11:54:35 2022 -0400

    Replace `unwrap` in `convert_to_ordered_float` and add `downcast_value` (#3347)
    
    * approx agg
    
    * expand downcast_value
    
    * Update datafusion/common/src/lib.rs
    
    * Update datafusion/common/src/lib.rs
    
    Co-authored-by: Andy Grove <andygrov...@gmail.com>
---
 datafusion/common/src/lib.rs                       | 24 +++++++++++++++++++
 .../physical-expr/src/aggregate/approx_distinct.rs | 24 ++++---------------
 .../src/aggregate/approx_percentile_cont.rs        | 28 ++++++++++++----------
 datafusion/physical-expr/src/aggregate/average.rs  |  7 +++---
 datafusion/physical-expr/src/aggregate/count.rs    |  9 +++----
 .../physical-expr/src/aggregate/count_distinct.rs  | 10 +++++---
 .../physical-expr/src/aggregate/covariance.rs      | 17 ++++---------
 7 files changed, 65 insertions(+), 54 deletions(-)

diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 3e1506763..b2ff66af0 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -27,3 +27,27 @@ pub use column::Column;
 pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
 pub use error::{field_not_found, DataFusionError, Result, SchemaError};
 pub use scalar::{ScalarType, ScalarValue};
+
+/// Downcast an Arrow Array to a concrete type, return an `Err` if the cast is
+/// not possible.
+///
+/// Example: `let array = downcast_value!(values, Int32Array)`
+#[macro_export]
+macro_rules! downcast_value {
+    ($Value: expr, $Type: ident) => {{
+        $Value.as_any().downcast_ref::<$Type>().ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "could not cast value to {}",
+                type_name::<$Type>()
+            ))
+        })?
+    }};
+    ($Value: expr, $Type: ident, $T: tt) => {{
+        $Value.as_any().downcast_ref::<$Type<T>>().ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "could not cast value to {}",
+                type_name::<$Type<T>>()
+            ))
+        })?
+    }};
+}
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index 5b391ed84..7252ac122 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -28,7 +28,7 @@ use arrow::datatypes::{
     ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type,
     UInt16Type, UInt32Type, UInt64Type, UInt8Type,
 };
-use datafusion_common::ScalarValue;
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, AggregateState};
 use std::any::type_name;
@@ -219,7 +219,7 @@ macro_rules! default_accumulator_impl {
     () => {
         fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
             assert_eq!(1, states.len(), "expect only 1 element in the states");
-            let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap();
+            let binary_array = downcast_value!(states[0], BinaryArray);
             for v in binary_array.iter() {
                 let v = v.ok_or_else(|| {
                     DataFusionError::Internal(
@@ -243,27 +243,13 @@ macro_rules! default_accumulator_impl {
     };
 }
 
-macro_rules! downcast_value {
-    ($Value: expr, $Type: ident, $T: tt) => {{
-        $Value[0]
-            .as_any()
-            .downcast_ref::<$Type<T>>()
-            .ok_or_else(|| {
-                DataFusionError::Internal(format!(
-                    "could not cast value to {}",
-                    type_name::<$Type<T>>()
-                ))
-            })?
-    }};
-}
-
 impl<T> Accumulator for BinaryHLLAccumulator<T>
 where
     T: OffsetSizeTrait,
 {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let array: &GenericBinaryArray<T> =
-            downcast_value!(values, GenericBinaryArray, T);
+            downcast_value!(values[0], GenericBinaryArray, T);
         // flatten because we would skip nulls
         self.hll
             .extend(array.into_iter().flatten().map(|v| v.to_vec()));
@@ -279,7 +265,7 @@ where
 {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let array: &GenericStringArray<T> =
-            downcast_value!(values, GenericStringArray, T);
+            downcast_value!(values[0], GenericStringArray, T);
         // flatten because we would skip nulls
         self.hll
             .extend(array.into_iter().flatten().map(|i| i.to_string()));
@@ -295,7 +281,7 @@ where
     T::Native: Hash,
 {
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let array: &PrimitiveArray<T> = downcast_value!(values, PrimitiveArray, T);
+        let array: &PrimitiveArray<T> = downcast_value!(values[0], PrimitiveArray, T);
         // flatten because we would skip nulls
         self.hll.extend(array.into_iter().flatten());
         Ok(())
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index fc5cc920e..b2457887b 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -28,10 +28,14 @@ use arrow::{
 };
 use datafusion_common::DataFusionError;
 use datafusion_common::Result;
-use datafusion_common::ScalarValue;
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_expr::{Accumulator, AggregateState};
 use ordered_float::OrderedFloat;
-use std::{any::Any, iter, sync::Arc};
+use std::{
+    any::{type_name, Any},
+    iter,
+    sync::Arc,
+};
 
 /// APPROX_PERCENTILE_CONT aggregate expression
 #[derive(Debug)]
@@ -272,7 +276,7 @@ impl ApproxPercentileAccumulator {
     ) -> Result<Vec<OrderedFloat<f64>>> {
         match values.data_type() {
             DataType::Float64 => {
-                let array = values.as_any().downcast_ref::<Float64Array>().unwrap();
+                let array = downcast_value!(values, Float64Array);
                 Ok(array
                     .values()
                     .iter()
@@ -280,7 +284,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::Float32 => {
-                let array = values.as_any().downcast_ref::<Float32Array>().unwrap();
+                let array = downcast_value!(values, Float32Array);
                 Ok(array
                     .values()
                     .iter()
@@ -288,7 +292,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::Int64 => {
-                let array = values.as_any().downcast_ref::<Int64Array>().unwrap();
+                let array = downcast_value!(values, Int64Array);
                 Ok(array
                     .values()
                     .iter()
@@ -296,7 +300,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::Int32 => {
-                let array = values.as_any().downcast_ref::<Int32Array>().unwrap();
+                let array = downcast_value!(values, Int32Array);
                 Ok(array
                     .values()
                     .iter()
@@ -304,7 +308,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::Int16 => {
-                let array = values.as_any().downcast_ref::<Int16Array>().unwrap();
+                let array = downcast_value!(values, Int16Array);
                 Ok(array
                     .values()
                     .iter()
@@ -312,7 +316,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::Int8 => {
-                let array = values.as_any().downcast_ref::<Int8Array>().unwrap();
+                let array = downcast_value!(values, Int8Array);
                 Ok(array
                     .values()
                     .iter()
@@ -320,7 +324,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::UInt64 => {
-                let array = values.as_any().downcast_ref::<UInt64Array>().unwrap();
+                let array = downcast_value!(values, UInt64Array);
                 Ok(array
                     .values()
                     .iter()
@@ -328,7 +332,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::UInt32 => {
-                let array = values.as_any().downcast_ref::<UInt32Array>().unwrap();
+                let array = downcast_value!(values, UInt32Array);
                 Ok(array
                     .values()
                     .iter()
@@ -336,7 +340,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::UInt16 => {
-                let array = values.as_any().downcast_ref::<UInt16Array>().unwrap();
+                let array = downcast_value!(values, UInt16Array);
                 Ok(array
                     .values()
                     .iter()
@@ -344,7 +348,7 @@ impl ApproxPercentileAccumulator {
                     .collect::<Result<Vec<_>>>()?)
             }
             DataType::UInt8 => {
-                let array = values.as_any().downcast_ref::<UInt8Array>().unwrap();
+                let array = downcast_value!(values, UInt8Array);
                 Ok(array
                     .values()
                     .iter()
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 45e19bb82..623ccbb32 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -17,6 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
+use std::any::type_name;
 use std::any::Any;
 use std::convert::TryFrom;
 use std::sync::Arc;
@@ -31,7 +32,7 @@ use arrow::{
     array::{ArrayRef, UInt64Array},
     datatypes::Field,
 };
-use datafusion_common::ScalarValue;
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, AggregateState};
 use datafusion_row::accessor::RowAccessor;
@@ -169,7 +170,7 @@ impl Accumulator for AvgAccumulator {
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let counts = downcast_value!(states[0], UInt64Array);
         // counts are summed
         self.count += compute::sum(counts).unwrap_or(0);
 
@@ -245,7 +246,7 @@ impl RowAccumulator for AvgRowAccumulator {
         states: &[ArrayRef],
         accessor: &mut RowAccessor,
     ) -> Result<()> {
-        let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap();
+        let counts = downcast_value!(states[0], UInt64Array);
         // count
         let delta = compute::sum(counts).unwrap_or(0);
         accessor.add_u64(self.state_index(), delta);
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 982c1dc09..78a86a3cd 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -17,6 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
+use std::any::type_name;
 use std::any::Any;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -27,8 +28,8 @@ use arrow::array::Int64Array;
 use arrow::compute;
 use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::Field};
-use datafusion_common::Result;
-use datafusion_common::ScalarValue;
+use datafusion_common::{downcast_value, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, AggregateState};
 use datafusion_row::accessor::RowAccessor;
 
@@ -126,7 +127,7 @@ impl Accumulator for CountAccumulator {
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        let counts = states[0].as_any().downcast_ref::<Int64Array>().unwrap();
+        let counts = downcast_value!(states[0], Int64Array);
         let delta = &compute::sum(counts);
         if let Some(d) = delta {
             self.count += *d;
@@ -173,7 +174,7 @@ impl RowAccumulator for CountRowAccumulator {
         states: &[ArrayRef],
         accessor: &mut RowAccessor,
     ) -> Result<()> {
-        let counts = states[0].as_any().downcast_ref::<Int64Array>().unwrap();
+        let counts = downcast_value!(states[0], Int64Array);
         let delta = &compute::sum(counts);
         if let Some(d) = delta {
             accessor.add_i64(self.state_index, *d);
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index b3ff9dea7..fe1503bdb 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -190,10 +190,14 @@ impl Accumulator for DistinctCountAccumulator {
         let mut cols_vec = cols_out
             .iter_mut()
             .map(|c| match c {
-                ScalarValue::List(Some(ref mut v), _) => v,
-                _ => unreachable!(),
+                ScalarValue::List(Some(ref mut v), _) => Ok(v),
+                t => Err(DataFusionError::Internal(format!(
+                    "cols_out should only consist of ScalarValue::List. {:?} is found",
+                    t
+                ))),
             })
-            .collect::<Vec<_>>();
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?;
 
         self.values.iter().for_each(|distinct_values| {
             distinct_values.0.iter().enumerate().for_each(
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 9cd319127..e54edce14 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -17,6 +17,7 @@
 
 //! Defines physical expressions that can evaluated at runtime during query execution
 
+use std::any::type_name;
 use std::any::Any;
 use std::sync::Arc;
 
@@ -28,7 +29,7 @@ use arrow::{
     datatypes::DataType,
     datatypes::Field,
 };
-use datafusion_common::ScalarValue;
+use datafusion_common::{downcast_value, ScalarValue};
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, AggregateState};
 
@@ -250,18 +251,8 @@ impl Accumulator for CovarianceAccumulator {
         let values1 = &cast(&values[0], &DataType::Float64)?;
         let values2 = &cast(&values[1], &DataType::Float64)?;
 
-        let mut arr1 = values1
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap()
-            .iter()
-            .flatten();
-        let mut arr2 = values2
-            .as_any()
-            .downcast_ref::<Float64Array>()
-            .unwrap()
-            .iter()
-            .flatten();
+        let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten();
+        let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten();
 
         for _i in 0..values1.len() {
             let value1 = arr1.next();

Reply via email to