Hi,

The accumulator API is designed to accept multiple columns (e.g. Pearson
correlation takes 2 columns, not one). `values[0]` corresponds to the first
column passed to the accumulator, `values[1]` to the second, and so on. All
concrete accumulator implementations in DataFusion at the moment accept only
one column (Sum, Avg, Count, Min, Max), but the API is designed to work with
multiple columns.
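
To make the column layout concrete, here is a sketch of a two-column accumulator. It is a simplified, self-contained model, not DataFusion's API: `Scalar` (an `Option<f64>`, with `None` meaning NULL), `Column`, `ToyAccumulator` and `PearsonAccumulator` are hypothetical names, and plain vectors stand in for `ScalarValue` and `ArrayRef`. The default `update_batch` mirrors the one in the quoted trait: it slices the batch into rows and hands `update` one value per column, so `values[0]` is x and `values[1]` is y.

```rust
// Hypothetical stand-ins for DataFusion's types (illustration only):
// a "scalar" is an optional f64 (None = NULL), a "column" is a Vec of them.
type Scalar = Option<f64>;
type Column = Vec<Scalar>;

/// Toy accumulator trait: `values` always holds one entry per input column.
trait ToyAccumulator {
    /// Update from one row: `values[i]` is column i's value in that row.
    fn update(&mut self, values: &[Scalar]);

    /// Update from a batch: `values[i]` is the whole of column i.
    /// Default: split the batch into rows and call `update` once per row.
    fn update_batch(&mut self, values: &[Column]) {
        if values.is_empty() {
            return;
        }
        for row in 0..values[0].len() {
            let v: Vec<Scalar> = values.iter().map(|col| col[row]).collect();
            self.update(&v);
        }
    }

    fn evaluate(&self) -> Scalar;
}

/// Pearson correlation: reads values[0] as x and values[1] as y.
#[derive(Default)]
struct PearsonAccumulator {
    n: f64,
    sum_x: f64,
    sum_y: f64,
    sum_xx: f64,
    sum_yy: f64,
    sum_xy: f64,
}

impl ToyAccumulator for PearsonAccumulator {
    fn update(&mut self, values: &[Scalar]) {
        // Skip rows where either column is NULL.
        if let (Some(x), Some(y)) = (values[0], values[1]) {
            self.n += 1.0;
            self.sum_x += x;
            self.sum_y += y;
            self.sum_xx += x * x;
            self.sum_yy += y * y;
            self.sum_xy += x * y;
        }
    }

    fn evaluate(&self) -> Scalar {
        if self.n < 2.0 {
            return None;
        }
        let cov = self.n * self.sum_xy - self.sum_x * self.sum_y;
        let var_x = self.n * self.sum_xx - self.sum_x * self.sum_x;
        let var_y = self.n * self.sum_yy - self.sum_y * self.sum_y;
        let denom = (var_x * var_y).sqrt();
        if denom == 0.0 { None } else { Some(cov / denom) }
    }
}
```

The running-sums formula is the simplest to show but can lose precision on large inputs; a production implementation would likely use a more numerically stable update.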

So `update_batch(&mut self, values: &[ArrayRef])` means: update the
accumulator from N columns, where `values.len()` is the number of columns.
For sum this would be 1, for Pearson correlation it would be 2, and for,
e.g., an ML model whose weights are computed over all columns, it would be
the number of input columns N of the model. For stddev, you should use 1,
since stddev is a function of a single column.
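
For a single-column function like stddev, the accumulator simply reads `values[0]`. Below is a minimal sketch, again with hypothetical stand-in types (`Column` as `Vec<Option<f64>>` and a made-up `StddevAccumulator`) rather than DataFusion's, and assuming the sample standard deviation (dividing by `count - 1`). It uses Welford's online algorithm, whose state is `(count, mean, m2)`, much like the average tracks `sum` and `n`.

```rust
// Hypothetical stand-in type (illustration only): None = NULL.
type Column = Vec<Option<f64>>;

/// Sample standard deviation of ONE column, via Welford's online algorithm.
#[derive(Debug, Default)]
struct StddevAccumulator {
    count: u64,
    mean: f64,
    m2: f64, // running sum of squared deviations from the mean
}

impl StddevAccumulator {
    fn update_value(&mut self, x: f64) {
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean);
    }

    /// Mirrors `update_batch(&mut self, values: &[ArrayRef])`: stddev has a
    /// single input column, so only `values[0]` is read.
    fn update_batch(&mut self, values: &[Column]) {
        let column = match values.first() {
            Some(column) => column,
            None => return,
        };
        // `flatten` skips the None (NULL) entries.
        for x in column.iter().flatten() {
            self.update_value(*x);
        }
    }

    /// Intermediate state, analogous to `state()` returning [sum, n] for avg.
    fn state(&self) -> Vec<f64> {
        vec![self.count as f64, self.mean, self.m2]
    }

    fn evaluate(&self) -> Option<f64> {
        if self.count < 2 {
            None
        } else {
            Some((self.m2 / (self.count - 1) as f64).sqrt())
        }
    }
}
```

Combining two such states from different partitions needs a dedicated formula for `mean` and `m2`; the fields cannot simply be added the way a sum's state can.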

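`update(&mut self, values: &[ScalarValue])` is the row-wise counterpart of
`update_batch`: `values[i]` is the value of the i-th column for a single
row, which is how the default `update_batch` quoted below calls it.
Intermediary states go through the merge functions instead: in a
HashAggregate, we reduce each partition, and then merge the partitions'
intermediary (scalar) states to compute the final value.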
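
The reduce-then-combine flow can be modelled with a toy average accumulator. This is a hypothetical sketch, not DataFusion code: `ToyAvgAccumulator` and `avg_over_partitions` are made-up names, the state is a plain `Vec<f64>` holding `[count, sum]` instead of `ScalarValue`s, and the combining step is a separate `merge` method (named after the merge functions mentioned in the quoted email) that folds in another partition's state.

```rust
/// Hypothetical toy average accumulator (illustration only).
#[derive(Debug, Default)]
struct ToyAvgAccumulator {
    count: u64,
    sum: f64,
}

impl ToyAvgAccumulator {
    /// Partial phase: consume raw rows. Like AvgAccumulator, only the first
    /// input column (`values[0]`) is read; None entries (NULLs) are skipped.
    fn update_batch(&mut self, values: &[Vec<Option<f64>>]) {
        for x in values[0].iter().flatten() {
            self.count += 1;
            self.sum += *x;
        }
    }

    /// Intermediate state sent from a partition to the final phase.
    fn state(&self) -> Vec<f64> {
        vec![self.count as f64, self.sum]
    }

    /// Final phase: fold in another partition's [count, sum] state.
    fn merge(&mut self, state: &[f64]) {
        self.count += state[0] as u64;
        self.sum += state[1];
    }

    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Reduce each partition to a state, then merge all states into one result.
fn avg_over_partitions(partitions: &[Vec<Option<f64>>]) -> Option<f64> {
    let mut final_acc = ToyAvgAccumulator::default();
    for partition in partitions {
        let mut partial = ToyAvgAccumulator::default();
        // Each partition here has a single column.
        partial.update_batch(std::slice::from_ref(partition));
        final_acc.merge(&partial.state());
    }
    final_acc.evaluate()
}
```

Merging `(count, sum)` states rather than averaging the per-partition averages keeps the result correct when partitions differ in size: for partitions `[1, 2, 3]` and `[10]` the merged average is 16 / 4 = 4, whereas the mean of the two partition averages would be 6.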

Hope this helps,
Jorge



On Tue, Jan 4, 2022 at 5:55 AM LM <rea...@gmail.com> wrote:

> Hi All,
>
> I just started looking into DataFusion and am considering using it as the
> platform for our next-gen analytics solution. To get started, I tried to
> add a few functions such as stddev. While writing the code I noticed some
> discrepancies (it may also be my unfamiliarity with the code base) between
> the Accumulator API and the implementation of some functions. The API is
> defined as follows:
>
> pub trait Accumulator: Send + Sync + Debug {
>     /// Returns the state of the accumulator at the end of the accumulation.
>     // In the case of an average, on which we track `sum` and `n`, this
>     // function should return a vector of two values, sum and n.
>     fn state(&self) -> Result<Vec<ScalarValue>>;
>
>     /// Updates the accumulator's state from a vector of scalars.
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
>
>     /// Updates the accumulator's state from a vector of arrays.
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         if values.is_empty() {
>             return Ok(());
>         };
>         (0..values[0].len()).try_for_each(|index| {
>             let v = values
>                 .iter()
>                 .map(|array| ScalarValue::try_from_array(array, index))
>                 .collect::<Result<Vec<_>>>()?;
>             self.update(&v)
>         })
>     }
>
>     // ... (the merge functions and the rest of the trait are omitted)
> }
>
> I am only quoting the update and update_batch functions for brevity; the
> same applies to the merge functions. The signatures indicate that update
> takes a *vector* of scalars and update_batch takes a *vector of arrays*.
>
> However, the actual implementations, for example *sum* and *average*, both
> assume that when update is called *only one* value is passed in, and when
> update_batch is called *only one* array is passed in.
>
> impl Accumulator for AvgAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (!values.is_null()) as u64;
>         self.sum = sum::sum(&self.sum, values)?;
>
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>
>         self.count += (values.len() - values.data().null_count()) as u64;
>         self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
>         Ok(())
>     }
>
>     // ... (remaining methods omitted)
> }
>
> impl Accumulator for SumAccumulator {
>     fn state(&self) -> Result<Vec<ScalarValue>> {
>         Ok(vec![self.sum.clone()])
>     }
>
>     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
>         // sum(v1, v2, v3) = v1 + v2 + v3
>         self.sum = sum(&self.sum, &values[0])?;
>         Ok(())
>     }
>
>     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
>         let values = &values[0];
>         self.sum = sum(&self.sum, &sum_batch(values)?)?;
>         Ok(())
>     }
>
>     // ... (remaining methods omitted)
> }
>
> Could someone shed some light on this, in case I missed anything?
>
> Regards,
> Lin
>
