Repository: spark
Updated Branches:
  refs/heads/master 3b5eb7083 -> 4238c17dc


[SPARK-3197] [SQL] Reduce the Expression tree object creations for aggregation function (min/max)

The aggregation functions min/max in Catalyst build a new expression tree for every
single row, and expression tree creation is currently quite expensive in a
multithreaded environment, so min/max performance is very poor. The fix is to build
the comparison expression once per aggregate instance, over a MutableLiteral that
holds the running min/max, and only update the literal's value for each row.
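
Here is a minimal sketch of that pattern with simplified stand-in types (`Expr`,
`MutableLit`, `Gt`, and `MinAgg` are illustrative only, not the real Catalyst
classes, which additionally carry data types and nullability):

```
// Illustrative sketch only; Catalyst's Expression/MutableLiteral/GreaterThan
// follow the same shape.
trait Expr { def eval(row: Array[Any]): Any }

// Reads column i of the input row.
case class Col(i: Int) extends Expr {
  def eval(row: Array[Any]): Any = row(i)
}

// A literal whose value can be mutated in place between evaluations.
class MutableLit(var value: Any) extends Expr {
  def eval(row: Array[Any]): Any = value
}

case class Gt(left: Expr, right: Expr) extends Expr {
  def eval(row: Array[Any]): Any =
    left.eval(row).asInstanceOf[Int] > right.eval(row).asInstanceOf[Int]
}

class MinAgg(expr: Expr) {
  private val current = new MutableLit(null)  // running minimum
  private val cmp = Gt(current, expr)         // comparison tree built once

  def update(row: Array[Any]): Unit = {
    if (current.value == null) {
      current.value = expr.eval(row)          // first row seen
    } else if (cmp.eval(row) == true) {       // current > candidate
      current.value = expr.eval(row)          // new minimum: mutate, no new tree
    }
  }

  def result: Any = current.value
}

// Example: the minimum of column 0 over three rows is 1.
val agg = new MinAgg(Col(0))
Seq(Array[Any](3), Array[Any](1), Array[Any](2)).foreach(agg.update)
println(agg.result)  // 1
```

Before the patch, the equivalent of `Gt` plus a fresh `Literal` was constructed
inside `update` for every row; that per-row allocation is what the commit message
flags as expensive under multiple threads.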
Here is a benchmark I ran locally.

Master | Previous Result (ms) | Current Result (ms)
------------ | ------------- | -------------
local | 3645 | 3416
local[6] | 3602 | 1002
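
Note that the single-threaded `local` run barely changes (3645 ms to 3416 ms),
while `local[6]` drops from 3602 ms to 1002 ms, roughly a 3.6x speedup, which
matches the claim that per-row tree creation mainly hurts in a multithreaded
environment.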

Benchmark source code:
```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

case class Record(key: Int, value: Int)

object TestHive2 extends HiveContext(
  new SparkContext("local[6]", "TestSQLContext", new SparkConf()))

object DataPrepare extends App {
  import TestHive2._

  // 10 million rows spread over 3000 distinct keys, in 12 partitions.
  val rdd = sparkContext.parallelize(
    (1 to 10000000).map(i => Record(i % 3000, i)), 12)

  runSqlHive("SHOW TABLES")
  runSqlHive("DROP TABLE if exists a")
  runSqlHive("DROP TABLE if exists result")
  rdd.registerAsTable("records")

  runSqlHive("""CREATE TABLE a (key INT, value INT)
               | ROW FORMAT SERDE
               |   'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
               | STORED AS RCFILE
             """.stripMargin)
  runSqlHive("""CREATE TABLE result (key INT, value INT)
               | ROW FORMAT SERDE
               |   'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
               | STORED AS RCFILE
             """.stripMargin)

  hql(s"""from records
         | insert into table a
         | select key, value
       """.stripMargin)
}

object PerformanceTest extends App {
  import TestHive2._

  hql("SHOW TABLES")
  hql("set spark.sql.shuffle.partitions=12")

  val cmd = "select min(value), max(value) from a group by key"

  // Run the same query three times and report each timing.
  val results = ("Result1", benchmark(cmd)) ::
                ("Result2", benchmark(cmd)) ::
                ("Result3", benchmark(cmd)) :: Nil
  results.foreach { case (prompt, (time, count)) =>
    println(s"$prompt: took $time ms ($count records)")
  }

  // Times a query; .count forces full evaluation of the result.
  def benchmark(cmd: String): (Long, Long) = {
    val begin = System.currentTimeMillis()
    val count = hql(cmd).count
    val end = System.currentTimeMillis()
    (end - begin, count)
  }
}
```
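
Note that `benchmark` times `hql(cmd).count`, so each figure covers full query
execution (scan, aggregation, and the final count), not just query planning.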

Author: Cheng Hao <hao.ch...@intel.com>

Closes #2113 from chenghao-intel/aggregation_expression_optimization and 
squashes the following commits:

db40395 [Cheng Hao] remove the transient and add val for the expression property
d56167d [Cheng Hao] Reduce the Expressions creation


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4238c17d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4238c17d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4238c17d

Branch: refs/heads/master
Commit: 4238c17dc9e1f2f93cc9e6c768f92bd27bf1df66
Parents: 3b5eb70
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Wed Aug 27 12:50:47 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Aug 27 12:50:47 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/aggregates.scala   | 30 +++++++++++---------
 .../sql/catalyst/expressions/literals.scala     |  5 ++--
 2 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4238c17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index dbc0c29..15560a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -105,17 +105,18 @@ case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[
 case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
   def this() = this(null, null) // Required for serialization.
 
-  var currentMin: Any = _
+  val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType)
+  val cmp = GreaterThan(currentMin, expr)
 
   override def update(input: Row): Unit = {
-    if (currentMin == null) {
-      currentMin = expr.eval(input)
-    } else if(GreaterThan(Literal(currentMin, expr.dataType), expr).eval(input) == true) {
-      currentMin = expr.eval(input)
+    if (currentMin.value == null) {
+      currentMin.value = expr.eval(input)
+    } else if(cmp.eval(input) == true) {
+      currentMin.value = expr.eval(input)
     }
   }
 
-  override def eval(input: Row): Any = currentMin
+  override def eval(input: Row): Any = currentMin.value
 }
 
 case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -135,17 +136,18 @@ case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[
 case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
   def this() = this(null, null) // Required for serialization.
 
-  var currentMax: Any = _
+  val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType)
+  val cmp = LessThan(currentMax, expr)
 
   override def update(input: Row): Unit = {
-    if (currentMax == null) {
-      currentMax = expr.eval(input)
-    } else if(LessThan(Literal(currentMax, expr.dataType), expr).eval(input) == true) {
-      currentMax = expr.eval(input)
+    if (currentMax.value == null) {
+      currentMax.value = expr.eval(input)
+    } else if(cmp.eval(input) == true) {
+      currentMax.value = expr.eval(input)
     }
   }
 
-  override def eval(input: Row): Any = currentMax
+  override def eval(input: Row): Any = currentMax.value
 }
 
 case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -350,7 +352,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
   private val zero = Cast(Literal(0), expr.dataType)
 
   private var count: Long = _
-  private val sum = MutableLiteral(zero.eval(EmptyRow))
+  private val sum = MutableLiteral(zero.eval(null), expr.dataType)
   private val sumAsDouble = Cast(sum, DoubleType)
 
   private def addFunction(value: Any) = Add(sum, Literal(value))
@@ -423,7 +425,7 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr
 
   private val zero = Cast(Literal(0), expr.dataType)
 
-  private val sum = MutableLiteral(zero.eval(null))
+  private val sum = MutableLiteral(zero.eval(null), expr.dataType)
 
   private val addFunction = Add(sum, Coalesce(Seq(expr, zero)))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4238c17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index a8c2396..78a0c55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -61,11 +61,10 @@ case class Literal(value: Any, dataType: DataType) extends LeafExpression {
 }
 
 // TODO: Specialize
-case class MutableLiteral(var value: Any, nullable: Boolean = true) extends LeafExpression {
+case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
+    extends LeafExpression {
   type EvaluatedType = Any
 
-  val dataType = Literal(value).dataType
-
   def update(expression: Expression, input: Row) = {
     value = expression.eval(input)
   }
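
One detail worth spelling out: MutableLiteral used to derive its dataType from the
initial value via `Literal(value).dataType`, but MinFunction/MaxFunction now start
from a null value, which gives that inference nothing to work with, so the data
type becomes an explicit constructor parameter. A minimal sketch, assuming the
Spark 1.1-era package layout:

```
import org.apache.spark.sql.catalyst.expressions.MutableLiteral
import org.apache.spark.sql.catalyst.types.IntegerType

// Before this patch, MutableLiteral(null) would infer the type from the value,
// which is impossible for null. Now the caller states the type up front.
val running = MutableLiteral(null, IntegerType)  // e.g. tracking an INT column
running.value = 42                               // updated in place per row
```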

