[ https://issues.apache.org/jira/browse/ARROW-9937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated ARROW-9937:
----------------------------------
    Labels: pull-request-available  (was: )

> [Rust] [DataFusion] Average is not correct
> ------------------------------------------
>
>                 Key: ARROW-9937
>                 URL: https://issues.apache.org/jira/browse/ARROW-9937
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>            Reporter: Jorge
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The current design of aggregates makes the calculation of the average
> incorrect.
> It also makes it impossible to compute the [geometric
> mean|https://en.wikipedia.org/wiki/Geometric_mean], distinct sum, and other
> operations.
> The central issue is that Accumulator returns a `ScalarValue` during partial
> aggregations via {{get_value}}, but very often a `ScalarValue` is not
> sufficient information to perform the full aggregation.
> A simple example is the average of 5 numbers, x1, x2, x3, x4, x5, that are
> distributed in batches of 2,
> {[x1, x2], [x3, x4], [x5]}.
> Our current calculation performs partial means,
> {(x1+x2)/2, (x3+x4)/2, x5},
> and then reduces them using another average, i.e.
> {{((x1+x2)/2 + (x3+x4)/2 + x5)/3}}
> which is not equal to {{(x1 + x2 + x3 + x4 + x5)/5}}.
> I believe that our Accumulators need to pass more information from the
> partial aggregations to the final aggregation.
> We could consider taking an API equivalent to
> [Spark|https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html],
> i.e. have an `update`, a `merge` and an `evaluate`.
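The `update` / `merge` / `evaluate` idea can be sketched as follows. This is only an illustration of the proposal, not DataFusion's actual `Accumulator` API: `AvgState`, `partial`, `avg_by_merge` and `avg_of_avgs` are hypothetical names, and values are plain `f64` rather than `ScalarValue`. The partial state carries a sum and a count, so the final stage has enough information to compute the exact average.

```rust
/// Hypothetical partial state for AVG: a running sum and a row count.
/// (Illustrative only; not a DataFusion type.)
#[derive(Debug, Clone, Copy, Default, PartialEq)]
struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    /// Fold one input value into this partition's state.
    fn update(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    /// Combine the partial state of another batch into this one.
    fn merge(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }

    /// Produce the final result; `None` when no rows were seen.
    fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Build the partial state for a single batch.
fn partial(batch: &[f64]) -> AvgState {
    let mut state = AvgState::default();
    for &v in batch {
        state.update(v);
    }
    state
}

/// Average obtained by merging per-batch (sum, count) states.
fn avg_by_merge(batches: &[Vec<f64>]) -> Option<f64> {
    let mut total = AvgState::default();
    for batch in batches {
        total.merge(&partial(batch));
    }
    total.evaluate()
}

/// Average of per-batch averages, i.e. the reduction the issue
/// describes as incorrect. Empty batches are skipped.
fn avg_of_avgs(batches: &[Vec<f64>]) -> Option<f64> {
    let means: Vec<f64> = batches
        .iter()
        .filter_map(|b| partial(b).evaluate())
        .collect();
    partial(&means).evaluate()
}
```

For the batches `[1, 2, 3]` and `[4, 5]`, `avg_by_merge` yields 3.0, while `avg_of_avgs` yields (2.0 + 4.5) / 2 = 3.25.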
> Code with a failing test ({{src/execution/context.rs}})
> {code:java}
> #[test]
> fn simple_avg() -> Result<()> {
>     let schema = Schema::new(vec![
>         Field::new("a", DataType::Int32, false),
>     ]);
>     let batch1 = RecordBatch::try_new(
>         Arc::new(schema.clone()),
>         vec![
>             Arc::new(Int32Array::from(vec![1, 2, 3])),
>         ],
>     )?;
>     let batch2 = RecordBatch::try_new(
>         Arc::new(schema.clone()),
>         vec![
>             Arc::new(Int32Array::from(vec![4, 5])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
>     ctx.register_table("t", Box::new(provider));
>     let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
>     let batch = &result[0];
>     assert_eq!(1, batch.num_columns());
>     assert_eq!(1, batch.num_rows());
>     let values = batch
>         .column(0)
>         .as_any()
>         .downcast_ref::<Float64Array>()
>         .expect("failed to cast version");
>     assert_eq!(values.len(), 1);
>     // avg(1,2,3,4,5) = 3.0
>     assert_eq!(values.value(0), 3.0_f64);
>     Ok(())
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)