Repository: spark Updated Branches: refs/heads/master 74267beb3 -> d24801ad2
[SPARK-13636] [SQL] Directly consume UnsafeRow in wholestage codegen plans JIRA: https://issues.apache.org/jira/browse/SPARK-13636 ## What changes were proposed in this pull request? As shown in the wholestage codegen version of the Sort operator, when Sort is on top of Exchange (or another operator that produces UnsafeRow), we will create variables from the UnsafeRow, then create another UnsafeRow using these variables. We should avoid the unnecessary unpacking and packing of variables from UnsafeRows. ## How was this patch tested? All existing wholestage codegen tests should pass. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #11484 from viirya/direct-consume-unsaferow. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d24801ad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d24801ad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d24801ad Branch: refs/heads/master Commit: d24801ad285ac3f2282fe20d1250a010673e2f96 Parents: 74267be Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Mar 10 10:04:56 2016 -0800 Committer: Davies Liu <davies....@gmail.com> Committed: Thu Mar 10 10:04:56 2016 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/Expand.scala | 2 +- .../org/apache/spark/sql/execution/Sort.scala | 28 ++++++++++++-------- .../spark/sql/execution/WholeStageCodegen.scala | 24 +++++++++++++---- .../execution/aggregate/TungstenAggregate.scala | 2 +- .../spark/sql/execution/basicOperators.scala | 4 +-- .../spark/sql/execution/debug/package.scala | 2 +- .../sql/execution/joins/BroadcastHashJoin.scala | 2 +- .../org/apache/spark/sql/execution/limit.scala | 2 +- 8 files changed, 43 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index 524285b..a84e180 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -93,7 +93,7 @@ case class Expand( child.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { /* * When the projections list looks like: * expr1A, exprB, expr1C http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala index 2ea889e..5a67cd0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala @@ -105,6 +105,8 @@ case class Sort( // Name of sorter variable used in codegen. 
private var sorterVariable: String = _ + override def preferUnsafeRow: Boolean = true + override protected def doProduce(ctx: CodegenContext): String = { val needToSort = ctx.freshName("needToSort") ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") @@ -153,18 +155,22 @@ case class Sort( """.stripMargin.trim } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val colExprs = child.output.zipWithIndex.map { case (attr, i) => - BoundReference(i, attr.dataType, attr.nullable) - } + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { + if (row != null) { + s"$sorterVariable.insertRow((UnsafeRow)$row);" + } else { + val colExprs = child.output.zipWithIndex.map { case (attr, i) => + BoundReference(i, attr.dataType, attr.nullable) + } - ctx.currentVars = input - val code = GenerateUnsafeProjection.createCode(ctx, colExprs) + ctx.currentVars = input + val code = GenerateUnsafeProjection.createCode(ctx, colExprs) - s""" - | // Convert the input attributes to an UnsafeRow and add it to the sorter - | ${code.code} - | $sorterVariable.insertRow(${code.value}); - """.stripMargin.trim + s""" + | // Convert the input attributes to an UnsafeRow and add it to the sorter + | ${code.code} + | $sorterVariable.insertRow(${code.value}); + """.stripMargin.trim + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index dd831e6..e8e42d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -65,7 +65,12 @@ trait CodegenSupport extends SparkPlan { /** * 
Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. */ - private var parent: CodegenSupport = null + protected var parent: CodegenSupport = null + + /** + * Whether this SparkPlan prefers to accept UnsafeRow as input in doConsume. + */ + def preferUnsafeRow: Boolean = false /** * Returns all the RDDs of InternalRow which generates the input rows. @@ -176,11 +181,20 @@ trait CodegenSupport extends SparkPlan { } else { input } + + val evaluated = + if (row != null && preferUnsafeRow) { + // Current plan can consume UnsafeRows directly. + "" + } else { + evaluateRequiredVariables(child.output, inputVars, usedInputs) + } + s""" | |/*** CONSUME: ${toCommentSafeString(this.simpleString)} */ - |${evaluateRequiredVariables(child.output, inputVars, usedInputs)} - |${doConsume(ctx, inputVars)} + |${evaluated} + |${doConsume(ctx, inputVars, row)} """.stripMargin } @@ -195,7 +209,7 @@ trait CodegenSupport extends SparkPlan { * if (isNull1 || !value2) continue; * # call consume(), which will call parent.doConsume() */ - protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { throw new UnsupportedOperationException } } @@ -238,7 +252,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport s""" | while (!shouldStop() && $input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); - | ${consume(ctx, columns).trim} + | ${consume(ctx, columns, row).trim} | } """.stripMargin } http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 
f856634..1c4d594 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -139,7 +139,7 @@ case class TungstenAggregate( } } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { if (groupingExpressions.isEmpty) { doConsumeWithoutKeys(ctx, input) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4901298..6ebbc8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -49,7 +49,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) references.filter(a => usedMoreThanOnce.contains(a.exprId)) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { val exprs = projectList.map(x => ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) ctx.currentVars = input @@ -88,7 +88,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit child.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { val numOutput = metricTerm(ctx, "numOutputRows") val expr = ExpressionCanonicalizer.execute( 
BindReferences.bindReference(condition, child.output)) http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index fed88b8..034bf15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -136,7 +136,7 @@ package object debug { child.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { consume(ctx, input) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index c52662a..4c8f808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -107,7 +107,7 @@ case class BroadcastHashJoin( streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { if (joinType == Inner) { codegenInner(ctx, input) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 5a7516b..ca624a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -65,7 +65,7 @@ trait BaseLimit extends UnaryNode with CodegenSupport { child.asInstanceOf[CodegenSupport].produce(ctx, this) } - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = { val stopEarly = ctx.freshName("stopEarly") ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org