korowa commented on code in PR #8721:
URL: https://github.com/apache/arrow-datafusion/pull/8721#discussion_r1441602644


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,6 +262,164 @@ impl Accumulator for DistinctCountAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct NativeDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+    T::Native: Eq + Hash,
+{
+    values: HashSet<T::Native, RandomState>,
+}
+
+impl<T> NativeDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+    T::Native: Eq + Hash,
+{
+    fn new() -> Self {
+        Self {
+            values: HashSet::default(),
+        }
+    }
+}
+
+impl<T> Accumulator for NativeDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send + Debug,
+    T::Native: Eq + Hash,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
+            self.values.iter().cloned(),
+        )) as ArrayRef;
+        let list = Arc::new(array_into_list_array(arr)) as ArrayRef;
+        Ok(vec![ScalarValue::List(list)])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<T>(&values[0])?;
+        arr.iter().for_each(|value| {
+            if let Some(value) = value {
+                self.values.insert(value);
+            }
+        });
+
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+        assert_eq!(
+            states.len(),
+            1,
+            "count_distinct states must be single array"
+        );
+
+        let arr = as_list_array(&states[0])?;
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                let list = as_primitive_array::<T>(&list)?;
+                self.values.extend(list.values())
+            };
+            Ok(())
+        })
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+            + std::mem::size_of_val(&self.values)
+            + (std::mem::size_of::<T::Native>() * self.values.capacity())
+    }
+}
+
+#[derive(Debug)]
+struct FloatDistinctCountAccumulator<T>
+where
+    T: ArrowPrimitiveType + Send,
+{
+    values: HashSet<Hashable<T::Native>, RandomState>,

Review Comment:
   I might be wrong, but don't we need smth like 
[specialization](https://rust-lang.github.io/rfcs/1210-impl-specialization.html)
 for this? It would allow to have different implementations of eq + hash for 
`Hashable` depending on generic bounds -- but it doesn't seem to be stable now 
:thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to