Repository: spark Updated Branches: refs/heads/master 3b5eb7083 -> 4238c17dc
[SPARK-3197] [SQL] Reduce the Expression tree object creations for aggregation function (min/max) Aggregation function min/max in catalyst will create expression tree for each single row, however, the expression tree creation is quite expensive in a multithreading env currently. Hence we got a very bad performance for the min/max. Here is the benchmark that I've done in my local. Master | Previous Result (ms) | Current Result (ms) ------------ | ------------- | ------------- local | 3645 | 3416 local[6] | 3602 | 1002 The Benchmark source code. ``` case class Record(key: Int, value: Int) object TestHive2 extends HiveContext(new SparkContext("local[6]", "TestSQLContext", new SparkConf())) object DataPrepare extends App { import TestHive2._ val rdd = sparkContext.parallelize((1 to 10000000).map(i => Record(i % 3000, i)), 12) runSqlHive("SHOW TABLES") runSqlHive("DROP TABLE if exists a") runSqlHive("DROP TABLE if exists result") rdd.registerAsTable("records") runSqlHive("""CREATE TABLE a (key INT, value INT) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE """.stripMargin) runSqlHive("""CREATE TABLE result (key INT, value INT) | ROW FORMAT SERDE | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe' | STORED AS RCFILE """.stripMargin) hql(s"""from records | insert into table a | select key, value """.stripMargin) } object PerformanceTest extends App { import TestHive2._ hql("SHOW TABLES") hql("set spark.sql.shuffle.partitions=12") val cmd = "select min(value), max(value) from a group by key" val results = ("Result1", benchmark(cmd)) :: ("Result2", benchmark(cmd)) :: ("Result3", benchmark(cmd)) :: Nil results.foreach { case (prompt, result) => { println(s"$prompt: took ${result._1} ms (${result._2} records)") } } def benchmark(cmd: String) = { val begin = System.currentTimeMillis() val count = hql(cmd).count val end = System.currentTimeMillis() ((end - begin), count) } } ``` Author: Cheng Hao <hao.ch...@intel.com> Closes #2113 from chenghao-intel/aggregation_expression_optimization and squashes the following commits: db40395 [Cheng Hao] remove the transient and add val for the expression property d56167d [Cheng Hao] Reduce the Expressions creation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4238c17d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4238c17d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4238c17d Branch: refs/heads/master Commit: 4238c17dc9e1f2f93cc9e6c768f92bd27bf1df66 Parents: 3b5eb70 Author: Cheng Hao <hao.ch...@intel.com> Authored: Wed Aug 27 12:50:47 2014 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed Aug 27 12:50:47 2014 -0700 ---------------------------------------------------------------------- .../sql/catalyst/expressions/aggregates.scala | 30 +++++++++++--------- .../sql/catalyst/expressions/literals.scala | 5 ++-- 2 files changed, 18 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4238c17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala index dbc0c29..15560a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala @@ -105,17 +105,18 @@ case class Min(child: Expression) extends PartialAggregate with trees.UnaryNode[ case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. - var currentMin: Any = _ + val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType) + val cmp = GreaterThan(currentMin, expr) override def update(input: Row): Unit = { - if (currentMin == null) { - currentMin = expr.eval(input) - } else if(GreaterThan(Literal(currentMin, expr.dataType), expr).eval(input) == true) { - currentMin = expr.eval(input) + if (currentMin.value == null) { + currentMin.value = expr.eval(input) + } else if(cmp.eval(input) == true) { + currentMin.value = expr.eval(input) } } - override def eval(input: Row): Any = currentMin + override def eval(input: Row): Any = currentMin.value } case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -135,17 +136,18 @@ case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[ case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction { def this() = this(null, null) // Required for serialization. - var currentMax: Any = _ + val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType) + val cmp = LessThan(currentMax, expr) override def update(input: Row): Unit = { - if (currentMax == null) { - currentMax = expr.eval(input) - } else if(LessThan(Literal(currentMax, expr.dataType), expr).eval(input) == true) { - currentMax = expr.eval(input) + if (currentMax.value == null) { + currentMax.value = expr.eval(input) + } else if(cmp.eval(input) == true) { + currentMax.value = expr.eval(input) } } - override def eval(input: Row): Any = currentMax + override def eval(input: Row): Any = currentMax.value } case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] { @@ -350,7 +352,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression) private val zero = Cast(Literal(0), expr.dataType) private var count: Long = _ - private val sum = MutableLiteral(zero.eval(EmptyRow)) + private val sum = MutableLiteral(zero.eval(null), expr.dataType) private val sumAsDouble = Cast(sum, DoubleType) private def addFunction(value: Any) = Add(sum, Literal(value)) @@ -423,7 +425,7 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends Aggr private val zero = Cast(Literal(0), expr.dataType) - private val sum = MutableLiteral(zero.eval(null)) + private val sum = MutableLiteral(zero.eval(null), expr.dataType) private val addFunction = Add(sum, Coalesce(Seq(expr, zero))) http://git-wip-us.apache.org/repos/asf/spark/blob/4238c17d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index a8c2396..78a0c55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -61,11 +61,10 @@ case class Literal(value: Any, dataType: DataType) extends LeafExpression { } // TODO: Specialize -case class MutableLiteral(var value: Any, nullable: Boolean = true) extends LeafExpression { +case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true) + extends LeafExpression { type EvaluatedType = Any - val dataType = Literal(value).dataType - def update(expression: Expression, input: Row) = { value = expression.eval(input) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org