16pierre opened a new issue, #20724:
URL: https://github.com/apache/datafusion/issues/20724

   ### Describe the bug
   
   We (CC @rgehan) encountered a rare data regression in production during 
spillable aggregations, where `GROUP BY some_key` would return duplicate 
entries for `some_key` .
   
   We reproduced in both Datafusion 50 and 52, and we believe we’ve found the 
root-cause and the fix.
   
   Our understanding of the bug is the following:
   
   when the `Final` aggregation spills and is done iterating over the `Partial` 
aggregation, it switches to a sorted streaming aggregation. This is implemented 
by resetting its input to point to the sorted spill stream, and by mutating 
some its internal state, notably 
   `self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());`
   
   
https://github.com/apache/datafusion/blob/0aab6a3d2f67deb8eafe475b20942c6f2b40e886/datafusion/physical-plan/src/aggregates/row_hash.rs#L1220-L1222
   
   This ordering then unlocks streaming the output here:
   
   
https://github.com/apache/datafusion/blob/0aab6a3d2f67deb8eafe475b20942c6f2b40e886/datafusion/physical-plan/src/aggregates/row_hash.rs#L754-L762
   
   What’s problematic is that at this point, the 
`GroupedHashAggregateStream#group_values` state doesn’t get mutated: in our 
case it’s still a `GroupValuesColumn<Streaming = false>` , which uses 
`vectorized_intern` under the hood and can lead to correctness bugs in the case 
of streaming aggregations, as documented here:
   
   
https://github.com/apache/datafusion/blob/65a6bc423142678528cdde03e4e2823952758328/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/mod.rs#L321
   
   This bug correlates quite well with the data correctness symptoms we 
experienced in production, aka, duplicated grouping keys output by the 
aggregation.
   
   We fixed the correctness problem by resetting  `group_values` to the 
streaming implementation when  `group_ordering` get mutated :
   
   ```
   self.group_ordering = GroupOrdering::Full(GroupOrderingFull::new());
   self.group_values = new_group_values(group_schema, &self.group_ordering)?;
   ```
   
   ### To Reproduce
   
   We have a reproduction based on production data which isn’t really 
comfortable to share as is / takes a while to run / is a bit complex.
   
   We’ve tried to simplify the reproduction without much success so far, we’d 
be happy to get assistance of maintainers to build a synthetic repro, which 
probably involves hash collisions here.
   
   ### Expected behavior
   
   No duplicate groups after an aggregation.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to