m44444 commented on a change in pull request #24149: [SPARK-27207] : Ensure aggregate buffers are initialized again for So…
URL: https://github.com/apache/spark/pull/24149#discussion_r277914333
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 ##########
 @@ -258,21 +275,29 @@ class SortBasedAggregator(
         if (hasNextInput || hasNextAggBuffer) {
           // Find smaller key of the inputIterator and initialAggBufferIterator
           groupingKey = findGroupingKey()
-          result = new AggregationBufferEntry(groupingKey, makeEmptyAggregationBuffer)
+          updateResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedUpdateAggFunctions)
+          finalResult = new AggregationBufferEntry(
+            groupingKey, makeEmptyAggregationBufferForSortBasedMergeAggFunctions)
 
           // Firstly, update the aggregation buffer with input rows.
           while (hasNextInput &&
             groupingKeyOrdering.compare(inputIterator.getKey, groupingKey) == 0) {
-            processRow(result.aggregationBuffer, inputIterator.getValue)
+            processRow(updateResult.aggregationBuffer, inputIterator.getValue)
             hasNextInput = inputIterator.next()
           }
 
+          // This step ensures that the contents of the updateResult aggregation buffer are
+          // merged with the finalResult aggregation buffer to maintain consistency.
+          serializeBuffer(updateResult.aggregationBuffer)
+          mergeAggregationBuffers(finalResult.aggregationBuffer, updateResult.aggregationBuffer)
           // Secondly, merge the aggregation buffer with existing aggregation buffers.
           // NOTE: the ordering of these two while-blocks matters; mergeAggregationBuffer() should
           // be called after calling processRow.
           while (hasNextAggBuffer &&
             groupingKeyOrdering.compare(initialAggBufferIterator.getKey, groupingKey) == 0) {
 
 Review comment:
   Hi @cloud-fan, what you said here really hits the mark! With Spark 2.4.1, turning this conf off resolves the DataSketches HLL issue for me: `--conf spark.sql.execution.useObjectHashAggregateExec=false`, which basically disables the hash-aggregation attempt. But as you said, this degrades overall performance (not so bad when people have a sense of how big their data is).
   However, my question is: could the framework automatically recognize Hive UDAFs and apply this fallback only to them? e.g. for a query like `select year, count(month), hive_udaf_count(month) from ... group by year`, I want the Spark count() to follow INIT -> UPDATE -> MERGE -> FINISH, while hive_udaf_count() follows INIT -> UPDATE -> FINISH, then INIT -> MERGE -> FINISH.
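
   For reference, a session-level form of the workaround above could look like the sketch below. `spark.sql.execution.useObjectHashAggregateExec` is the real conf being toggled; `events` and the app name are placeholders for illustration.

   ```scala
   import org.apache.spark.sql.SparkSession

   object HllWorkaroundDemo {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().appName("hll-workaround").getOrCreate()

       // Disabling ObjectHashAggregateExec makes Spark plan sort-based aggregation
       // for object-based aggregates (including wrapped Hive UDAFs), at a general
       // performance cost, as discussed above.
       spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")

       // `events` is a placeholder table; any query with an affected UDAF applies.
       spark.sql("select year, count(month) from events group by year").show()
     }
   }
   ```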
