cloud-fan commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So…
URL: https://github.com/apache/spark/pull/24149#discussion_r277728419

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 ##########
 @@ -258,21 +275,29 @@ class SortBasedAggregator(
         if (hasNextInput || hasNextAggBuffer) {
           // Find smaller key of the initialAggBufferIterator and initialAggBufferIterator
           groupingKey = findGroupingKey()
-          result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
+          updateResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedUpdateAggFunctions)
+          finalResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedMergeAggFunctions)
 
           // Firstly, update the aggregation buffer with input rows.
           while (hasNextInput &&
             groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-            processRow(result.aggregationBuffer, inputIterator.getValue)
+            processRow(updateResult.aggregationBuffer, inputIterator.getValue)
             hasNextInput = inputIterator.next()
           }
 
+          // This step ensures that the contents of the updateResult aggregation buffer are
+          // merged with the finalResult aggregation buffer to maintain consistency
+          serializeBuffer(updateResult.aggregationBuffer)
+          mergeAggregationBuffers(finalResult.aggregationBuffer, updateResult.aggregationBuffer)
           // Secondly, merge the aggregation buffer with existing aggregation buffers.
           // NOTE: the ordering of these two while-block matter, mergeAggregationBuffer() should
           // be called after calling processRow.
           while (hasNextAggBuffer &&
             groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
 
 Review comment:
   I think the data flow is:
   1. Start with hash agg: create and initialize an empty buffer for each group key, and consume input records by calling `update`.
   2. Fall back to sort-based agg.
   3. Create and initialize an empty buffer for the current group key, and consume input records by calling `update`.
   4. When all the input records for the current group key have been consumed, call `merge` to fold in the agg buffer that was generated by the hash agg (see the sketch below).
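   For illustration, a minimal sketch of that lifecycle (a made-up `SimpleAgg` trait and `SumAgg` instance, not Spark's actual `TypedImperativeAggregate` API):
   
   ```scala
   // Hypothetical simplified aggregate, mirroring the data flow above.
   trait SimpleAgg[B] {
     def init(): B                    // INIT: fresh buffer per group key
     def update(buf: B, row: Int): B  // UPDATE: consume one input record
     def merge(buf: B, other: B): B   // MERGE: fold in another buffer
     def finish(buf: B): Long         // FINISH: produce the final value
   }
   
   object SumAgg extends SimpleAgg[Long] {
     def init(): Long = 0L
     def update(buf: Long, row: Int): Long = buf + row
     def merge(buf: Long, other: Long): Long = buf + other
     def finish(buf: Long): Long = buf
   }
   
   // Steps 3-4 for one group key: update with the remaining sorted input,
   // then merge the buffer the hash agg had built before the fallback.
   val afterUpdate = Seq(1, 2, 3).foldLeft(SumAgg.init())(SumAgg.update)
   val hashAggBuffer = 10L
   val result = SumAgg.finish(SumAgg.merge(afterUpdate, hashAggBuffer))  // 16
   ```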
   
   This data flow is fine most of the time, but it is problematic for Hive UDAFs, because they don't support INIT -> UPDATE -> MERGE -> FINISH.
   
   That said, this patch may cause a perf regression for normal UDAFs. We should only do it for Hive UDAFs, switching from `INIT -> UPDATE -> MERGE -> FINISH` to `INIT -> UPDATE -> FINISH, INIT -> MERGE -> FINISH`, as sketched below.
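   To make the proposed split concrete, a hedged sketch reusing the hypothetical `SumAgg` and `hashAggBuffer` from above (for a well-behaved UDAF both shapes give the same answer; a Hive UDAF evaluator instance only supports one of update/merge, so only the second shape is legal for it):
   
   ```scala
   // Current shape (breaks Hive UDAFs): one buffer sees UPDATE and then MERGE.
   // INIT -> UPDATE -> MERGE -> FINISH
   val buf = SumAgg.merge(
     Seq(1, 2, 3).foldLeft(SumAgg.init())(SumAgg.update),
     hashAggBuffer)
   SumAgg.finish(buf)  // 16
   
   // Proposed shape: the update buffer is finished (serialized) first, and a
   // separate, freshly initialized buffer only ever merges.
   // INIT -> UPDATE -> FINISH, INIT -> MERGE -> FINISH
   val updateBuf = Seq(1, 2, 3).foldLeft(SumAgg.init())(SumAgg.update)
   val mergeBuf = SumAgg.merge(SumAgg.merge(SumAgg.init(), updateBuf), hashAggBuffer)
   SumAgg.finish(mergeBuf)  // 16
   ```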
