cloud-fan commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So… URL: https://github.com/apache/spark/pull/24149#discussion_r277728419
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala ##########

```diff
@@ -258,21 +275,29 @@ class SortBasedAggregator(
     if (hasNextInput || hasNextAggBuffer) {
       // Find the smaller key of the inputIterator and the initialAggBufferIterator
       groupingKey = findGroupingKey()
-      result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
+      updateResult = new AggregationBufferEntry(
+        groupingKey, makeEmptyAggregationBufferForSortBasedUpdateAggFunctions)
+      finalResult = new AggregationBufferEntry(
+        groupingKey, makeEmptyAggregationBufferForSortBasedMergeAggFunctions)

       // Firstly, update the aggregation buffer with input rows.
       while (hasNextInput &&
           groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-        processRow(result.aggregationBuffer, inputIterator.getValue)
+        processRow(updateResult.aggregationBuffer, inputIterator.getValue)
         hasNextInput = inputIterator.next()
       }

+      // This step ensures that the contents of the updateResult aggregation buffer are
+      // merged with the finalResult aggregation buffer to maintain consistency
+      serializeBuffer(updateResult.aggregationBuffer)
+      mergeAggregationBuffers(finalResult.aggregationBuffer, updateResult.aggregationBuffer)
       // Secondly, merge the aggregation buffer with existing aggregation buffers.
       // NOTE: the ordering of these two while-blocks matters; mergeAggregationBuffer() should
       // be called after calling processRow.
       while (hasNextAggBuffer &&
           groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
```

Review comment:

I think the data flow is:

1. Start with hash aggregation: create and initialize an empty buffer for each group key, and consume input records by calling `update`.
2. Fall back to sort-based aggregation.
3. Create and initialize an empty buffer for the current group key, and consume input records by calling `update`.
4. When all the input records for the current group key are consumed, call `merge` to merge in the agg buffer that was generated by the hash aggregation.

This data flow is fine most of the time, but it is problematic for Hive UDAFs, because they do not support `INIT -> UPDATE -> MERGE -> FINISH`. That said, this patch may cause a performance regression for normal UDAFs. We should only do it for Hive UDAFs, to switch from `INIT -> UPDATE -> MERGE -> FINISH` to `INIT -> UPDATE -> FINISH, INIT -> MERGE -> FINISH`.
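To make the two lifecycles concrete, here is a hypothetical, simplified sketch in Scala. It models a toy sum aggregate, not Spark's actual `ObjectAggregationIterator` or Hive's UDAF API; the names `SumAgg`, `mixedLifecycle`, and `splitLifecycle` are invented for illustration:

```scala
// Hypothetical toy model of the two aggregation lifecycles discussed above.
object LifecycleSketch {
  // A trivial "sum" aggregate with explicit INIT / UPDATE / MERGE / FINISH steps.
  final class SumAgg {
    private var buffer: Long = 0L                            // INIT: empty buffer
    def update(v: Long): Unit = buffer += v                  // UPDATE: consume an input row
    def merge(other: SumAgg): Unit = buffer += other.buffer  // MERGE: combine partial buffers
    def finish(): Long = buffer                              // FINISH: produce the result
  }

  // Lifecycle 1: INIT -> UPDATE -> MERGE -> FINISH on a single buffer.
  // This is the flow that Hive UDAFs reportedly do not support.
  def mixedLifecycle(rows: Seq[Long], spilledRows: Seq[Long]): Long = {
    val agg = new SumAgg
    rows.foreach(agg.update)          // consume sort-based input rows
    val partial = new SumAgg
    spilledRows.foreach(partial.update) // stands in for the spilled hash-agg buffer
    agg.merge(partial)                // MERGE after UPDATE, on the same buffer
    agg.finish()
  }

  // Lifecycle 2: INIT -> UPDATE -> FINISH on one buffer, and separately
  // INIT -> MERGE -> FINISH on another, as the reviewer suggests for Hive UDAFs.
  def splitLifecycle(rows: Seq[Long], spilledRows: Seq[Long]): Long = {
    val updateAgg = new SumAgg
    rows.foreach(updateAgg.update)    // this buffer only ever sees UPDATE
    val partial = new SumAgg
    spilledRows.foreach(partial.update)
    val mergeAgg = new SumAgg         // fresh buffer that only ever sees MERGE
    mergeAgg.merge(partial)
    // Combining the two finished values works for sums; a real implementation
    // would need a proper final combine step (hypothetical simplification).
    updateAgg.finish() + mergeAgg.finish()
  }
}
```

For this toy sum, both lifecycles produce the same total; the point is that the second never calls `merge` on a buffer that has already seen `update`, which is the constraint the reviewer attributes to Hive UDAFs.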