Repository: spark Updated Branches: refs/heads/master 62b1c07e7 -> 2228f074e
[SPARK-13293][SQL] generate Expand Expand suffer from create the UnsafeRow from same input multiple times, with codegen, it only need to copy some of the columns. After this, we can see 3X improvements (from 43 seconds to 13 seconds) on a TPCDS query (Q67) that have eight columns in Rollup. Ideally, we could mask some of the columns based on bitmask, I'd leave that in the future, because currently Aggregation (50 ns) is much slower than that just copy the variables (1-2 ns). Author: Davies Liu <dav...@databricks.com> Closes #11177 from davies/gen_expand. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2228f074 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2228f074 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2228f074 Branch: refs/heads/master Commit: 2228f074e1bc11ee452925e10f780eaf24faf9e5 Parents: 62b1c07 Author: Davies Liu <dav...@databricks.com> Authored: Fri Feb 12 17:32:15 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Fri Feb 12 17:32:15 2016 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/execution/Expand.scala | 124 ++++++++++++++++++- .../execution/BenchmarkWholeStageCodegen.scala | 17 +++ 2 files changed, 140 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2228f074/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index c3683cc..d26a0b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -17,11 +17,15 @@ package org.apache.spark.sql.execution +import scala.collection.immutable.IndexedSeq + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.metric.SQLMetrics /** * Apply the all of the GroupExpressions to every input row, hence we will get @@ -35,7 +39,10 @@ case class Expand( projections: Seq[Seq[Expression]], output: Seq[Attribute], child: SparkPlan) - extends UnaryNode { + extends UnaryNode with CodegenSupport { + + private[sql] override lazy val metrics = Map( + "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) // The GroupExpressions can output data with arbitrary partitioning, so set it // as UNKNOWN partitioning @@ -48,6 +55,8 @@ case class Expand( (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + val numOutputRows = longMetric("numOutputRows") + child.execute().mapPartitions { iter => val groups = projections.map(projection).toArray new Iterator[InternalRow] { @@ -71,9 +80,122 @@ case class Expand( idx = 0 } + numOutputRows += 1 result } } } } + + override def upstream(): RDD[InternalRow] = { + child.asInstanceOf[CodegenSupport].upstream() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + /* + * When the projections list looks like: + * expr1A, exprB, expr1C + * expr2A, exprB, expr2C + * ... + * expr(N-1)A, exprB, expr(N-1)C + * + * i.e. column A and C have different values for each output row, but column B stays constant. + * + * The generated code looks something like (note that B is only computed once in declaration): + * + * // part 1: declare all the columns + * colA = ... + * colB = ... + * colC = ... + * + * // part 2: code that computes the columns + * for (row = 0; row < N; row++) { + * switch (row) { + * case 0: + * colA = ... + * colC = ... + * case 1: + * colA = ... + * colC = ... + * ... + * case N - 1: + * colA = ... + * colC = ... + * } + * // increment metrics and consume output values + * } + * + * We use a for loop here so we only includes one copy of the consume code and avoid code + * size explosion. + */ + + // Set input variables + ctx.currentVars = input + + // Tracks whether a column has the same output for all rows. + // Size of sameOutput array should equal N. + // If sameOutput(i) is true, then the i-th column has the same value for all output rows given + // an input row. + val sameOutput: Array[Boolean] = output.indices.map { colIndex => + projections.map(p => p(colIndex)).toSet.size == 1 + }.toArray + + // Part 1: declare variables for each column + // If a column has the same value for all output rows, then we also generate its computation + // right after declaration. Otherwise its value is computed in the part 2. + val outputColumns = output.indices.map { col => + val firstExpr = projections.head(col) + if (sameOutput(col)) { + // This column is the same across all output rows. Just generate code for it here. + BindReferences.bindReference(firstExpr, child.output).gen(ctx) + } else { + val isNull = ctx.freshName("isNull") + val value = ctx.freshName("value") + val code = s""" + |boolean $isNull = true; + |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)}; + """.stripMargin + ExprCode(code, isNull, value) + } + } + + // Part 2: switch/case statements + val cases = projections.zipWithIndex.map { case (exprs, row) => + var updateCode = "" + for (col <- exprs.indices) { + if (!sameOutput(col)) { + val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx) + updateCode += + s""" + |${ev.code} + |${outputColumns(col).isNull} = ${ev.isNull}; + |${outputColumns(col).value} = ${ev.value}; + """.stripMargin + } + } + + s""" + |case $row: + | ${updateCode.trim} + | break; + """.stripMargin + } + + val numOutput = metricTerm(ctx, "numOutputRows") + val i = ctx.freshName("i") + s""" + |${outputColumns.map(_.code).mkString("\n").trim} + |for (int $i = 0; $i < ${projections.length}; $i ++) { + | switch ($i) { + | ${cases.mkString("\n").trim} + | } + | $numOutput.add(1); + | ${consume(ctx, outputColumns)} + |} + """.stripMargin + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2228f074/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 1c7e69f..4a15117 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -157,6 +157,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { } + ignore("rube") { + val N = 5 << 20 + + runBenchmark("cube", N) { + sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2") + .cube("k1", "k2").sum("id").collect() + } + + /** + Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz + cube: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + cube codegen=false 3188 / 3392 1.6 608.2 1.0X + cube codegen=true 1239 / 1394 4.2 236.3 2.6X + */ + } + ignore("hash and BytesToBytesMap") { val N = 50 << 20 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org