alamb commented on code in PR #18348:
URL: https://github.com/apache/datafusion/pull/18348#discussion_r2515277593


##########
datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs:
##########


Review Comment:
   Yeah, we definitely don't want to be hashing if we can avoid that



##########
datafusion/functions-aggregate-common/src/utils.rs:
##########
@@ -167,3 +175,90 @@ impl<T: DecimalType> DecimalAverager<T> {
         }
     }
 }
+
+/// Generic way to collect distinct values for accumulators.
+///
+/// The intermediate state is represented as a List of scalar values updated by
+/// `merge_batch` and a `Vec` of `ArrayRef` that are converted to scalar values
+/// in the final evaluation step so that we avoid expensive conversions and
+/// allocations during `update_batch`.
+pub struct GenericDistinctBuffer<T: ArrowPrimitiveType> {
+    pub values: HashSet<Hashable<T::Native>, RandomState>,
+    data_type: DataType,
+}
+
+impl<T: ArrowPrimitiveType> std::fmt::Debug for GenericDistinctBuffer<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "GenericDistinctBuffer({}, values={})",
+            self.data_type,
+            self.values.len()
+        )
+    }
+}
+
+impl<T: ArrowPrimitiveType> GenericDistinctBuffer<T> {
+    pub fn new(data_type: DataType) -> Self {
+        Self {
+            values: HashSet::default(),
+            data_type,
+        }
+    }
+
+    /// Mirrors [`Accumulator::state`].
+    pub fn state(&self) -> Result<Vec<ScalarValue>> {
+        let arr = Arc::new(
+            PrimitiveArray::<T>::from_iter_values(self.values.iter().map(|v| 
v.0))
+                // Ideally we'd just use T::DATA_TYPE but this misses things 
like
+                // decimal scale/precision and timestamp timezones, which need 
to
+                // match up with Accumulator::state_fields
+                .with_data_type(self.data_type.clone()),
+        );
+        Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
+    }
+
+    /// Mirrors [`Accumulator::update_batch`].
+    pub fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        debug_assert_eq!(
+            values.len(),
+            1,
+            "DistinctValuesBuffer::update_batch expects only a single input 
array"
+        );
+
+        let arr = as_primitive_array::<T>(&values[0])?;
+        if arr.null_count() > 0 {
+            self.values.extend(arr.iter().flatten().map(Hashable));
+        } else {
+            self.values
+                .extend(arr.values().iter().cloned().map(Hashable));

Review Comment:
   nice -- this is an elegant way to special-case nulls/non-nulls



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to