[ https://issues.apache.org/jira/browse/ARROW-9937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Lamb updated ARROW-9937:
-------------------------------
    Description: 
The current design of aggregates makes the calculation of the average incorrect.

Namely, if there are multiple input partitions, the result is the average of the averages. For example, if the input arrives in two batches {{[1,2,3]}} and {{[4,5]}}, DataFusion reports "average=3.25" rather than "average=3".

It also makes it impossible to compute the [geometric mean|https://en.wikipedia.org/wiki/Geometric_mean], a distinct sum, and other operations.

The central issue is that {{Accumulator}} returns a {{ScalarValue}} during partial aggregations via {{get_value}}, but very often a single {{ScalarValue}} does not carry enough information to complete the full aggregation.

A simple example is the average of 5 numbers, x1, x2, x3, x4, x5, distributed across batches as

{{[x1, x2], [x3, x4], [x5]}}

Our current calculation computes partial means,

{{(x1+x2)/2, (x3+x4)/2, x5}}

and then reduces them using another average, i.e.

{{((x1+x2)/2 + (x3+x4)/2 + x5)/3}}

which is not equal to {{(x1 + x2 + x3 + x4 + x5)/5}}.

I believe that our Accumulators need to pass more information from the partial 
aggregations to the final aggregation.
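
For concreteness, here is a minimal standalone sketch (plain Rust, no DataFusion types; {{AvgState}} is a made-up name for illustration) contrasting the current mean-of-means reduction with one that carries a (sum, count) pair from the partial phase to the final phase:

{code:java}
/// Partial state for an exact average: the running sum and the row count.
#[derive(Default, Clone, Copy)]
struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    /// Accumulate one input value.
    fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
    }

    /// Merging partial states preserves enough information for an exact result.
    fn merge(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }

    /// Produce the final average from the accumulated state.
    fn evaluate(&self) -> f64 {
        self.sum / self.count as f64
    }
}

fn main() {
    let partitions = [vec![1.0_f64, 2.0, 3.0], vec![4.0, 5.0]];

    // Current behavior: each partition emits only its mean (a single
    // scalar), and the final step averages the means.
    let means: Vec<f64> = partitions
        .iter()
        .map(|p| p.iter().sum::<f64>() / p.len() as f64)
        .collect();
    let mean_of_means = means.iter().sum::<f64>() / means.len() as f64;
    assert_eq!(mean_of_means, 3.25); // wrong: (2.0 + 4.5) / 2

    // Carrying (sum, count) through the merge yields the exact answer.
    let mut total = AvgState::default();
    for p in &partitions {
        let mut partial = AvgState::default();
        for &v in p {
            partial.update(v);
        }
        total.merge(&partial);
    }
    assert_eq!(total.evaluate(), 3.0); // correct: 15 / 5
}
{code}

The same shape generalizes: a geometric mean would carry (sum of logs, count), and a distinct sum would carry the set of values seen so far.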

We could consider adopting an API equivalent to [Spark's|https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html], i.e. have an {{update}}, a {{merge}} and an {{evaluate}}.
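
As a rough sketch of what such an API could look like here (a hypothetical trait with names borrowed from the Spark API above, not the current DataFusion interface):

{code:java}
// Hypothetical Accumulator shape, modeled on Spark's Aggregator. The key
// change is that the state exchanged between the partial and final phases
// is a vector of values (e.g. [sum, count] for AVG), not a single
// ScalarValue.
pub trait Accumulator {
    /// Update this accumulator's state from a row of input values.
    fn update(&mut self, values: &[ScalarValue]) -> Result<()>;

    /// Fold the partial state emitted by another accumulator into this one.
    fn merge(&mut self, states: &[ScalarValue]) -> Result<()>;

    /// The partial state to ship to the final aggregation, e.g. [sum, count].
    fn state(&self) -> Result<Vec<ScalarValue>>;

    /// Compute the final result from the accumulated state.
    fn evaluate(&self) -> Result<ScalarValue>;
}
{code}

With that shape, AVG's partial phase would emit {{[sum, count]}} and the final phase would {{merge}} the pairs before dividing.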

Code with a failing test ({{src/execution/context.rs}})
{code:java}
    #[test]
    fn simple_avg() -> Result<()> {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
        ]);

        let batch1 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int32Array::from(vec![4, 5])),
            ],
        )?;

        let mut ctx = ExecutionContext::new();

        let provider = MemTable::new(
            Arc::new(schema),
            // two partitions, so partial results must be merged
            vec![vec![batch1], vec![batch2]],
        )?;
        ctx.register_table("t", Box::new(provider));

        let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;

        let batch = &result[0];
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());

        let values = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .expect("failed to cast to Float64Array");
        assert_eq!(values.len(), 1);
        // avg(1,2,3,4,5) = 3.0
        assert_eq!(values.value(0), 3.0_f64);
        Ok(())
    }
{code}

> [Rust] [DataFusion] Average is not correct
> ------------------------------------------
>
>                 Key: ARROW-9937
>                 URL: https://issues.apache.org/jira/browse/ARROW-9937
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>            Reporter: Jorge
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h
>  Remaining Estimate: 0h
>


