Repository: spark
Updated Branches:
  refs/heads/master f8484e49e -> 3c0e9ce94


[SPARK-24901][SQL] Merge the codegen of RegularHashMap and fastHashMap to 
reduce compiler maxCodesize when VectorizedHashMap is false.

## What changes were proposed in this pull request?

Currently, the code generated to update the UnsafeRow in hash aggregation has
two separate copies, one for FastHashMap and one for RegularHashMap. The two
copies are needed only when VectorizedHashMap is true. In the other cases they
can be merged into one, which reduces the size of the code handed to the
compiler (maxCodesize). For example:
```
import org.apache.spark.sql.execution.debug._
sparkSession.range(1).selectExpr("id AS key", "id AS value").groupBy("key").sum("value").debugCodegen
```
The generated code looks like this:

**Before the change:**
```
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
...............
/* 420 */     if (agg_fastAggBuffer_0 != null) {
/* 421 */       // common sub-expressions
/* 422 */
/* 423 */       // evaluate aggregate function
/* 424 */       agg_agg_isNull_14_0 = true;
/* 425 */       long agg_value_15 = -1L;
/* 426 */       do {
/* 427 */         boolean agg_isNull_15 = agg_fastAggBuffer_0.isNullAt(0);
/* 428 */         long agg_value_16 = agg_isNull_15 ?
/* 429 */         -1L : (agg_fastAggBuffer_0.getLong(0));
/* 430 */         if (!agg_isNull_15) {
/* 431 */           agg_agg_isNull_14_0 = false;
/* 432 */           agg_value_15 = agg_value_16;
/* 433 */           continue;
/* 434 */         }
/* 435 */
/* 436 */         // This comment is added for manually tracking reference of 0, false
/* 437 */
/* 438 */         boolean agg_isNull_16 = false;
/* 439 */         long agg_value_17 = -1L;
/* 440 */         if (!false) {
/* 441 */           agg_value_17 = (long) 0;
/* 442 */         }
/* 443 */         if (!agg_isNull_16) {
/* 444 */           agg_agg_isNull_14_0 = false;
/* 445 */           agg_value_15 = agg_value_17;
/* 446 */           continue;
/* 447 */         }
/* 448 */
/* 449 */       } while (false);
/* 450 */
/* 451 */       long agg_value_14 = -1L;
/* 452 */       agg_value_14 = agg_value_15 + agg_expr_1_0;
/* 453 */       // update fast row
/* 454 */       agg_fastAggBuffer_0.setLong(0, agg_value_14);
/* 455 */     } else {
/* 456 */       // common sub-expressions
/* 457 */
/* 458 */       // evaluate aggregate function
/* 459 */       agg_agg_isNull_8_0 = true;
/* 460 */       long agg_value_9 = -1L;
/* 461 */       do {
/* 462 */         boolean agg_isNull_9 = agg_unsafeRowAggBuffer_0.isNullAt(0);
/* 463 */         long agg_value_10 = agg_isNull_9 ?
/* 464 */         -1L : (agg_unsafeRowAggBuffer_0.getLong(0));
/* 465 */         if (!agg_isNull_9) {
/* 466 */           agg_agg_isNull_8_0 = false;
/* 467 */           agg_value_9 = agg_value_10;
/* 468 */           continue;
/* 469 */         }
/* 470 */
/* 471 */         // This comment is added for manually tracking reference of 0, false
/* 472 */
/* 473 */         boolean agg_isNull_10 = false;
/* 474 */         long agg_value_11 = -1L;
/* 475 */         if (!false) {
/* 476 */           agg_value_11 = (long) 0;
/* 477 */         }
/* 478 */         if (!agg_isNull_10) {
/* 479 */           agg_agg_isNull_8_0 = false;
/* 480 */           agg_value_9 = agg_value_11;
/* 481 */           continue;
/* 482 */         }
/* 483 */
/* 484 */       } while (false);
/* 485 */
/* 486 */       long agg_value_8 = -1L;
/* 487 */       agg_value_8 = agg_value_9 + agg_expr_1_0;
/* 488 */       // update unsafe row buffer
/* 489 */       agg_unsafeRowAggBuffer_0.setLong(0, agg_value_8);
/* 490 */
/* 491 */     }
......................

```

**After the change:**
```
Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
.............
/* 423 */     // Updates the proper row buffer
/* 424 */     UnsafeRow agg_aggBuffer_0 = null;
/* 425 */     if (agg_fastAggBuffer_0 != null) {
/* 426 */       agg_aggBuffer_0 = agg_fastAggBuffer_0;
/* 427 */     } else {
/* 428 */       agg_aggBuffer_0 = agg_unsafeRowAggBuffer_0;
/* 429 */     }
/* 430 */
/* 431 */     // common sub-expressions
/* 432 */
/* 433 */     // evaluate aggregate function
/* 434 */     agg_agg_isNull_8_0 = true;
/* 435 */     long agg_value_9 = -1L;
/* 436 */     do {
/* 437 */       boolean agg_isNull_9 = agg_aggBuffer_0.isNullAt(0);
/* 438 */       long agg_value_10 = agg_isNull_9 ?
/* 439 */       -1L : (agg_aggBuffer_0.getLong(0));
/* 440 */       if (!agg_isNull_9) {
/* 441 */         agg_agg_isNull_8_0 = false;
/* 442 */         agg_value_9 = agg_value_10;
/* 443 */         continue;
/* 444 */       }
/* 445 */
/* 446 */       // This comment is added for manually tracking reference of 0, false
/* 447 */
/* 448 */       boolean agg_isNull_10 = false;
/* 449 */       long agg_value_11 = -1L;
/* 450 */       if (!false) {
/* 451 */         agg_value_11 = (long) 0;
/* 452 */       }
/* 453 */       if (!agg_isNull_10) {
/* 454 */         agg_agg_isNull_8_0 = false;
/* 455 */         agg_value_9 = agg_value_11;
/* 456 */         continue;
/* 457 */       }
/* 458 */
/* 459 */     } while (false);
/* 460 */
/* 461 */     long agg_value_8 = -1L;
/* 462 */     agg_value_8 = agg_value_9 + agg_expr_1_0;
/* 463 */     // update unsafe row buffer
/* 464 */     agg_aggBuffer_0.setLong(0, agg_value_8);
...........

```
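The two listings differ in one idea: instead of emitting the whole update body once per buffer, the merged version first picks the buffer and then runs a single update body against it. A minimal sketch of that idea in plain Java follows. It assumes a hypothetical `RowBuffer` class as a stand-in for `UnsafeRow`; the names `RowBuffer`, `updateSum`, and `updateProperBuffer` are illustrative and are not Spark APIs.

```java
class MergedBufferUpdateSketch {
    /** Hypothetical stand-in for UnsafeRow: a single nullable long column. */
    static final class RowBuffer {
        private long value;
        private boolean isNull = true;

        boolean isNullAt(int ordinal) { return isNull; }
        long getLong(int ordinal) { return value; }
        void setLong(int ordinal, long v) { value = v; isNull = false; }
    }

    /** The one shared update body: treat a null buffer value as 0, then add the input. */
    static void updateSum(RowBuffer buffer, long input) {
        long current = buffer.isNullAt(0) ? 0L : buffer.getLong(0);
        buffer.setLong(0, current + input);
    }

    /** Picks the buffer first (fast map hit or miss), then runs the shared body once. */
    static void updateProperBuffer(RowBuffer fastBuffer, RowBuffer regularBuffer, long input) {
        RowBuffer target = (fastBuffer != null) ? fastBuffer : regularBuffer;
        updateSum(target, input);
    }

    public static void main(String[] args) {
        RowBuffer fast = new RowBuffer();
        RowBuffer regular = new RowBuffer();
        updateProperBuffer(fast, regular, 5L);  // fast map hit: only `fast` changes
        updateProperBuffer(null, regular, 7L);  // fast map miss: falls back to `regular`
        System.out.println(fast.getLong(0) + " " + regular.getLong(0)); // prints "5 7"
    }
}
```

Because both buffers share one type in this sketch, the update body exists only once regardless of which buffer is chosen, which is where the reduction in generated code comes from.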
## How was this patch tested?

Existing test cases.

Closes #21860 from heary-cao/fastHashMap.

Authored-by: caoxuewen <cao.xue...@zte.com.cn>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0e9ce9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0e9ce9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0e9ce9

Branch: refs/heads/master
Commit: 3c0e9ce944d98859939bbcbf21c610f4b9b224dd
Parents: f8484e4
Author: caoxuewen <cao.xue...@zte.com.cn>
Authored: Wed Oct 31 18:39:15 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Oct 31 18:39:15 2018 +0800

----------------------------------------------------------------------
 .../execution/aggregate/HashAggregateExec.scala | 66 ++++++++++++--------
 1 file changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3c0e9ce9/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 6155ec9..25d8e7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -844,33 +844,47 @@ case class HashAggregateExec(
 
     val updateRowInHashMap: String = {
       if (isFastHashMapEnabled) {
-        ctx.INPUT_ROW = fastRowBuffer
-        val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
-        val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
-        val effectiveCodes = subExprs.codes.mkString("\n")
-        val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
-          boundUpdateExpr.map(_.genCode(ctx))
-        }
-        val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
-          val dt = updateExpr(i).dataType
-          CodeGenerator.updateColumn(
-            fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled)
-        }
+        if (isVectorizedHashMapEnabled) {
+          ctx.INPUT_ROW = fastRowBuffer
+          val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr))
+          val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr)
+          val effectiveCodes = subExprs.codes.mkString("\n")
+          val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) {
+            boundUpdateExpr.map(_.genCode(ctx))
+          }
+          val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) =>
+            val dt = updateExpr(i).dataType
+            CodeGenerator.updateColumn(
+              fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized = true)
+          }
 
-        // If fast hash map is on, we first generate code to update row in fast hash map, if the
-        // previous loop up hit fast hash map. Otherwise, update row in regular hash map.
-        s"""
-           |if ($fastRowBuffer != null) {
-           |  // common sub-expressions
-           |  $effectiveCodes
-           |  // evaluate aggregate function
-           |  ${evaluateVariables(fastRowEvals)}
-           |  // update fast row
-           |  ${updateFastRow.mkString("\n").trim}
-           |} else {
-           |  $updateRowInRegularHashMap
-           |}
-       """.stripMargin
+          // If vectorized fast hash map is on, we first generate code to update row
+          // in vectorized fast hash map, if the previous loop up hit vectorized fast hash map.
+          // Otherwise, update row in regular hash map.
+          s"""
+             |if ($fastRowBuffer != null) {
+             |  // common sub-expressions
+             |  $effectiveCodes
+             |  // evaluate aggregate function
+             |  ${evaluateVariables(fastRowEvals)}
+             |  // update fast row
+             |  ${updateFastRow.mkString("\n").trim}
+             |} else {
+             |  $updateRowInRegularHashMap
+             |}
+          """.stripMargin
+        } else {
+          // If row-based hash map is on and the previous loop up hit fast hash map,
+          // we reuse regular hash buffer to update row of fast hash map.
+          // Otherwise, update row in regular hash map.
+          s"""
+             |// Updates the proper row buffer
+             |if ($fastRowBuffer != null) {
+             |  $unsafeRowBuffer = $fastRowBuffer;
+             |}
+             |$updateRowInRegularHashMap
+          """.stripMargin
+        }
       } else {
         updateRowInRegularHashMap
       }
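
The hunk above chooses between three code templates depending on two flags. The sketch below restates that choice in plain Java so the branches can be compared side by side. It is an illustration under stated assumptions, not the Spark implementation: `updateRowInHashMap` here is a hypothetical helper, and the two update bodies are passed in as opaque strings rather than produced by Spark's code generator.

```java
class UpdateTemplateSketch {
    /**
     * Hypothetical helper mirroring the branch structure of the diff.
     * updateFastRow / updateRegularRow are opaque snippets supplied by the caller.
     */
    static String updateRowInHashMap(
            boolean fastHashMapEnabled, boolean vectorizedHashMapEnabled,
            String fastRowBuffer, String unsafeRowBuffer,
            String updateFastRow, String updateRegularRow) {
        if (!fastHashMapEnabled) {
            // No fast hash map at all: only the regular update is emitted.
            return updateRegularRow;
        }
        if (vectorizedHashMapEnabled) {
            // Vectorized fast map: keep two separate update bodies.
            return String.join("\n",
                "if (" + fastRowBuffer + " != null) {",
                "  " + updateFastRow,
                "} else {",
                "  " + updateRegularRow,
                "}");
        }
        // Row-based fast map: redirect the regular buffer variable, emit one body.
        return String.join("\n",
            "// Updates the proper row buffer",
            "if (" + fastRowBuffer + " != null) {",
            "  " + unsafeRowBuffer + " = " + fastRowBuffer + ";",
            "}",
            updateRegularRow);
    }

    public static void main(String[] args) {
        // Placeholder variable names and bodies, chosen only for the demonstration.
        System.out.println(updateRowInHashMap(
            true, false, "fastBuf", "regularBuf", "/* fast update */", "/* regular update */"));
    }
}
```

In the row-based branch the fast update body never appears in the output, so the emitted code contains a single copy of the update logic. The vectorized branch keeps both copies; the diff passes `isVectorized = true` to `CodeGenerator.updateColumn` on that path, which suggests the column writes there are generated differently.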

