This is an automated email from the ASF dual-hosted git repository. agrove pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push: new 4d2207664 Replace `unwrap` in `convert_to_ordered_float` and add `downcast_value` (#3347) 4d2207664 is described below commit 4d22076643b3966e18e3d658bc818cca240ccd95 Author: Ian Alexander Joiner <iajoiner...@gmail.com> AuthorDate: Sat Sep 10 11:54:35 2022 -0400 Replace `unwrap` in `convert_to_ordered_float` and add `downcast_value` (#3347) * approx agg * expand downcast_value * Update datafusion/common/src/lib.rs * Update datafusion/common/src/lib.rs Co-authored-by: Andy Grove <andygrov...@gmail.com> --- datafusion/common/src/lib.rs | 24 +++++++++++++++++++ .../physical-expr/src/aggregate/approx_distinct.rs | 24 ++++--------------- .../src/aggregate/approx_percentile_cont.rs | 28 ++++++++++++---------- datafusion/physical-expr/src/aggregate/average.rs | 7 +++--- datafusion/physical-expr/src/aggregate/count.rs | 9 +++---- .../physical-expr/src/aggregate/count_distinct.rs | 10 +++++--- .../physical-expr/src/aggregate/covariance.rs | 17 ++++--------- 7 files changed, 65 insertions(+), 54 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 3e1506763..b2ff66af0 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -27,3 +27,27 @@ pub use column::Column; pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema}; pub use error::{field_not_found, DataFusionError, Result, SchemaError}; pub use scalar::{ScalarType, ScalarValue}; + +/// Downcast an Arrow Array to a concrete type, return an `Err` if the cast is +/// not possible. +/// +/// Example: `let array = downcast_value!(values, Int32Array)` +#[macro_export] +macro_rules! downcast_value { + ($Value: expr, $Type: ident) => {{ + $Value.as_any().downcast_ref::<$Type>().ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast value to {}", + type_name::<$Type>() + )) + })? + }}; + ($Value: expr, $Type: ident, $T: tt) => {{ + $Value.as_any().downcast_ref::<$Type<T>>().ok_or_else(|| { + DataFusionError::Internal(format!( + "could not cast value to {}", + type_name::<$Type<T>>() + )) + })? + }}; +} diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs index 5b391ed84..7252ac122 100644 --- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs @@ -28,7 +28,7 @@ use arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; -use datafusion_common::ScalarValue; +use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; use std::any::type_name; @@ -219,7 +219,7 @@ macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { assert_eq!(1, states.len(), "expect only 1 element in the states"); - let binary_array = states[0].as_any().downcast_ref::<BinaryArray>().unwrap(); + let binary_array = downcast_value!(states[0], BinaryArray); for v in binary_array.iter() { let v = v.ok_or_else(|| { DataFusionError::Internal( @@ -243,27 +243,13 @@ macro_rules! default_accumulator_impl { }; } -macro_rules! downcast_value { - ($Value: expr, $Type: ident, $T: tt) => {{ - $Value[0] - .as_any() - .downcast_ref::<$Type<T>>() - .ok_or_else(|| { - DataFusionError::Internal(format!( - "could not cast value to {}", - type_name::<$Type<T>>() - )) - })? - }}; -} - impl<T> Accumulator for BinaryHLLAccumulator<T> where T: OffsetSizeTrait, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &GenericBinaryArray<T> = - downcast_value!(values, GenericBinaryArray, T); + downcast_value!(values[0], GenericBinaryArray, T); // flatten because we would skip nulls self.hll .extend(array.into_iter().flatten().map(|v| v.to_vec())); @@ -279,7 +265,7 @@ where { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &GenericStringArray<T> = - downcast_value!(values, GenericStringArray, T); + downcast_value!(values[0], GenericStringArray, T); // flatten because we would skip nulls self.hll .extend(array.into_iter().flatten().map(|i| i.to_string())); @@ -295,7 +281,7 @@ where T::Native: Hash, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &PrimitiveArray<T> = downcast_value!(values, PrimitiveArray, T); + let array: &PrimitiveArray<T> = downcast_value!(values[0], PrimitiveArray, T); // flatten because we would skip nulls self.hll.extend(array.into_iter().flatten()); Ok(()) diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs index fc5cc920e..b2457887b 100644 --- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs +++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs @@ -28,10 +28,14 @@ use arrow::{ }; use datafusion_common::DataFusionError; use datafusion_common::Result; -use datafusion_common::ScalarValue; +use datafusion_common::{downcast_value, ScalarValue}; use datafusion_expr::{Accumulator, AggregateState}; use ordered_float::OrderedFloat; -use std::{any::Any, iter, sync::Arc}; +use std::{ + any::{type_name, Any}, + iter, + sync::Arc, +}; /// APPROX_PERCENTILE_CONT aggregate expression #[derive(Debug)] @@ -272,7 +276,7 @@ impl ApproxPercentileAccumulator { ) -> Result<Vec<OrderedFloat<f64>>> { match values.data_type() { DataType::Float64 => { - let array = values.as_any().downcast_ref::<Float64Array>().unwrap(); + let array = downcast_value!(values, Float64Array); Ok(array .values() .iter() @@ -280,7 +284,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::Float32 => { - let array = values.as_any().downcast_ref::<Float32Array>().unwrap(); + let array = downcast_value!(values, Float32Array); Ok(array .values() .iter() @@ -288,7 +292,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::Int64 => { - let array = values.as_any().downcast_ref::<Int64Array>().unwrap(); + let array = downcast_value!(values, Int64Array); Ok(array .values() .iter() @@ -296,7 +300,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::Int32 => { - let array = values.as_any().downcast_ref::<Int32Array>().unwrap(); + let array = downcast_value!(values, Int32Array); Ok(array .values() .iter() @@ -304,7 +308,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::Int16 => { - let array = values.as_any().downcast_ref::<Int16Array>().unwrap(); + let array = downcast_value!(values, Int16Array); Ok(array .values() .iter() @@ -312,7 +316,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::Int8 => { - let array = values.as_any().downcast_ref::<Int8Array>().unwrap(); + let array = downcast_value!(values, Int8Array); Ok(array .values() .iter() @@ -320,7 +324,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::UInt64 => { - let array = values.as_any().downcast_ref::<UInt64Array>().unwrap(); + let array = downcast_value!(values, UInt64Array); Ok(array .values() .iter() @@ -328,7 +332,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::UInt32 => { - let array = values.as_any().downcast_ref::<UInt32Array>().unwrap(); + let array = downcast_value!(values, UInt32Array); Ok(array .values() .iter() @@ -336,7 +340,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::UInt16 => { - let array = values.as_any().downcast_ref::<UInt16Array>().unwrap(); + let array = downcast_value!(values, UInt16Array); Ok(array .values() .iter() @@ -344,7 +348,7 @@ impl ApproxPercentileAccumulator { .collect::<Result<Vec<_>>>()?) } DataType::UInt8 => { - let array = values.as_any().downcast_ref::<UInt8Array>().unwrap(); + let array = downcast_value!(values, UInt8Array); Ok(array .values() .iter() diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index 45e19bb82..623ccbb32 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -17,6 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution +use std::any::type_name; use std::any::Any; use std::convert::TryFrom; use std::sync::Arc; @@ -31,7 +32,7 @@ use arrow::{ array::{ArrayRef, UInt64Array}, datatypes::Field, }; -use datafusion_common::ScalarValue; +use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; use datafusion_row::accessor::RowAccessor; @@ -169,7 +170,7 @@ impl Accumulator for AvgAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap(); + let counts = downcast_value!(states[0], UInt64Array); // counts are summed self.count += compute::sum(counts).unwrap_or(0); @@ -245,7 +246,7 @@ impl RowAccumulator for AvgRowAccumulator { states: &[ArrayRef], accessor: &mut RowAccessor, ) -> Result<()> { - let counts = states[0].as_any().downcast_ref::<UInt64Array>().unwrap(); + let counts = downcast_value!(states[0], UInt64Array); // count let delta = compute::sum(counts).unwrap_or(0); accessor.add_u64(self.state_index(), delta); diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs index 982c1dc09..78a86a3cd 100644 --- a/datafusion/physical-expr/src/aggregate/count.rs +++ b/datafusion/physical-expr/src/aggregate/count.rs @@ -17,6 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution +use std::any::type_name; use std::any::Any; use std::fmt::Debug; use std::sync::Arc; @@ -27,8 +28,8 @@ use arrow::array::Int64Array; use arrow::compute; use arrow::datatypes::DataType; use arrow::{array::ArrayRef, datatypes::Field}; -use datafusion_common::Result; -use datafusion_common::ScalarValue; +use datafusion_common::{downcast_value, ScalarValue}; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; use datafusion_row::accessor::RowAccessor; @@ -126,7 +127,7 @@ impl Accumulator for CountAccumulator { } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let counts = states[0].as_any().downcast_ref::<Int64Array>().unwrap(); + let counts = downcast_value!(states[0], Int64Array); let delta = &compute::sum(counts); if let Some(d) = delta { self.count += *d; @@ -173,7 +174,7 @@ impl RowAccumulator for CountRowAccumulator { states: &[ArrayRef], accessor: &mut RowAccessor, ) -> Result<()> { - let counts = states[0].as_any().downcast_ref::<Int64Array>().unwrap(); + let counts = downcast_value!(states[0], Int64Array); let delta = &compute::sum(counts); if let Some(d) = delta { accessor.add_i64(self.state_index, *d); diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs index b3ff9dea7..fe1503bdb 100644 --- a/datafusion/physical-expr/src/aggregate/count_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs @@ -190,10 +190,14 @@ impl Accumulator for DistinctCountAccumulator { let mut cols_vec = cols_out .iter_mut() .map(|c| match c { - ScalarValue::List(Some(ref mut v), _) => v, - _ => unreachable!(), + ScalarValue::List(Some(ref mut v), _) => Ok(v), + t => Err(DataFusionError::Internal(format!( + "cols_out should only consist of ScalarValue::List. {:?} is found", + t + ))), }) - .collect::<Vec<_>>(); + .into_iter() + .collect::<Result<Vec<_>>>()?; self.values.iter().for_each(|distinct_values| { distinct_values.0.iter().enumerate().for_each( diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs index 9cd319127..e54edce14 100644 --- a/datafusion/physical-expr/src/aggregate/covariance.rs +++ b/datafusion/physical-expr/src/aggregate/covariance.rs @@ -17,6 +17,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution +use std::any::type_name; use std::any::Any; use std::sync::Arc; @@ -28,7 +29,7 @@ use arrow::{ datatypes::DataType, datatypes::Field, }; -use datafusion_common::ScalarValue; +use datafusion_common::{downcast_value, ScalarValue}; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::{Accumulator, AggregateState}; @@ -250,18 +251,8 @@ impl Accumulator for CovarianceAccumulator { let values1 = &cast(&values[0], &DataType::Float64)?; let values2 = &cast(&values[1], &DataType::Float64)?; - let mut arr1 = values1 - .as_any() - .downcast_ref::<Float64Array>() - .unwrap() - .iter() - .flatten(); - let mut arr2 = values2 - .as_any() - .downcast_ref::<Float64Array>() - .unwrap() - .iter() - .flatten(); + let mut arr1 = downcast_value!(values1, Float64Array).iter().flatten(); + let mut arr2 = downcast_value!(values2, Float64Array).iter().flatten(); for _i in 0..values1.len() { let value1 = arr1.next();