alamb commented on code in PR #10226:
URL: https://github.com/apache/datafusion/pull/10226#discussion_r1579978178


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -196,6 +184,192 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes 
after taking
+/// all unique values. This may use a lot of memory if the cardinality is high.
+#[derive(Debug)]
+pub struct DistinctMedian {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl DistinctMedian {
+    /// Create a new MEDIAN(DISTINCT) aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctMedian {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        use arrow_array::types::*;
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(DistinctMedianAccumulator::<$t> {
+                    data_type: $dt.clone(),
+                    distinct_values: Default::default(),
+                }))
+            };
+        }
+        let dt = &self.data_type;
+        downcast_integer! {
+            dt => (helper, dt),
+            DataType::Float16 => helper!(Float16Type, dt),
+            DataType::Float32 => helper!(Float32Type, dt),
+            DataType::Float64 => helper!(Float64Type, dt),
+            DataType::Decimal128(_, _) => helper!(Decimal128Type, dt),
+            DataType::Decimal256(_, _) => helper!(Decimal256Type, dt),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "DistinctMedianAccumulator not supported for {} with {}",
+                self.name(),
+                self.data_type
+            ))),
+        }
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        // Intermediate state is a list of the unique elements we have
+        // collected so far
+        let field = Field::new("item", self.data_type.clone(), true);
+        let data_type = DataType::List(Arc::new(field));
+
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "distinct_median"),
+            data_type,
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for DistinctMedian {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+/// The distinct median accumulator accumulates the raw input values
+/// as `ScalarValue`s
+///
+/// The intermediate state is represented as a List of scalar values updated by
+/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
+/// in the final evaluation step so that we avoid expensive conversions and
+/// allocations during `update_batch`.
+struct DistinctMedianAccumulator<T: ArrowNumericType> {
+    data_type: DataType,
+    distinct_values: HashSet<Hashable<T::Native>>,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for DistinctMedianAccumulator<T> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DistinctMedianAccumulator({})", self.data_type)
+    }
+}
+
+impl<T: ArrowNumericType> Accumulator for DistinctMedianAccumulator<T> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
+        let all_values = self
+            .distinct_values
+            .iter()
+            .map(|x| ScalarValue::new_primitive::<T>(Some(x.0), 
&self.data_type))
+            .collect::<Result<Vec<_>>>()?;
+
+        let arr = ScalarValue::new_list(&all_values, &self.data_type);
+        Ok(vec![ScalarValue::List(arr)])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let array = values[0].as_primitive::<T>();
+        match array.nulls().filter(|x| x.null_count() > 0) {

Review Comment:
   Another way to check this, which I think might be clearer, is 
`array.null_count()`: 
https://docs.rs/arrow/latest/arrow/array/trait.Array.html#method.null_count
   
   



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -329,4 +503,147 @@ mod tests {
         ]));
         generic_test_op!(a, DataType::Float64, Median, 
ScalarValue::from(3.5_f64))
     }
+
+    #[test]
+    fn distinct_median_decimal() -> Result<()> {
+        let array: ArrayRef = Arc::new(
+            vec![1, 1, 1, 1, 1, 1, 2, 3, 3]
+                .into_iter()
+                .map(Some)
+                .collect::<Decimal128Array>()
+                .with_precision_and_scale(10, 4)?,
+        );
+
+        generic_test_op!(
+            array,
+            DataType::Decimal128(10, 4),
+            DistinctMedian,
+            ScalarValue::Decimal128(Some(2), 10, 4)
+        )
+    }
+
+    #[test]
+    fn distinct_median_decimal_with_nulls() -> Result<()> {
+        let array: ArrayRef = Arc::new(
+            vec![Some(1), Some(2), None, Some(3), Some(3), Some(3), Some(3)]

Review Comment:
   I recommend adding values in non-sorted order in these tests to make sure 
the result does not depend on the input already being sorted



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -196,6 +184,192 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes 
after taking
+/// all unique values. This may use a lot of memory if the cardinality is high.
+#[derive(Debug)]
+pub struct DistinctMedian {

Review Comment:
   The main difference seems to be the `Accumulator` implementation
   
   What do you think about adding a field on `Median` like `distinct`
   
   ```rust
   pub struct Median {
   ...
     distinct: bool
   }
   ```
   
   And then instantiating the correct accumulator in `create_accumulator`? That 
would add an additional check when creating an accumulator, but that seems 
inconsequential compared to the work to actually allocate and compute the 
median.



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -196,6 +184,192 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes 
after taking
+/// all unique values. This may use a lot of memory if the cardinality is high.
+#[derive(Debug)]
+pub struct DistinctMedian {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl DistinctMedian {
+    /// Create a new MEDIAN(DISTINCT) aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctMedian {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        use arrow_array::types::*;
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(DistinctMedianAccumulator::<$t> {
+                    data_type: $dt.clone(),
+                    distinct_values: Default::default(),
+                }))
+            };
+        }
+        let dt = &self.data_type;
+        downcast_integer! {
+            dt => (helper, dt),
+            DataType::Float16 => helper!(Float16Type, dt),
+            DataType::Float32 => helper!(Float32Type, dt),
+            DataType::Float64 => helper!(Float64Type, dt),
+            DataType::Decimal128(_, _) => helper!(Decimal128Type, dt),
+            DataType::Decimal256(_, _) => helper!(Decimal256Type, dt),
+            _ => Err(DataFusionError::NotImplemented(format!(
+                "DistinctMedianAccumulator not supported for {} with {}",
+                self.name(),
+                self.data_type
+            ))),
+        }
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        // Intermediate state is a list of the unique elements we have
+        // collected so far
+        let field = Field::new("item", self.data_type.clone(), true);
+        let data_type = DataType::List(Arc::new(field));
+
+        Ok(vec![Field::new(
+            format_state_name(&self.name, "distinct_median"),
+            data_type,
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+impl PartialEq<dyn Any> for DistinctMedian {
+    fn eq(&self, other: &dyn Any) -> bool {
+        down_cast_any_ref(other)
+            .downcast_ref::<Self>()
+            .map(|x| {
+                self.name == x.name
+                    && self.data_type == x.data_type
+                    && self.expr.eq(&x.expr)
+            })
+            .unwrap_or(false)
+    }
+}
+
+/// The distinct median accumulator accumulates the raw input values
+/// as `ScalarValue`s
+///
+/// The intermediate state is represented as a List of scalar values updated by
+/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
+/// in the final evaluation step so that we avoid expensive conversions and
+/// allocations during `update_batch`.
+struct DistinctMedianAccumulator<T: ArrowNumericType> {

Review Comment:
   I started playing around with trying to make a generic trait that could 
handle both `Vec` and `HashSet`. I couldn't make the types work out, and I 
convinced myself it would end up being at least as much code as duplicating 
the logic across accumulators. Thus I think having a copy/paste/modify version 
of `DistinctMedianAccumulator` is fine.
   
   ```rust
   
   /// A trait for a container of numeric types that can be compared
   /// A `Vec` is used for Median and `HashSet` for DistinctMedian
   trait MedianValues: Send + Sync + std::fmt::Debug {
       type T: ArrowNativeType;
   
       fn reserve(&mut self, additional: usize);
       fn extend(&mut self, values: impl Iterator<Item = Self::T>);
       fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
       /// Convert the elements of this container into a ListArray
       fn into_list_array(self) -> ListArray;
   }
   
   impl <T:ArrowNativeType> MedianValues for Vec<T> {
       type T = T;
   
       fn reserve(&mut self, additional: usize) {
           todo!()
       }
   
       fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
           todo!()
       }
   
       fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
           todo!()
       }
   
       fn into_list_array(self) -> ListArray {
           todo!()
       }
   }
   
   
   /// The median accumulator accumulates the raw input values
   /// as `ScalarValue`s
   ///
   /// The intermediate state is represented as a List of scalar values updated 
by
   /// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar 
values
   /// in the final evaluation step so that we avoid expensive conversions and
   /// allocations during `update_batch`.
   struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>>  {
       data_type: DataType,
       all_values: V,
   }
   ```
   
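   I couldn't quite make this work -- it errors like this (the bound 
`MedianValues<T = T>` requires the container's element type, e.g. `i8`, to 
equal the Arrow type `Int8Type`; presumably it would need to be 
`MedianValues<T = T::Native>` instead):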
   
   ```
   error[E0271]: type mismatch resolving `<Vec<i8> as MedianValues>::T == 
Int8Type`
      --> datafusion/physical-expr/src/aggregate/median.rs:76:33
       |
   76  |                       all_values: vec![],
       |                                   ^^^^^^ type mismatch resolving 
`<Vec<i8> as MedianValues>::T == Int8Type`
   ...
   81  | /         downcast_integer! {
   82  | |             dt => (helper, dt),
   83  | |             DataType::Float16 => helper!(Float16Type, dt),
   84  | |             DataType::Float32 => helper!(Float32Type, dt),
   ...   |
   92  | |             ))),
   ```
   
   
   Here is the full diff if anyone wants to play around
   
   <details><summary>Details</summary>
   <p>
   
   ```diff
   diff --git a/datafusion/physical-expr/src/aggregate/median.rs 
b/datafusion/physical-expr/src/aggregate/median.rs
   index 1049187a5..0e9b0b87d 100644
   --- a/datafusion/physical-expr/src/aggregate/median.rs
   +++ b/datafusion/physical-expr/src/aggregate/median.rs
   @@ -23,7 +23,7 @@ use crate::{AggregateExpr, PhysicalExpr};
    use arrow::array::{Array, ArrayRef};
    use arrow::datatypes::{DataType, Field};
    use arrow_array::cast::AsArray;
   -use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType};
   +use arrow_array::{downcast_integer, ArrowNativeTypeOp, ArrowNumericType, 
ListArray};
    use arrow_buffer::ArrowNativeType;
    use datafusion_common::{DataFusionError, Result, ScalarValue};
    use datafusion_expr::Accumulator;
   @@ -71,7 +71,7 @@ impl AggregateExpr for Median {
            use arrow_array::types::*;
            macro_rules! helper {
                ($t:ty, $dt:expr) => {
   -                Ok(Box::new(MedianAccumulator::<$t> {
   +                Ok(Box::new(MedianAccumulator::<$t, Vec<<$t as 
ArrowPrimitiveType>::Native>> {
                        data_type: $dt.clone(),
                        all_values: vec![],
                    }))
   @@ -127,6 +127,39 @@ impl PartialEq<dyn Any> for Median {
        }
    }
   
   +/// A trait for a container of numeric types that can be compared
   +/// A `Vec` is used for Median and `HashSet` for DistinctMedian
   +trait MedianValues: Send + Sync + std::fmt::Debug {
   +    type T: ArrowNativeType;
   +
   +    fn reserve(&mut self, additional: usize);
   +    fn extend(&mut self, values: impl Iterator<Item = Self::T>);
   +    fn into_iter(self) -> Box<dyn Iterator<Item = Self::T>>;
   +    /// Convert the elements of this container into a ListArray
   +    fn into_list_array(self) -> ListArray;
   +}
   +
   +impl <T:ArrowNativeType> MedianValues for Vec<T> {
   +    type T = T;
   +
   +    fn reserve(&mut self, additional: usize) {
   +        todo!()
   +    }
   +
   +    fn extend(&mut self, values: impl Iterator<Item=Self::T>) {
   +        todo!()
   +    }
   +
   +    fn into_iter(self) -> Box<dyn Iterator<Item=Self::T>> {
   +        todo!()
   +    }
   +
   +    fn into_list_array(self) -> ListArray {
   +        todo!()
   +    }
   +}
   +
   +
    /// The median accumulator accumulates the raw input values
    /// as `ScalarValue`s
    ///
   @@ -134,18 +167,18 @@ impl PartialEq<dyn Any> for Median {
    /// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar 
values
    /// in the final evaluation step so that we avoid expensive conversions and
    /// allocations during `update_batch`.
   -struct MedianAccumulator<T: ArrowNumericType> {
   +struct MedianAccumulator<T: ArrowNumericType, V: MedianValues<T = T>>  {
        data_type: DataType,
   -    all_values: Vec<T::Native>,
   +    all_values: V,
    }
   
   -impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {
   +impl<T: ArrowNumericType, V: MedianValues<T = T>> std::fmt::Debug for 
MedianAccumulator<T, V> {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            write!(f, "MedianAccumulator({})", self.data_type)
        }
    }
   
   -impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
   +impl<T: ArrowNumericType, V: MedianValues<T = T>> Accumulator for 
MedianAccumulator<T, V> {
        fn state(&mut self) -> Result<Vec<ScalarValue>> {
            let all_values = self
                .all_values
   ```
   
   </p>
   </details> 
   



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -196,6 +184,192 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
     }
 }
 
+/// MEDIAN(DISTINCT) aggregate expression. Similar to MEDIAN but computes 
after taking
+/// all unique values. This may use a lot of memory if the cardinality is high.
+#[derive(Debug)]
+pub struct DistinctMedian {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl DistinctMedian {
+    /// Create a new MEDIAN(DISTINCT) aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctMedian {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        use arrow_array::types::*;
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(DistinctMedianAccumulator::<$t> {
+                    data_type: $dt.clone(),
+                    distinct_values: Default::default(),
+                }))
+            };
+        }

Review Comment:
   I think it follows the name used in `Median`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to