Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19869#discussion_r154528431 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala --- @@ -672,48 +668,56 @@ case class HashAggregateExec( def outputFromRowBasedMap: String = { s""" - while ($iterTermForFastHashMap.next()) { - $numOutput.add(1); - UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey(); - UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue(); - $outputFunc($keyTerm, $bufferTerm); - - if (shouldStop()) return; - } - $fastHashMapTerm.close(); - """ + |while ($iterTermForFastHashMap.next()) { + | UnsafeRow $keyTerm = (UnsafeRow) $iterTermForFastHashMap.getKey(); + | UnsafeRow $bufferTerm = (UnsafeRow) $iterTermForFastHashMap.getValue(); + | $outputFunc($keyTerm, $bufferTerm); + | + | if (shouldStop()) return; + |} + |$fastHashMapTerm.close(); + """.stripMargin } // Iterate over the aggregate rows and convert them from ColumnarRow to UnsafeRow def outputFromVectorizedMap: String = { val row = ctx.freshName("fastHashMapRow") ctx.currentVars = null ctx.INPUT_ROW = row - val generateKeyRow = GenerateUnsafeProjection.createCode(ctx, - groupingKeySchema.toAttributes.zipWithIndex + val generateKeyRow = GenerateUnsafeProjection.createCode(ctx, + groupingKeySchema.toAttributes.zipWithIndex .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) } - ) - val generateBufferRow = GenerateUnsafeProjection.createCode(ctx, - bufferSchema.toAttributes.zipWithIndex - .map { case (attr, i) => - BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable) }) - s""" - | while ($iterTermForFastHashMap.hasNext()) { - | $numOutput.add(1); - | org.apache.spark.sql.execution.vectorized.ColumnarRow $row = - | (org.apache.spark.sql.execution.vectorized.ColumnarRow) - | $iterTermForFastHashMap.next(); - | ${generateKeyRow.code} - | ${generateBufferRow.code} - | 
$outputFunc(${generateKeyRow.value}, ${generateBufferRow.value}); - | - | if (shouldStop()) return; - | } - | - | $fastHashMapTerm.close(); - """.stripMargin + ) + val generateBufferRow = GenerateUnsafeProjection.createCode(ctx, + bufferSchema.toAttributes.zipWithIndex.map { case (attr, i) => + BoundReference(groupingKeySchema.length + i, attr.dataType, attr.nullable) + }) + val columnarRowCls = classOf[ColumnarRow].getName + s""" + |while ($iterTermForFastHashMap.hasNext()) { + | $columnarRowCls $row = ($columnarRowCls) $iterTermForFastHashMap.next(); + | ${generateKeyRow.code} + | ${generateBufferRow.code} + | $outputFunc(${generateKeyRow.value}, ${generateBufferRow.value}); + | + | if (shouldStop()) return; + |} + | + |$fastHashMapTerm.close(); + """.stripMargin } + def outputFromRegularHashMap: String = { + s""" + |while ($iterTerm.next()) { --- End diff -- moved from https://github.com/apache/spark/pull/19869/files#diff-2eb948516b5beaeb746aadac27fbd5b4L731
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org