Repository: spark
Updated Branches:
  refs/heads/master 62b1c07e7 -> 2228f074e


[SPARK-13293][SQL] generate Expand

Expand suffers from creating the UnsafeRow from the same input multiple times; with 
codegen, it only needs to copy some of the columns.
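
For context, a minimal sketch of a query whose plan contains an Expand node 
(column names hypothetical); a rollup over two keys has three grouping sets, so 
Expand emits three output rows per input row:

  // Hypothetical example: rollup(k1, k2) expands each input row into three
  // output rows, one per grouping set -- (k1, k2), (k1), and ().
  val df = sqlContext.range(1000)
    .selectExpr("id", "id % 10 as k1", "id % 100 as k2")
  df.rollup("k1", "k2").sum("id").explain()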

After this, we see a 3X improvement (from 43 seconds to 13 seconds) on a 
TPCDS query (Q67) that has eight columns in its Rollup.

Ideally, we could mask some of the columns based on a bitmask; I'll leave that 
for the future, because currently Aggregation (50 ns) is much slower than just 
copying the variables (1-2 ns).

Author: Davies Liu <dav...@databricks.com>

Closes #11177 from davies/gen_expand.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2228f074
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2228f074
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2228f074

Branch: refs/heads/master
Commit: 2228f074e1bc11ee452925e10f780eaf24faf9e5
Parents: 62b1c07
Author: Davies Liu <dav...@databricks.com>
Authored: Fri Feb 12 17:32:15 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Feb 12 17:32:15 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/Expand.scala | 124 ++++++++++++++++++-
 .../execution/BenchmarkWholeStageCodegen.scala  |  17 +++
 2 files changed, 140 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2228f074/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index c3683cc..d26a0b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark.sql.execution
 
+import scala.collection.immutable.IndexedSeq
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * Apply all of the GroupExpressions to every input row, hence we will get
@@ -35,7 +39,10 @@ case class Expand(
     projections: Seq[Seq[Expression]],
     output: Seq[Attribute],
     child: SparkPlan)
-  extends UnaryNode {
+  extends UnaryNode with CodegenSupport {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
 
   // The GroupExpressions can output data with arbitrary partitioning, so set it
   // as UNKNOWN partitioning
@@ -48,6 +55,8 @@ case class Expand(
     (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+    val numOutputRows = longMetric("numOutputRows")
+
     child.execute().mapPartitions { iter =>
       val groups = projections.map(projection).toArray
       new Iterator[InternalRow] {
@@ -71,9 +80,122 @@ case class Expand(
             idx = 0
           }
 
+          numOutputRows += 1
           result
         }
       }
     }
   }
+
+  override def upstream(): RDD[InternalRow] = {
+    child.asInstanceOf[CodegenSupport].upstream()
+  }
+
+  protected override def doProduce(ctx: CodegenContext): String = {
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+    /*
+     * When the projections list looks like:
+     *   expr1A, exprB, expr1C
+     *   expr2A, exprB, expr2C
+     *   ...
+     *   expr(N-1)A, exprB, expr(N-1)C
+     *
+     * i.e. columns A and C have different values for each output row, but column B stays constant.
+     *
+     * The generated code looks something like (note that B is only computed once in declaration):
+     *
+     * // part 1: declare all the columns
+     * colA = ...
+     * colB = ...
+     * colC = ...
+     *
+     * // part 2: code that computes the columns
+     * for (row = 0; row < N; row++) {
+     *   switch (row) {
+     *     case 0:
+     *       colA = ...
+     *       colC = ...
+     *     case 1:
+     *       colA = ...
+     *       colC = ...
+     *     ...
+     *     case N - 1:
+     *       colA = ...
+     *       colC = ...
+     *   }
+     *   // increment metrics and consume output values
+     * }
+     *
+     * We use a for loop here so we include only one copy of the consume code and
+     * avoid code size explosion.
+     */
+
+    // Set input variables
+    ctx.currentVars = input
+
+    // Tracks whether a column has the same output for all rows.
+    // The size of the sameOutput array equals the number of output columns.
+    // If sameOutput(i) is true, then the i-th column has the same value for all output rows given
+    // an input row.
+    val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
+      projections.map(p => p(colIndex)).toSet.size == 1
+    }.toArray
+
+    // Part 1: declare variables for each column
+    // If a column has the same value for all output rows, then we also generate its computation
+    // right after declaration. Otherwise its value is computed in part 2.
+    val outputColumns = output.indices.map { col =>
+      val firstExpr = projections.head(col)
+      if (sameOutput(col)) {
+        // This column is the same across all output rows. Just generate code for it here.
+        BindReferences.bindReference(firstExpr, child.output).gen(ctx)
+      } else {
+        val isNull = ctx.freshName("isNull")
+        val value = ctx.freshName("value")
+        val code = s"""
+          |boolean $isNull = true;
+          |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)};
+         """.stripMargin
+        ExprCode(code, isNull, value)
+      }
+    }
+
+    // Part 2: switch/case statements
+    val cases = projections.zipWithIndex.map { case (exprs, row) =>
+      var updateCode = ""
+      for (col <- exprs.indices) {
+        if (!sameOutput(col)) {
+          val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx)
+          updateCode +=
+            s"""
+               |${ev.code}
+               |${outputColumns(col).isNull} = ${ev.isNull};
+               |${outputColumns(col).value} = ${ev.value};
+            """.stripMargin
+        }
+      }
+
+      s"""
+         |case $row:
+         |  ${updateCode.trim}
+         |  break;
+       """.stripMargin
+    }
+
+    val numOutput = metricTerm(ctx, "numOutputRows")
+    val i = ctx.freshName("i")
+    s"""
+       |${outputColumns.map(_.code).mkString("\n").trim}
+       |for (int $i = 0; $i < ${projections.length}; $i ++) {
+       |  switch ($i) {
+       |    ${cases.mkString("\n").trim}
+       |  }
+       |  $numOutput.add(1);
+       |  ${consume(ctx, outputColumns)}
+       |}
+     """.stripMargin
+  }
 }
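
A standalone sketch of the sameOutput detection above (expressions replaced by 
plain strings for illustration; all names hypothetical):

  // Two projections that differ in columns 0 and 2 but share column 1.
  val projections = Seq(
    Seq("expr1A", "exprB", "expr1C"),
    Seq("expr2A", "exprB", "expr2C"))
  // A column is constant across the output rows when every projection has
  // the same expression at that index.
  val sameOutput = projections.head.indices.map { col =>
    projections.map(_(col)).toSet.size == 1
  }
  // sameOutput == Vector(false, true, false): only column B (index 1) is
  // computed once at declaration; A and C are recomputed in each switch case.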

http://git-wip-us.apache.org/repos/asf/spark/blob/2228f074/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 1c7e69f..4a15117 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -157,6 +157,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
   }
 
+  ignore("rube") {
+    val N = 5 << 20
+
+    runBenchmark("cube", N) {
+      sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
+        .cube("k1", "k2").sum("id").collect()
+    }
+
+    /**
+      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+      cube:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      -------------------------------------------------------------------------------------------
+      cube codegen=false                       3188 / 3392          1.6         608.2       1.0X
+      cube codegen=true                        1239 / 1394          4.2         236.3       2.6X
+      */
+  }
+
   ignore("hash and BytesToBytesMap") {
     val N = 50 << 20
 

