Hi All,

I just started looking into DataFusion and am considering using it as the
platform for our next gen analytics solution. To get started, I tried to
add a few functions such as stddev. While writing the code I noticed some
discrepancies (it may also be my unfamiliarity of the code base) in the
Accumulator API and the implementation of some functions. The API is
defined as the following:

pub trait Accumulator: Send + Sync + Debug {
/// Returns the state of the accumulator at the end of the accumulation.
// in the case of an average on which we track `sum` and `n`, this function
should return a vector
// of two values, sum and n.
fn state(&self) -> Result<Vec<ScalarValue>>;

/// updates the accumulator's state from a vector of scalars.
fn update(&mut self, values: &[ScalarValue]) -> Result<()>;

/// updates the accumulator's state from a vector of arrays.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
};
(0..values[0].len()).try_for_each(|index| {
let v = values
.iter()
.map(|array| ScalarValue::try_from_array(array, index))
.collect::<Result<Vec<_>>>()?;
self.update(&v)
})
I am only quoting the update and update_batch functions for brevity, same
for the merge functions. So here it indicates that the update function
takes a *vector* and update_batch takes *vector of array. *

When reading code for some actual implementation for example *sum* and
*average,
*both implementations assume when update is called *only one *value is
passed in; and when update_batch is called *only one *array is passed in.

impl Accumulator for AvgAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
}

fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
let values = &values[0];

self.count += (!values.is_null()) as u64;
self.sum = sum::sum(&self.sum, values)?;

Ok(())
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];

self.count += (values.len() - values.data().null_count()) as u64;
self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
Ok(())

impl Accumulator for SumAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.sum.clone()])
}

fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
// sum(v1, v2, v3) = v1 + v2 + v3
self.sum = sum(&self.sum, &values[0])?;
Ok(())
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
self.sum = sum(&self.sum, &sum_batch(values)?)?;
Ok(())
}

Could someone shed some light in case I missed anything?

Regards,
Lin

Reply via email to