Hi all,

I just started looking into DataFusion and am considering using it as the platform for our next-gen analytics solution. To get started, I tried to add a few functions such as stddev. While writing the code, I noticed some discrepancies between the Accumulator API and the implementation of some functions (though this may just be my unfamiliarity with the code base). The API is defined as follows:
    pub trait Accumulator: Send + Sync + Debug {
        /// Returns the state of the accumulator at the end of the accumulation.
        // in the case of an average on which we track `sum` and `n`, this function should return a vector
        // of two values, sum and n.
        fn state(&self) -> Result<Vec<ScalarValue>>;

        /// updates the accumulator's state from a vector of scalars.
        fn update(&mut self, values: &[ScalarValue]) -> Result<()>;

        /// updates the accumulator's state from a vector of arrays.
        fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
            if values.is_empty() {
                return Ok(());
            };
            (0..values[0].len()).try_for_each(|index| {
                let v = values
                    .iter()
                    .map(|array| ScalarValue::try_from_array(array, index))
                    .collect::<Result<Vec<_>>>()?;
                self.update(&v)
            })
        }

        // ... merge functions omitted
    }

I am only quoting the update and update_batch functions for brevity; the same applies to the merge functions. So, as I read it, the update function takes a *vector* of scalars and update_batch takes a *vector of arrays*. However, when reading the actual implementations of some functions, for example *sum* and *average*, both assume that when update is called *only one* value is passed in, and that when update_batch is called *only one* array is passed in.
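To check my reading of the quoted default update_batch, here is a self-contained sketch that mirrors the shape of the API with simplified stand-in types. Scalar, Array, and SumProductAccumulator are hypothetical and are not DataFusion's real ScalarValue, ArrayRef, or any built-in aggregate; errors are plain Strings. Under this reading, each element of values is one input column, and update is called once per row with one scalar per column. A one-argument function such as sum would then only ever see a one-element slice, while a hypothetical two-argument aggregate would read values[0] and values[1].

```rust
use std::fmt::Debug;

// Hypothetical stand-ins, NOT DataFusion types:
// a scalar is Option<f64> (None = NULL), an array is a Vec of scalars.
type Scalar = Option<f64>;
type Array = Vec<Scalar>;

/// Mirrors the shape of the quoted trait (merge functions omitted).
trait Accumulator: Debug {
    fn state(&self) -> Vec<Scalar>;
    /// One scalar per input column, for a single row.
    fn update(&mut self, values: &[Scalar]) -> Result<(), String>;

    /// Same row-by-row fallback as the quoted default: array `i` of the
    /// batch becomes element `i` of the slice passed to `update`.
    fn update_batch(&mut self, values: &[Array]) -> Result<(), String> {
        if values.is_empty() {
            return Ok(());
        }
        (0..values[0].len()).try_for_each(|index| {
            let row: Vec<Scalar> = values.iter().map(|array| array[index]).collect();
            self.update(&row)
        })
    }
}

/// One-argument aggregate like SUM: only column 0 exists.
#[derive(Debug, Default)]
struct SumAccumulator {
    sum: Scalar,
}

impl Accumulator for SumAccumulator {
    fn state(&self) -> Vec<Scalar> {
        vec![self.sum]
    }
    fn update(&mut self, values: &[Scalar]) -> Result<(), String> {
        // NULL inputs are skipped; the sum stays NULL until a value arrives.
        if let Some(v) = values[0] {
            self.sum = Some(self.sum.unwrap_or(0.0) + v);
        }
        Ok(())
    }
}

/// Hypothetical two-argument aggregate: SUM(x * y) over columns 0 and 1.
#[derive(Debug, Default)]
struct SumProductAccumulator {
    sum: f64,
}

impl Accumulator for SumProductAccumulator {
    fn state(&self) -> Vec<Scalar> {
        vec![Some(self.sum)]
    }
    fn update(&mut self, values: &[Scalar]) -> Result<(), String> {
        // values[0] is x and values[1] is y for the current row.
        match (values.get(0), values.get(1)) {
            (Some(Some(x)), Some(Some(y))) => self.sum += x * y,
            (Some(_), Some(_)) => {} // a NULL in either column: skip the row
            _ => return Err("expected two input columns".to_string()),
        }
        Ok(())
    }
}
```

For example, SumAccumulator fed the single column [1.0, NULL, 2.5] ends with state [Some(3.5)], while SumProductAccumulator fed the columns [1.0, 2.0, NULL] and [3.0, 4.0, 5.0] ends with [Some(11.0)].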
    impl Accumulator for AvgAccumulator {
        fn state(&self) -> Result<Vec<ScalarValue>> {
            Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
        }

        fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
            let values = &values[0];

            self.count += (!values.is_null()) as u64;
            self.sum = sum::sum(&self.sum, values)?;

            Ok(())
        }

        fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
            let values = &values[0];

            self.count += (values.len() - values.data().null_count()) as u64;
            self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
            Ok(())
        }

        // ... merge functions omitted
    }

    impl Accumulator for SumAccumulator {
        fn state(&self) -> Result<Vec<ScalarValue>> {
            Ok(vec![self.sum.clone()])
        }

        fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
            // sum(v1, v2, v3) = v1 + v2 + v3
            self.sum = sum(&self.sum, &values[0])?;
            Ok(())
        }

        fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
            let values = &values[0];
            self.sum = sum(&self.sum, &sum_batch(values)?)?;
            Ok(())
        }

        // ... merge functions omitted
    }

Could someone shed some light on this, in case I missed anything?

Regards,
Lin