Repository: spark Updated Branches: refs/heads/master f8484e49e -> 3c0e9ce94
[SPARK-24901][SQL] Merge the codegen of RegularHashMap and fastHashMap to reduce compiler maxCodesize when VectorizedHashMap is false. ## What changes were proposed in this pull request? Currently, code is generated to update the UnsafeRow in hash aggregation. FastHashMap and RegularHashMap are two separate code paths. These two separate code paths are needed only when VectorizedHashMap is true; in other cases, we can merge them together to reduce the compiler maxCodesize. Thanks. ``` import org.apache.spark.sql.execution.debug._ sparkSession.range(1).selectExpr("id AS key", "id AS value").groupBy("key").sum("value").debugCodegen ``` The generated code looks like: **Before modified:** ``` Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ ............... /* 420 */ if (agg_fastAggBuffer_0 != null) { /* 421 */ // common sub-expressions /* 422 */ /* 423 */ // evaluate aggregate function /* 424 */ agg_agg_isNull_14_0 = true; /* 425 */ long agg_value_15 = -1L; /* 426 */ do { /* 427 */ boolean agg_isNull_15 = agg_fastAggBuffer_0.isNullAt(0); /* 428 */ long agg_value_16 = agg_isNull_15 ? 
/* 429 */ -1L : (agg_fastAggBuffer_0.getLong(0)); /* 430 */ if (!agg_isNull_15) { /* 431 */ agg_agg_isNull_14_0 = false; /* 432 */ agg_value_15 = agg_value_16; /* 433 */ continue; /* 434 */ } /* 435 */ /* 436 */ // This comment is added for manually tracking reference of 0, false /* 437 */ /* 438 */ boolean agg_isNull_16 = false; /* 439 */ long agg_value_17 = -1L; /* 440 */ if (!false) { /* 441 */ agg_value_17 = (long) 0; /* 442 */ } /* 443 */ if (!agg_isNull_16) { /* 444 */ agg_agg_isNull_14_0 = false; /* 445 */ agg_value_15 = agg_value_17; /* 446 */ continue; /* 447 */ } /* 448 */ /* 449 */ } while (false); /* 450 */ /* 451 */ long agg_value_14 = -1L; /* 452 */ agg_value_14 = agg_value_15 + agg_expr_1_0; /* 453 */ // update fast row /* 454 */ agg_fastAggBuffer_0.setLong(0, agg_value_14); /* 455 */ } else { /* 456 */ // common sub-expressions /* 457 */ /* 458 */ // evaluate aggregate function /* 459 */ agg_agg_isNull_8_0 = true; /* 460 */ long agg_value_9 = -1L; /* 461 */ do { /* 462 */ boolean agg_isNull_9 = agg_unsafeRowAggBuffer_0.isNullAt(0); /* 463 */ long agg_value_10 = agg_isNull_9 ? /* 464 */ -1L : (agg_unsafeRowAggBuffer_0.getLong(0)); /* 465 */ if (!agg_isNull_9) { /* 466 */ agg_agg_isNull_8_0 = false; /* 467 */ agg_value_9 = agg_value_10; /* 468 */ continue; /* 469 */ } /* 470 */ /* 471 */ // This comment is added for manually tracking reference of 0, false /* 472 */ /* 473 */ boolean agg_isNull_10 = false; /* 474 */ long agg_value_11 = -1L; /* 475 */ if (!false) { /* 476 */ agg_value_11 = (long) 0; /* 477 */ } /* 478 */ if (!agg_isNull_10) { /* 479 */ agg_agg_isNull_8_0 = false; /* 480 */ agg_value_9 = agg_value_11; /* 481 */ continue; /* 482 */ } /* 483 */ /* 484 */ } while (false); /* 485 */ /* 486 */ long agg_value_8 = -1L; /* 487 */ agg_value_8 = agg_value_9 + agg_expr_1_0; /* 488 */ // update unsafe row buffer /* 489 */ agg_unsafeRowAggBuffer_0.setLong(0, agg_value_8); /* 490 */ /* 491 */ } ...................... 
``` **After modified:** ``` Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ ............. /* 423 */ // Updates the proper row buffer /* 424 */ UnsafeRow agg_aggBuffer_0 = null; /* 425 */ if (agg_fastAggBuffer_0 != null) { /* 426 */ agg_aggBuffer_0 = agg_fastAggBuffer_0; /* 427 */ } else { /* 428 */ agg_aggBuffer_0 = agg_unsafeRowAggBuffer_0; /* 429 */ } /* 430 */ /* 431 */ // common sub-expressions /* 432 */ /* 433 */ // evaluate aggregate function /* 434 */ agg_agg_isNull_8_0 = true; /* 435 */ long agg_value_9 = -1L; /* 436 */ do { /* 437 */ boolean agg_isNull_9 = agg_aggBuffer_0.isNullAt(0); /* 438 */ long agg_value_10 = agg_isNull_9 ? /* 439 */ -1L : (agg_aggBuffer_0.getLong(0)); /* 440 */ if (!agg_isNull_9) { /* 441 */ agg_agg_isNull_8_0 = false; /* 442 */ agg_value_9 = agg_value_10; /* 443 */ continue; /* 444 */ } /* 445 */ /* 446 */ // This comment is added for manually tracking reference of 0, false /* 447 */ /* 448 */ boolean agg_isNull_10 = false; /* 449 */ long agg_value_11 = -1L; /* 450 */ if (!false) { /* 451 */ agg_value_11 = (long) 0; /* 452 */ } /* 453 */ if (!agg_isNull_10) { /* 454 */ agg_agg_isNull_8_0 = false; /* 455 */ agg_value_9 = agg_value_11; /* 456 */ continue; /* 457 */ } /* 458 */ /* 459 */ } while (false); /* 460 */ /* 461 */ long agg_value_8 = -1L; /* 462 */ agg_value_8 = agg_value_9 + agg_expr_1_0; /* 463 */ // update unsafe row buffer /* 464 */ agg_aggBuffer_0.setLong(0, agg_value_8); ........... ``` ## How was this patch tested? The existing test cases. Closes #21860 from heary-cao/fastHashMap. 
Authored-by: caoxuewen <cao.xue...@zte.com.cn> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c0e9ce9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c0e9ce9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c0e9ce9 Branch: refs/heads/master Commit: 3c0e9ce944d98859939bbcbf21c610f4b9b224dd Parents: f8484e4 Author: caoxuewen <cao.xue...@zte.com.cn> Authored: Wed Oct 31 18:39:15 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Oct 31 18:39:15 2018 +0800 ---------------------------------------------------------------------- .../execution/aggregate/HashAggregateExec.scala | 66 ++++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3c0e9ce9/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 6155ec9..25d8e7d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -844,33 +844,47 @@ case class HashAggregateExec( val updateRowInHashMap: String = { if (isFastHashMapEnabled) { - ctx.INPUT_ROW = fastRowBuffer - val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) - val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) - val effectiveCodes = subExprs.codes.mkString("\n") - val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { - boundUpdateExpr.map(_.genCode(ctx)) - } 
- val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) => - val dt = updateExpr(i).dataType - CodeGenerator.updateColumn( - fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorizedHashMapEnabled) - } + if (isVectorizedHashMapEnabled) { + ctx.INPUT_ROW = fastRowBuffer + val boundUpdateExpr = updateExpr.map(BindReferences.bindReference(_, inputAttr)) + val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundUpdateExpr) + val effectiveCodes = subExprs.codes.mkString("\n") + val fastRowEvals = ctx.withSubExprEliminationExprs(subExprs.states) { + boundUpdateExpr.map(_.genCode(ctx)) + } + val updateFastRow = fastRowEvals.zipWithIndex.map { case (ev, i) => + val dt = updateExpr(i).dataType + CodeGenerator.updateColumn( + fastRowBuffer, dt, i, ev, updateExpr(i).nullable, isVectorized = true) + } - // If fast hash map is on, we first generate code to update row in fast hash map, if the - // previous loop up hit fast hash map. Otherwise, update row in regular hash map. - s""" - |if ($fastRowBuffer != null) { - | // common sub-expressions - | $effectiveCodes - | // evaluate aggregate function - | ${evaluateVariables(fastRowEvals)} - | // update fast row - | ${updateFastRow.mkString("\n").trim} - |} else { - | $updateRowInRegularHashMap - |} - """.stripMargin + // If vectorized fast hash map is on, we first generate code to update row + // in vectorized fast hash map, if the previous loop up hit vectorized fast hash map. + // Otherwise, update row in regular hash map. + s""" + |if ($fastRowBuffer != null) { + | // common sub-expressions + | $effectiveCodes + | // evaluate aggregate function + | ${evaluateVariables(fastRowEvals)} + | // update fast row + | ${updateFastRow.mkString("\n").trim} + |} else { + | $updateRowInRegularHashMap + |} + """.stripMargin + } else { + // If row-based hash map is on and the previous loop up hit fast hash map, + // we reuse regular hash buffer to update row of fast hash map. 
+ // Otherwise, update row in regular hash map. + s""" + |// Updates the proper row buffer + |if ($fastRowBuffer != null) { + | $unsafeRowBuffer = $fastRowBuffer; + |} + |$updateRowInRegularHashMap + """.stripMargin + } } else { updateRowInRegularHashMap } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org