http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index ed810a1..0290faf 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -68,7 +68,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group") { val originalQuery = testRelation - .groupBy('a)('a, Count('b)) + .groupBy('a)('a, count('b)) .select('a) val optimized = Optimize.execute(originalQuery.analyze) @@ -84,7 +84,7 @@ class FilterPushdownSuite extends PlanTest { test("column pruning for group with alias") { val originalQuery = testRelation - .groupBy('a)('a as 'c, Count('b)) + .groupBy('a)('a as 'c, count('b)) .select('c) val optimized = Optimize.execute(originalQuery.analyze) @@ -656,7 +656,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filter when filter on group by expression") { val originalQuery = testRelation - .groupBy('a)('a, Count('b) as 'c) + .groupBy('a)('a, count('b) as 'c) .select('a, 'c) .where('a === 2) @@ -664,7 +664,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .where('a === 2) - .groupBy('a)('a, Count('b) as 'c) + .groupBy('a)('a, count('b) as 'c) .analyze comparePlans(optimized, correctAnswer) } @@ -672,7 +672,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: don't push down filter when filter not on group by expression") { val originalQuery = testRelation .select('a, 'b) - .groupBy('a)('a, Count('b) as 'c) + .groupBy('a)('a, count('b) as 'c) .where('c === 2L) val optimized = Optimize.execute(originalQuery.analyze) @@ -683,7 +683,7 @@ class FilterPushdownSuite extends PlanTest { test("aggregate: push down filters partially which are subset of group by expressions") { val originalQuery = testRelation .select('a, 'b) - .groupBy('a)('a, Count('b) as 'c) + .groupBy('a)('a, count('b) as 'c) .where('c === 2L && 'a === 3) val optimized = Optimize.execute(originalQuery.analyze) @@ -691,7 +691,7 @@ class FilterPushdownSuite extends PlanTest { val correctAnswer = testRelation .select('a, 'b) .where('a === 3) - .groupBy('a)('a, Count('b) as 'c) + .groupBy('a)('a, count('b) as 'c) .where('c === 2L) .analyze
http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index d25807c..3b69247 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.encoders.Encoder import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} @@ -1338,7 +1339,7 @@ class DataFrame private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { - Alias(First(attr), attr.name)() + Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) @@ -1381,11 +1382,11 @@ class DataFrame private[sql]( // The list of summary statistics to compute, in the form of expressions. val statistics = List[(String, Expression => Expression)]( - "count" -> Count, - "mean" -> Average, - "stddev" -> StddevSamp, - "min" -> Min, - "max" -> Max) + "count" -> ((child: Expression) => Count(child).toAggregateExpression()), + "mean" -> ((child: Expression) => Average(child).toAggregateExpression()), + "stddev" -> ((child: Expression) => StddevSamp(child).toAggregateExpression()), + "min" -> ((child: Expression) => Min(child).toAggregateExpression()), + "max" -> ((child: Expression) => Max(child).toAggregateExpression())) val outputCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index f9eab5c..5babf2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -21,8 +21,9 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, Star} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedAlias, UnresolvedAttribute, Star} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate} import org.apache.spark.sql.types.NumericType @@ -70,7 +71,7 @@ class GroupedData protected[sql]( } } - private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression) + private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction) : DataFrame = { val columnExprs = if (colNames.isEmpty) { @@ -88,30 +89,28 @@ class GroupedData protected[sql]( namedExpr } } - toDF(columnExprs.map(f)) + toDF(columnExprs.map(expr => f(expr).toAggregateExpression())) } private[this] def strToExpr(expr: String): (Expression => Expression) = { - expr.toLowerCase match { - case "avg" | "average" | "mean" => Average - case "max" => Max - case "min" => Min - case "stddev" | "std" => StddevSamp - case "stddev_pop" => StddevPop - case "stddev_samp" => StddevSamp - case "variance" => VarianceSamp - case "var_pop" => VariancePop - case "var_samp" => VarianceSamp - case "sum" => Sum - case "skewness" => Skewness - case "kurtosis" => Kurtosis - case "count" | "size" => - // Turn count(*) into count(1) - (inputExpr: Expression) => inputExpr match { - case s: Star => Count(Literal(1)) - case _ => Count(inputExpr) - } + val exprToFunc: (Expression => Expression) = { + (inputExpr: Expression) => expr.toLowerCase match { + // We special handle a few cases that have alias that are not in function registry. + case "avg" | "average" | "mean" => + UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false) + case "stddev" | "std" => + UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false) + // Also special handle count because we need to take care count(*). + case "count" | "size" => + // Turn count(*) into count(1) + inputExpr match { + case s: Star => Count(Literal(1)).toAggregateExpression() + case _ => Count(inputExpr).toAggregateExpression() + } + case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false) + } } + (inputExpr: Expression) => exprToFunc(inputExpr) } /** @@ -213,7 +212,7 @@ class GroupedData protected[sql]( * * @since 1.3.0 */ - def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")())) + def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")())) /** * Compute the average value for each numeric columns for each group. This is an alias for `avg`. http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index ed8b634..b731418 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -448,15 +448,24 @@ private[spark] object SQLConf { defaultValue = Some(true), isPublic = false) - val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2", - defaultValue = Some(true), doc = "<TODO>") - val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles", defaultValue = Some(true), isPublic = false, doc = "When true, we could use `datasource`.`path` as table in SQL query" ) + val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING = + booleanConf("spark.sql.specializeSingleDistinctAggPlanning", + defaultValue = Some(true), + isPublic = false, + doc = "When true, if a query only has a single distinct column and it has " + + "grouping expressions, we will use our planner rule to handle this distinct " + + "column (other cases are handled by DistinctAggregationRewriter). " + + "When false, we will always use DistinctAggregationRewriter to plan " + + "aggregation queries with DISTINCT keyword. This is an internal flag that is " + + "used to benchmark the performance impact of using DistinctAggregationRewriter to " + + "plan aggregation queries with a single distinct column.") + object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" val EXTERNAL_SORT = "spark.sql.planner.externalSort" @@ -532,8 +541,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED)) - private[spark] def useSqlAggregate2: Boolean = getConf(USE_SQL_AGGREGATE2) - private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD) private[spark] def defaultSizeInBytes: Long = @@ -575,6 +582,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf { private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES) + protected[spark] override def specializeSingleDistinctAggPlanning: Boolean = + getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala deleted file mode 100644 index 6f3f1bd..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution - -import java.util.HashMap - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.metric.SQLMetrics - -/** - * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each - * group. - * - * @param partial if true then aggregation is done partially on local data without shuffling to - * ensure all values where `groupingExpressions` are equal are present. - * @param groupingExpressions expressions that are evaluated to determine grouping. - * @param aggregateExpressions expressions that are computed for each group. - * @param child the input data source. - */ -case class Aggregate( - partial: Boolean, - groupingExpressions: Seq[Expression], - aggregateExpressions: Seq[NamedExpression], - child: SparkPlan) - extends UnaryNode { - - override private[sql] lazy val metrics = Map( - "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"), - "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - - override def requiredChildDistribution: List[Distribution] = { - if (partial) { - UnspecifiedDistribution :: Nil - } else { - if (groupingExpressions == Nil) { - AllTuples :: Nil - } else { - ClusteredDistribution(groupingExpressions) :: Nil - } - } - } - - override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute) - - /** - * An aggregate that needs to be computed for each row in a group. - * - * @param unbound Unbound version of this aggregate, used for result substitution. - * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer. - * @param resultAttribute An attribute used to refer to the result of this aggregate in the final - * output. - */ - case class ComputedAggregate( - unbound: AggregateExpression1, - aggregate: AggregateExpression1, - resultAttribute: AttributeReference) - - /** A list of aggregates that need to be computed for each group. */ - private[this] val computedAggregates = aggregateExpressions.flatMap { agg => - agg.collect { - case a: AggregateExpression1 => - ComputedAggregate( - a, - BindReferences.bindReference(a, child.output), - AttributeReference(s"aggResult:$a", a.dataType, a.nullable)()) - } - }.toArray - - /** The schema of the result of all aggregate evaluations */ - private[this] val computedSchema = computedAggregates.map(_.resultAttribute) - - /** Creates a new aggregate buffer for a group. */ - private[this] def newAggregateBuffer(): Array[AggregateFunction1] = { - val buffer = new Array[AggregateFunction1](computedAggregates.length) - var i = 0 - while (i < computedAggregates.length) { - buffer(i) = computedAggregates(i).aggregate.newInstance() - i += 1 - } - buffer - } - - /** Named attributes used to substitute grouping attributes into the final result. */ - private[this] val namedGroups = groupingExpressions.map { - case ne: NamedExpression => ne -> ne.toAttribute - case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute - } - - /** - * A map of substitutions that are used to insert the aggregate expressions and grouping - * expression into the final result expression. - */ - private[this] val resultMap = - (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap - - /** - * Substituted version of aggregateExpressions expressions which are used to compute final - * output rows given a group and the result of all aggregate computations. - */ - private[this] val resultExpressions = aggregateExpressions.map { agg => - agg.transform { - case e: Expression if resultMap.contains(e) => resultMap(e) - } - } - - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { - val numInputRows = longMetric("numInputRows") - val numOutputRows = longMetric("numOutputRows") - if (groupingExpressions.isEmpty) { - child.execute().mapPartitions { iter => - val buffer = newAggregateBuffer() - var currentRow: InternalRow = null - while (iter.hasNext) { - currentRow = iter.next() - numInputRows += 1 - var i = 0 - while (i < buffer.length) { - buffer(i).update(currentRow) - i += 1 - } - } - val resultProjection = new InterpretedProjection(resultExpressions, computedSchema) - val aggregateResults = new GenericMutableRow(computedAggregates.length) - - var i = 0 - while (i < buffer.length) { - aggregateResults(i) = buffer(i).eval(EmptyRow) - i += 1 - } - - numOutputRows += 1 - Iterator(resultProjection(aggregateResults)) - } - } else { - child.execute().mapPartitions { iter => - val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]] - val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output) - - var currentRow: InternalRow = null - while (iter.hasNext) { - currentRow = iter.next() - numInputRows += 1 - val currentGroup = groupingProjection(currentRow) - var currentBuffer = hashTable.get(currentGroup) - if (currentBuffer == null) { - currentBuffer = newAggregateBuffer() - hashTable.put(currentGroup.copy(), currentBuffer) - } - - var i = 0 - while (i < currentBuffer.length) { - currentBuffer(i).update(currentRow) - i += 1 - } - } - - new Iterator[InternalRow] { - private[this] val hashTableIter = hashTable.entrySet().iterator() - private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length) - private[this] val resultProjection = - new InterpretedMutableProjection( - resultExpressions, computedSchema ++ namedGroups.map(_._2)) - private[this] val joinedRow = new JoinedRow - - override final def hasNext: Boolean = hashTableIter.hasNext - - override final def next(): InternalRow = { - val currentEntry = hashTableIter.next() - val currentGroup = currentEntry.getKey - val currentBuffer = currentEntry.getValue - numOutputRows += 1 - - var i = 0 - while (i < currentBuffer.length) { - // Evaluating an aggregate buffer returns the result. No row is required since we - // already added all rows in the group using update. - aggregateResults(i) = currentBuffer(i).eval(EmptyRow) - i += 1 - } - resultProjection(joinedRow(aggregateResults, currentGroup)) - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala index 55e9576..91530bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala @@ -45,6 +45,9 @@ case class Expand( override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = true + override def references: AttributeSet = + AttributeSet(projections.flatten.flatMap(_.references)) + private[this] val projection = { if (outputsUnsafeRows) { (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 0f98fe8..a10d1ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -38,7 +38,6 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies { DataSourceStrategy :: DDLStrategy :: TakeOrderedAndProject :: - HashAggregation :: Aggregation :: LeftSemiJoin :: EquiJoinSelection :: http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dd3bb33..d65cb1b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, Utils} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} @@ -146,148 +146,104 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - object HashAggregation extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Aggregations that can be performed in two phases, before and after the shuffle. - case PartialAggregation( - namedGroupingAttributes, - rewrittenAggregateExpressions, - groupingExpressions, - partialComputation, - child) if !canBeConvertedToNewAggregation(plan) => - execution.Aggregate( - partial = false, - namedGroupingAttributes, - rewrittenAggregateExpressions, - execution.Aggregate( - partial = true, - groupingExpressions, - partialComputation, - planLater(child))) :: Nil - - case _ => Nil - } - - def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = plan match { - case a: logical.Aggregate => - if (sqlContext.conf.useSqlAggregate2 && sqlContext.conf.codegenEnabled) { - a.newAggregation.isDefined - } else { - Utils.checkInvalidAggregateFunction2(a) - false - } - case _ => false - } - - def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression1] = - exprs.flatMap(_.collect { case a: AggregateExpression1 => a }) - } - /** * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. */ object Aggregation extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case p: logical.Aggregate if sqlContext.conf.useSqlAggregate2 && - sqlContext.conf.codegenEnabled => - val converted = p.newAggregation - converted match { - case None => Nil // Cannot convert to new aggregation code path. - case Some(logical.Aggregate(groupingExpressions, resultExpressions, child)) => - // A single aggregate expression might appear multiple times in resultExpressions. - // In order to avoid evaluating an individual aggregate function multiple times, we'll - // build a set of the distinct aggregate expressions and build a function which can - // be used to re-write expressions so that they reference the single copy of the - // aggregate function which actually gets computed. - val aggregateExpressions = resultExpressions.flatMap { expr => - expr.collect { - case agg: AggregateExpression2 => agg - } - }.distinct - // For those distinct aggregate expressions, we create a map from the - // aggregate function to the corresponding attribute of the function. - val aggregateFunctionToAttribute = aggregateExpressions.map { agg => - val aggregateFunction = agg.aggregateFunction - val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute - (aggregateFunction, agg.isDistinct) -> attribute - }.toMap - - val (functionsWithDistinct, functionsWithoutDistinct) = - aggregateExpressions.partition(_.isDistinct) - if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { - // This is a sanity check. We should not reach here when we have multiple distinct - // column sets (aggregate.NewAggregation will not match). - sys.error( - "Multiple distinct column sets are not supported by the new aggregation" + - "code path.") - } + case logical.Aggregate(groupingExpressions, resultExpressions, child) => + // A single aggregate expression might appear multiple times in resultExpressions. + // In order to avoid evaluating an individual aggregate function multiple times, we'll + // build a set of the distinct aggregate expressions and build a function which can + // be used to re-write expressions so that they reference the single copy of the + // aggregate function which actually gets computed. + val aggregateExpressions = resultExpressions.flatMap { expr => + expr.collect { + case agg: AggregateExpression => agg + } + }.distinct + // For those distinct aggregate expressions, we create a map from the + // aggregate function to the corresponding attribute of the function. + val aggregateFunctionToAttribute = aggregateExpressions.map { agg => + val aggregateFunction = agg.aggregateFunction + val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute + (aggregateFunction, agg.isDistinct) -> attribute + }.toMap + + val (functionsWithDistinct, functionsWithoutDistinct) = + aggregateExpressions.partition(_.isDistinct) + if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) { + // This is a sanity check. We should not reach here when we have multiple distinct + // column sets. Our MultipleDistinctRewriter should take care this case. + sys.error("You hit a query analyzer bug. Please report your query to " + + "Spark user mailing list.") + } - val namedGroupingExpressions = groupingExpressions.map { - case ne: NamedExpression => ne -> ne - // If the expression is not a NamedExpressions, we add an alias. - // So, when we generate the result of the operator, the Aggregate Operator - // can directly get the Seq of attributes representing the grouping expressions. - case other => - val withAlias = Alias(other, other.toString)() - other -> withAlias - } - val groupExpressionMap = namedGroupingExpressions.toMap - - // The original `resultExpressions` are a set of expressions which may reference - // aggregate expressions, grouping column values, and constants. When aggregate operator - // emits output rows, we will use `resultExpressions` to generate an output projection - // which takes the grouping columns and final aggregate result buffer as input. - // Thus, we must re-write the result expressions so that their attributes match up with - // the attributes of the final result projection's input row: - val rewrittenResultExpressions = resultExpressions.map { expr => - expr.transformDown { - case AggregateExpression2(aggregateFunction, _, isDistinct) => - // The final aggregation buffer's attributes will be `finalAggregationAttributes`, - // so replace each aggregate expression by its corresponding attribute in the set: - aggregateFunctionToAttribute(aggregateFunction, isDistinct) - case expression => - // Since we're using `namedGroupingAttributes` to extract the grouping key - // columns, we need to replace grouping key expressions with their corresponding - // attributes. We do not rely on the equality check at here since attributes may - // differ cosmetically. Instead, we use semanticEquals. - groupExpressionMap.collectFirst { - case (expr, ne) if expr semanticEquals expression => ne.toAttribute - }.getOrElse(expression) - }.asInstanceOf[NamedExpression] + val namedGroupingExpressions = groupingExpressions.map { + case ne: NamedExpression => ne -> ne + // If the expression is not a NamedExpressions, we add an alias. + // So, when we generate the result of the operator, the Aggregate Operator + // can directly get the Seq of attributes representing the grouping expressions. + case other => + val withAlias = Alias(other, other.toString)() + other -> withAlias + } + val groupExpressionMap = namedGroupingExpressions.toMap + + // The original `resultExpressions` are a set of expressions which may reference + // aggregate expressions, grouping column values, and constants. When aggregate operator + // emits output rows, we will use `resultExpressions` to generate an output projection + // which takes the grouping columns and final aggregate result buffer as input. + // Thus, we must re-write the result expressions so that their attributes match up with + // the attributes of the final result projection's input row: + val rewrittenResultExpressions = resultExpressions.map { expr => + expr.transformDown { + case AggregateExpression(aggregateFunction, _, isDistinct) => + // The final aggregation buffer's attributes will be `finalAggregationAttributes`, + // so replace each aggregate expression by its corresponding attribute in the set: + aggregateFunctionToAttribute(aggregateFunction, isDistinct) + case expression => + // Since we're using `namedGroupingAttributes` to extract the grouping key + // columns, we need to replace grouping key expressions with their corresponding + // attributes. We do not rely on the equality check at here since attributes may + // differ cosmetically. Instead, we use semanticEquals. + groupExpressionMap.collectFirst { + case (expr, ne) if expr semanticEquals expression => ne.toAttribute + }.getOrElse(expression) + }.asInstanceOf[NamedExpression] + } + + val aggregateOperator = + if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) { + if (functionsWithDistinct.nonEmpty) { + sys.error("Distinct columns cannot exist in Aggregate operator containing " + + "aggregate functions which don't support partial aggregation.") + } else { + aggregate.Utils.planAggregateWithoutPartial( + namedGroupingExpressions.map(_._2), + aggregateExpressions, + aggregateFunctionToAttribute, + rewrittenResultExpressions, + planLater(child)) } + } else if (functionsWithDistinct.isEmpty) { + aggregate.Utils.planAggregateWithoutDistinct( + namedGroupingExpressions.map(_._2), + aggregateExpressions, + aggregateFunctionToAttribute, + rewrittenResultExpressions, + planLater(child)) + } else { + aggregate.Utils.planAggregateWithOneDistinct( + namedGroupingExpressions.map(_._2), + functionsWithDistinct, + functionsWithoutDistinct, + aggregateFunctionToAttribute, + rewrittenResultExpressions, + planLater(child)) + } - val aggregateOperator = - if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) { - if (functionsWithDistinct.nonEmpty) { - sys.error("Distinct columns cannot exist in Aggregate operator containing " + - "aggregate functions which don't support partial aggregation.") - } else { - aggregate.Utils.planAggregateWithoutPartial( - namedGroupingExpressions.map(_._2), - aggregateExpressions, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - planLater(child)) - } - } else if (functionsWithDistinct.isEmpty) { - aggregate.Utils.planAggregateWithoutDistinct( - namedGroupingExpressions.map(_._2), - aggregateExpressions, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - planLater(child)) - } else { - aggregate.Utils.planAggregateWithOneDistinct( - namedGroupingExpressions.map(_._2), - functionsWithDistinct, - functionsWithoutDistinct, - aggregateFunctionToAttribute, - rewrittenResultExpressions, - planLater(child)) - } - - aggregateOperator - } + aggregateOperator case _ => Nil } @@ -422,18 +378,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Filter(condition, planLater(child)) :: Nil case e @ logical.Expand(_, _, child) => execution.Expand(e.projections, e.output, planLater(child)) :: Nil - case a @ logical.Aggregate(group, agg, child) => { - val useNewAggregation = sqlContext.conf.useSqlAggregate2 && sqlContext.conf.codegenEnabled - if (useNewAggregation && a.newAggregation.isDefined) { - // If this logical.Aggregate can be planned to use new aggregation code path - // (i.e. it can be planned by the Strategy Aggregation), we will not use the old - // aggregation code path. - Nil - } else { - Utils.checkInvalidAggregateFunction2(a) - execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil - } - } case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) => execution.Window( projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 99fb7a4..008478a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -35,9 +35,9 @@ import scala.collection.mutable.ArrayBuffer abstract class AggregationIterator( groupingKeyAttributes: Seq[Attribute], valueAttributes: Seq[Attribute], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], + nonCompleteAggregateExpressions: Seq[AggregateExpression], nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], + completeAggregateExpressions: Seq[AggregateExpression], completeAggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], @@ -76,14 +76,14 @@ abstract class AggregationIterator( // Initialize all AggregateFunctions by binding references if necessary, // and set inputBufferOffset and mutableBufferOffset. - protected val allAggregateFunctions: Array[AggregateFunction2] = { + protected val allAggregateFunctions: Array[AggregateFunction] = { var mutableBufferOffset = 0 var inputBufferOffset: Int = initialInputBufferOffset - val functions = new Array[AggregateFunction2](allAggregateExpressions.length) + val functions = new Array[AggregateFunction](allAggregateExpressions.length) var i = 0 while (i < allAggregateExpressions.length) { val func = allAggregateExpressions(i).aggregateFunction - val funcWithBoundReferences: AggregateFunction2 = allAggregateExpressions(i).mode match { + val funcWithBoundReferences: AggregateFunction = allAggregateExpressions(i).mode match { case Partial | Complete if func.isInstanceOf[ImperativeAggregate] => // We need to create BoundReferences if the function is not an // expression-based aggregate function (it does not support code-gen) and the mode of @@ -135,7 +135,7 @@ abstract class AggregationIterator( } // All AggregateFunctions functions with mode Partial, PartialMerge, or Final. - private[this] val nonCompleteAggregateFunctions: Array[AggregateFunction2] = + private[this] val nonCompleteAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.take(nonCompleteAggregateExpressions.length) // All imperative aggregate functions with mode Partial, PartialMerge, or Final. @@ -172,7 +172,7 @@ abstract class AggregationIterator( case (Some(Partial), None) => val updateExpressions = nonCompleteAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val expressionAggUpdateProjection = newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)() @@ -204,7 +204,7 @@ abstract class AggregationIterator( // allAggregateFunctions.flatMap(_.cloneBufferAttributes) val mergeExpressions = nonCompleteAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.mergeExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } // This projection is used to merge buffer values for all expression-based aggregates. val expressionAggMergeProjection = @@ -225,7 +225,7 @@ abstract class AggregationIterator( // Final-Complete case (Some(Final), Some(Complete)) => - val completeAggregateFunctions: Array[AggregateFunction2] = + val completeAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) // All imperative aggregate functions with mode Complete. val completeImperativeAggregateFunctions: Array[ImperativeAggregate] = @@ -248,7 +248,7 @@ abstract class AggregationIterator( val mergeExpressions = nonCompleteAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.mergeExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } ++ completeOffsetExpressions val finalExpressionAggMergeProjection = newMutableProjection(mergeExpressions, mergeInputSchema)() @@ -256,7 +256,7 @@ abstract class AggregationIterator( val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeExpressionAggUpdateProjection = newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)() @@ -282,7 +282,7 @@ abstract class AggregationIterator( // Complete-only case (None, Some(Complete)) => - val completeAggregateFunctions: Array[AggregateFunction2] = + val completeAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) // All imperative aggregate functions with mode Complete. val completeImperativeAggregateFunctions: Array[ImperativeAggregate] = @@ -291,7 +291,7 @@ abstract class AggregationIterator( val updateExpressions = completeAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeExpressionAggUpdateProjection = newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)() @@ -353,7 +353,7 @@ abstract class AggregationIterator( allAggregateFunctions.flatMap(_.aggBufferAttributes) val evalExpressions = allAggregateFunctions.map { case ae: DeclarativeAggregate => ae.evaluateExpression - case agg: AggregateFunction2 => NoOp + case agg: AggregateFunction => NoOp } val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)() val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala index 4d37106..fb7f30c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics case class SortBasedAggregate( requiredChildDistributionExpressions: Option[Seq[Expression]], groupingExpressions: Seq[NamedExpression], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], + nonCompleteAggregateExpressions: Seq[AggregateExpression], nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], + completeAggregateExpressions: Seq[AggregateExpression], completeAggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 64c6730..fe5c319 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, AggregateFunction2} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction} import org.apache.spark.sql.execution.metric.LongSQLMetric /** - * An iterator used to evaluate [[AggregateFunction2]]. It assumes the input rows have been + * An iterator used to evaluate [[AggregateFunction]]. It assumes the input rows have been * sorted by values of [[groupingKeyAttributes]]. */ class SortBasedAggregationIterator( @@ -31,9 +31,9 @@ class SortBasedAggregationIterator( groupingKeyAttributes: Seq[Attribute], valueAttributes: Seq[Attribute], inputIterator: Iterator[InternalRow], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], + nonCompleteAggregateExpressions: Seq[AggregateExpression], nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], + completeAggregateExpressions: Seq[AggregateExpression], completeAggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 1561691..1edde1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2 +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap} @@ -30,9 +30,9 @@ import org.apache.spark.sql.types.StructType case class TungstenAggregate( requiredChildDistributionExpressions: Option[Seq[Expression]], groupingExpressions: Seq[NamedExpression], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], + nonCompleteAggregateExpressions: Seq[AggregateExpression], nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], + completeAggregateExpressions: Seq[AggregateExpression], completeAggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index ce8d592..0439144 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -64,12 +64,12 @@ import org.apache.spark.sql.types.StructType * @param groupingExpressions * expressions for grouping keys * @param nonCompleteAggregateExpressions - * [[AggregateExpression2]] containing [[AggregateFunction2]]s with mode [[Partial]], - * [[PartialMerge]], or [[Final]]. + * [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Partial]], + * [[PartialMerge]], or [[Final]]. * @param nonCompleteAggregateAttributes the attributes of the nonCompleteAggregateExpressions' * outputs when they are stored in the final aggregation buffer. * @param completeAggregateExpressions - * [[AggregateExpression2]] containing [[AggregateFunction2]]s with mode [[Complete]]. + * [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Complete]]. * @param completeAggregateAttributes the attributes of completeAggregateExpressions' outputs * when they are stored in the final aggregation buffer. * @param resultExpressions @@ -83,9 +83,9 @@ import org.apache.spark.sql.types.StructType */ class TungstenAggregationIterator( groupingExpressions: Seq[NamedExpression], - nonCompleteAggregateExpressions: Seq[AggregateExpression2], + nonCompleteAggregateExpressions: Seq[AggregateExpression], nonCompleteAggregateAttributes: Seq[Attribute], - completeAggregateExpressions: Seq[AggregateExpression2], + completeAggregateExpressions: Seq[AggregateExpression], completeAggregateAttributes: Seq[Attribute], initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], @@ -106,7 +106,7 @@ class TungstenAggregationIterator( // A Seq containing all AggregateExpressions. // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final // are at the beginning of the allAggregateExpressions. - private[this] val allAggregateExpressions: Seq[AggregateExpression2] = + private[this] val allAggregateExpressions: Seq[AggregateExpression] = nonCompleteAggregateExpressions ++ completeAggregateExpressions // Check to make sure we do not have more than three modes in our AggregateExpressions. @@ -150,10 +150,10 @@ class TungstenAggregationIterator( // Initialize all AggregateFunctions by binding references, if necessary, // and setting inputBufferOffset and mutableBufferOffset. private def initializeAllAggregateFunctions( - startingInputBufferOffset: Int): Array[AggregateFunction2] = { + startingInputBufferOffset: Int): Array[AggregateFunction] = { var mutableBufferOffset = 0 var inputBufferOffset: Int = startingInputBufferOffset - val functions = new Array[AggregateFunction2](allAggregateExpressions.length) + val functions = new Array[AggregateFunction](allAggregateExpressions.length) var i = 0 while (i < allAggregateExpressions.length) { val func = allAggregateExpressions(i).aggregateFunction @@ -195,7 +195,7 @@ class TungstenAggregationIterator( functions } - private[this] var allAggregateFunctions: Array[AggregateFunction2] = + private[this] var allAggregateFunctions: Array[AggregateFunction] = initializeAllAggregateFunctions(initialInputBufferOffset) // Positions of those imperative aggregate functions in allAggregateFunctions. @@ -263,7 +263,7 @@ class TungstenAggregationIterator( case (Some(Partial), None) => val updateExpressions = allAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val imperativeAggregateFunctions: Array[ImperativeAggregate] = allAggregateFunctions.collect { case func: ImperativeAggregate => func} @@ -286,7 +286,7 @@ class TungstenAggregationIterator( case (Some(PartialMerge), None) | (Some(Final), None) => val mergeExpressions = allAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.mergeExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val imperativeAggregateFunctions: Array[ImperativeAggregate] = allAggregateFunctions.collect { case func: ImperativeAggregate => func} @@ -307,11 +307,11 @@ class TungstenAggregationIterator( // Final-Complete case (Some(Final), Some(Complete)) => - val completeAggregateFunctions: Array[AggregateFunction2] = + val completeAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) val completeImperativeAggregateFunctions: Array[ImperativeAggregate] = completeAggregateFunctions.collect { case func: ImperativeAggregate => func } - val nonCompleteAggregateFunctions: Array[AggregateFunction2] = + val nonCompleteAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.take(nonCompleteAggregateExpressions.length) val nonCompleteImperativeAggregateFunctions: Array[ImperativeAggregate] = nonCompleteAggregateFunctions.collect { case func: ImperativeAggregate => func } @@ -321,7 +321,7 @@ class TungstenAggregationIterator( val mergeExpressions = nonCompleteAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.mergeExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } ++ completeOffsetExpressions val finalMergeProjection = newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)() @@ -331,7 +331,7 @@ class TungstenAggregationIterator( Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp) val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeUpdateProjection = newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)() @@ -358,7 +358,7 @@ class TungstenAggregationIterator( // Complete-only case (None, Some(Complete)) => - val completeAggregateFunctions: Array[AggregateFunction2] = + val completeAggregateFunctions: Array[AggregateFunction] = allAggregateFunctions.takeRight(completeAggregateExpressions.length) // All imperative aggregate functions with mode Complete. val completeImperativeAggregateFunctions: Array[ImperativeAggregate] = @@ -366,7 +366,7 @@ class TungstenAggregationIterator( val updateExpressions = completeAggregateFunctions.flatMap { case ae: DeclarativeAggregate => ae.updateExpressions - case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp) + case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp) } val completeExpressionAggUpdateProjection = newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)() @@ -414,7 +414,7 @@ class TungstenAggregationIterator( val joinedRow = new JoinedRow() val evalExpressions = allAggregateFunctions.map { case ae: DeclarativeAggregate => ae.evaluateExpression - case agg: AggregateFunction2 => NoOp + case agg: AggregateFunction => NoOp } val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferAttributes)() // These are the attributes of the row produced by `expressionAggEvalProjection` http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index d2f56e0..20359c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters} import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.catalyst.expressions.{MutableRow, InterpretedMutableProjection, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, AggregateFunction2} +import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, AggregateFunction} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala index eaafd83..79abf2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala @@ -28,8 +28,8 @@ object Utils { def planAggregateWithoutPartial( groupingExpressions: Seq[NamedExpression], - aggregateExpressions: Seq[AggregateExpression2], - aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute], + aggregateExpressions: Seq[AggregateExpression], + aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute], resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { @@ -54,8 +54,8 @@ object Utils { def planAggregateWithoutDistinct( groupingExpressions: Seq[NamedExpression], - aggregateExpressions: Seq[AggregateExpression2], - aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute], + aggregateExpressions: Seq[AggregateExpression], + aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute], resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { // Check if we can use TungstenAggregate. @@ -137,9 +137,9 @@ object Utils { def planAggregateWithOneDistinct( groupingExpressions: Seq[NamedExpression], - functionsWithDistinct: Seq[AggregateExpression2], - functionsWithoutDistinct: Seq[AggregateExpression2], - aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute], + functionsWithDistinct: Seq[AggregateExpression], + functionsWithoutDistinct: Seq[AggregateExpression], + aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute], resultExpressions: Seq[NamedExpression], child: SparkPlan): Seq[SparkPlan] = { @@ -253,16 +253,16 @@ object Utils { // Children of an AggregateFunction with DISTINCT keyword has already // been evaluated. At here, we need to replace original children // to AttributeReferences. - case agg @ AggregateExpression2(aggregateFunction, mode, true) => + case agg @ AggregateExpression(aggregateFunction, mode, true) => val rewrittenAggregateFunction = aggregateFunction.transformDown { case expr if expr == distinctColumnExpression => distinctColumnAttribute - }.asInstanceOf[AggregateFunction2] + }.asInstanceOf[AggregateFunction] // We rewrite the aggregate function to a non-distinct aggregation because // its input will have distinct arguments. // We just keep the isDistinct setting to true, so when users look at the query plan, // they still can see distinct aggregations. val rewrittenAggregateExpression = - AggregateExpression2(rewrittenAggregateFunction, Complete, isDistinct = true) + AggregateExpression(rewrittenAggregateFunction, Complete, isDistinct = true) val aggregateFunctionAttribute = aggregateFunctionToAttribute(agg.aggregateFunction, true) (rewrittenAggregateExpression, aggregateFunctionAttribute) http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index 0b3192a..8cc25c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.expressions import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder} -import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2} +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete} import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn} @@ -70,7 +70,7 @@ abstract class Aggregator[-A, B, C] { implicit bEncoder: Encoder[B], cEncoder: Encoder[C]): TypedColumn[A, C] = { val expr = - new AggregateExpression2( + new AggregateExpression( TypedAggregateExpression(this), Complete, false) @@ -78,4 +78,3 @@ abstract class Aggregator[-A, B, C] { new TypedColumn[A, C](expr, encoderFor[C]) } } - http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala index 8b9247a..fc873c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.expressions import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.types.BooleanType import org.apache.spark.sql.{Column, catalyst} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ /** @@ -141,40 +141,56 @@ class WindowSpec private[sql]( */ private[sql] def withAggregate(aggregate: Column): Column = { val windowExpr = aggregate.expr match { - case Average(child) => WindowExpression( - UnresolvedWindowFunction("avg", child :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Sum(child) => WindowExpression( - UnresolvedWindowFunction("sum", child :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Count(child) => WindowExpression( - UnresolvedWindowFunction("count", child :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case First(child, ignoreNulls) => WindowExpression( - // TODO this is a hack for Hive UDAF first_value - UnresolvedWindowFunction( - "first_value", - child :: ignoreNulls :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Last(child, ignoreNulls) => WindowExpression( - // TODO this is a hack for Hive UDAF last_value - UnresolvedWindowFunction( - "last_value", - child :: ignoreNulls :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Min(child) => WindowExpression( - UnresolvedWindowFunction("min", child :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case Max(child) => WindowExpression( - UnresolvedWindowFunction("max", child :: Nil), - WindowSpecDefinition(partitionSpec, orderSpec, frame)) - case wf: WindowFunction => WindowExpression( - wf, - WindowSpecDefinition(partitionSpec, orderSpec, frame)) + // First, we check if we get an aggregate function without the DISTINCT keyword. + // Right now, we do not support using a DISTINCT aggregate function as a + // window function. + case AggregateExpression(aggregateFunction, _, isDistinct) if !isDistinct => + aggregateFunction match { + case Average(child) => WindowExpression( + UnresolvedWindowFunction("avg", child :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case Sum(child) => WindowExpression( + UnresolvedWindowFunction("sum", child :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case Count(child) => WindowExpression( + UnresolvedWindowFunction("count", child :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case First(child, ignoreNulls) => WindowExpression( + // TODO this is a hack for Hive UDAF first_value + UnresolvedWindowFunction( + "first_value", + child :: ignoreNulls :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case Last(child, ignoreNulls) => WindowExpression( + // TODO this is a hack for Hive UDAF last_value + UnresolvedWindowFunction( + "last_value", + child :: ignoreNulls :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case Min(child) => WindowExpression( + UnresolvedWindowFunction("min", child :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case Max(child) => WindowExpression( + UnresolvedWindowFunction("max", child :: Nil), + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case x => + throw new UnsupportedOperationException(s"$x is not supported in a window operation.") + } + + case AggregateExpression(aggregateFunction, _, isDistinct) if isDistinct => + throw new UnsupportedOperationException( + s"Distinct aggregate function ${aggregateFunction} is not supported " + + s"in window operation.") + + case wf: WindowFunction => + WindowExpression( + wf, + WindowSpecDefinition(partitionSpec, orderSpec, frame)) + case x => - throw new UnsupportedOperationException(s"$x is not supported in window operation.") + throw new UnsupportedOperationException(s"$x is not supported in a window operation.") } + new Column(windowExpr) } - } http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala index 258afad..11dbf39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.expressions -import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2} +import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression} import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.types._ @@ -109,7 +109,7 @@ abstract class UserDefinedAggregateFunction extends Serializable { @scala.annotation.varargs def apply(exprs: Column*): Column = { val aggregateExpression = - AggregateExpression2( + AggregateExpression( ScalaUDAF(exprs.map(_.expr), this), Complete, isDistinct = false) @@ -123,7 +123,7 @@ abstract class UserDefinedAggregateFunction extends Serializable { @scala.annotation.varargs def distinct(exprs: Column*): Column = { val aggregateExpression = - AggregateExpression2( + AggregateExpression( ScalaUDAF(exprs.map(_.expr), this), Complete, isDistinct = true) http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 6d56542..22104e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection} import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, Star} import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, Encoder} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -76,6 +77,12 @@ object functions extends LegacyFunctions { private def withExpr(expr: Expression): Column = Column(expr) + private def withAggregateFunction( + func: AggregateFunction, + isDistinct: Boolean = false): Column = { + Column(func.toAggregateExpression(isDistinct)) + } + private implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder[Long](flat = true) @@ -154,7 +161,9 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def approxCountDistinct(e: Column): Column = withExpr { ApproxCountDistinct(e.expr) } + def approxCountDistinct(e: Column): Column = withAggregateFunction { + HyperLogLogPlusPlus(e.expr) + } /** * Aggregate function: returns the approximate number of distinct items in a group. @@ -170,8 +179,8 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def approxCountDistinct(e: Column, rsd: Double): Column = withExpr { - ApproxCountDistinct(e.expr, rsd) + def approxCountDistinct(e: Column, rsd: Double): Column = withAggregateFunction { + HyperLogLogPlusPlus(e.expr, rsd, 0, 0) } /** @@ -190,7 +199,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def avg(e: Column): Column = withExpr { Average(e.expr) } + def avg(e: Column): Column = withAggregateFunction { Average(e.expr) } /** * Aggregate function: returns the average of the values in a group. @@ -226,7 +235,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def corr(column1: Column, column2: Column): Column = withExpr { + def corr(column1: Column, column2: Column): Column = withAggregateFunction { Corr(column1.expr, column2.expr) } @@ -246,7 +255,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def count(e: Column): Column = withExpr { + def count(e: Column): Column = withAggregateFunction { e.expr match { // Turn count(*) into count(1) case s: Star => Count(Literal(1)) @@ -269,8 +278,8 @@ object functions extends LegacyFunctions { * @since 1.3.0 */ @scala.annotation.varargs - def countDistinct(expr: Column, exprs: Column*): Column = withExpr { - CountDistinct((expr +: exprs).map(_.expr)) + def countDistinct(expr: Column, exprs: Column*): Column = { + withAggregateFunction(Count.apply((expr +: exprs).map(_.expr)), isDistinct = true) } /** @@ -289,7 +298,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def first(e: Column): Column = withExpr { First(e.expr) } + def first(e: Column): Column = withAggregateFunction { new First(e.expr) } /** * Aggregate function: returns the first value of a column in a group. @@ -305,7 +314,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def kurtosis(e: Column): Column = withExpr { Kurtosis(e.expr) } + def kurtosis(e: Column): Column = withAggregateFunction { Kurtosis(e.expr) } /** * Aggregate function: returns the last value in a group. @@ -313,7 +322,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def last(e: Column): Column = withExpr { Last(e.expr) } + def last(e: Column): Column = withAggregateFunction { new Last(e.expr) } /** * Aggregate function: returns the last value of the column in a group. @@ -329,7 +338,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def max(e: Column): Column = withExpr { Max(e.expr) } + def max(e: Column): Column = withAggregateFunction { Max(e.expr) } /** * Aggregate function: returns the maximum value of the column in a group. @@ -363,7 +372,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def min(e: Column): Column = withExpr { Min(e.expr) } + def min(e: Column): Column = withAggregateFunction { Min(e.expr) } /** * Aggregate function: returns the minimum value of the column in a group. @@ -379,7 +388,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def skewness(e: Column): Column = withExpr { Skewness(e.expr) } + def skewness(e: Column): Column = withAggregateFunction { Skewness(e.expr) } /** * Aggregate function: alias for [[stddev_samp]]. @@ -387,7 +396,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def stddev(e: Column): Column = withExpr { StddevSamp(e.expr) } + def stddev(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) } /** * Aggregate function: returns the unbiased sample standard deviation of @@ -396,7 +405,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def stddev_samp(e: Column): Column = withExpr { StddevSamp(e.expr) } + def stddev_samp(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) } /** * Aggregate function: returns the population standard deviation of @@ -405,7 +414,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def stddev_pop(e: Column): Column = withExpr { StddevPop(e.expr) } + def stddev_pop(e: Column): Column = withAggregateFunction { StddevPop(e.expr) } /** * Aggregate function: returns the sum of all values in the expression. @@ -413,7 +422,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def sum(e: Column): Column = withExpr { Sum(e.expr) } + def sum(e: Column): Column = withAggregateFunction { Sum(e.expr) } /** * Aggregate function: returns the sum of all values in the given column. @@ -429,7 +438,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.3.0 */ - def sumDistinct(e: Column): Column = withExpr { SumDistinct(e.expr) } + def sumDistinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true) /** * Aggregate function: returns the sum of distinct values in the expression. @@ -445,7 +454,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def variance(e: Column): Column = withExpr { VarianceSamp(e.expr) } + def variance(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) } /** * Aggregate function: returns the unbiased variance of the values in a group. @@ -453,7 +462,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def var_samp(e: Column): Column = withExpr { VarianceSamp(e.expr) } + def var_samp(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) } /** * Aggregate function: returns the population variance of the values in a group. @@ -461,7 +470,7 @@ object functions extends LegacyFunctions { * @group agg_funcs * @since 1.6.0 */ - def var_pop(e: Column): Column = withExpr { VariancePop(e.expr) } + def var_pop(e: Column): Column = withAggregateFunction { VariancePop(e.expr) } ////////////////////////////////////////////////////////////////////////////////////////////// // Window functions http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 3de277a..441a0c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -237,34 +237,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-8828 sum should return null if all input values are null") { - withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "true") { - withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") { - checkAnswer( - sql("select sum(a), avg(a) from allNulls"), - Seq(Row(null, null)) - ) - } - withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") { - checkAnswer( - sql("select sum(a), avg(a) from allNulls"), - Seq(Row(null, null)) - ) - } - } - withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "false") { - withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") { - checkAnswer( - sql("select sum(a), avg(a) from allNulls"), - Seq(Row(null, null)) - ) - } - withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") { - checkAnswer( - sql("select sum(a), avg(a) from allNulls"), - Seq(Row(null, null)) - ) - } - } + checkAnswer( + sql("select sum(a), avg(a) from allNulls"), + Seq(Row(null, null)) + ) } private def testCodeGen(sqlText: String, expectedResults: Seq[Row]): Unit = { @@ -507,29 +483,22 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("literal in agg grouping expressions") { - def literalInAggTest(): Unit = { - checkAnswer( - sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"), - Seq(Row(1, 2), Row(2, 2), Row(3, 2))) - checkAnswer( - sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"), - Seq(Row(1, 2), Row(2, 2), Row(3, 2))) - - checkAnswer( - sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"), - sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) - checkAnswer( - sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"), - sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) - checkAnswer( - sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"), - sql("SELECT 1, 2, sum(b) FROM testData2")) - } + checkAnswer( + sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"), + Seq(Row(1, 2), Row(2, 2), Row(3, 2))) + checkAnswer( + sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"), + Seq(Row(1, 2), Row(2, 2), Row(3, 2))) - literalInAggTest() - withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "false") { - literalInAggTest() - } + checkAnswer( + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"), + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) + checkAnswer( + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"), + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) + checkAnswer( + sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"), + sql("SELECT 1, 2, sum(b) FROM testData2")) } test("aggregates with nulls") { http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index a229e58..e31c528 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -21,16 +21,13 @@ import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData} import scala.beans.{BeanInfo, BeanProperty} -import com.clearspring.analytics.stream.cardinality.HyperLogLog - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT} +import org.apache.spark.sql.catalyst.expressions.OpenHashSetUDT import org.apache.spark.sql.execution.datasources.parquet.ParquetTest import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils import org.apache.spark.util.collection.OpenHashSet @@ -134,16 +131,6 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT df.orderBy('int).limit(1).groupBy('int).agg(first('vec)).collect()(0).getAs[MyDenseVector](0) } - test("HyperLogLogUDT") { - val hyperLogLogUDT = HyperLogLogUDT - val hyperLogLog = new HyperLogLog(0.4) - (1 to 10).foreach(i => hyperLogLog.offer(Row(i))) - - val actual = hyperLogLogUDT.deserialize(hyperLogLogUDT.serialize(hyperLogLog)) - assert(actual.cardinality() === hyperLogLog.cardinality()) - assert(java.util.Arrays.equals(actual.getBytes, hyperLogLog.getBytes)) - } - test("OpenHashSetUDT") { val openHashSetUDT = new OpenHashSetUDT(IntegerType) val set = new OpenHashSet[Int] http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 2076c57..44634da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -38,7 +38,7 @@ class PlannerSuite extends SharedSQLContext { private def testPartialAggregationPlan(query: LogicalPlan): Unit = { val planner = sqlContext.planner import planner._ - val plannedOption = HashAggregation(query).headOption.orElse(Aggregation(query).headOption) + val plannedOption = Aggregation(query).headOption val planned = plannedOption.getOrElse( fail(s"Could query play aggregation query $query. Is it an aggregation query?")) http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index cdd885b..4b4f5c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -152,36 +152,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { ) } - test("Aggregate metrics") { - withSQLConf( - SQLConf.UNSAFE_ENABLED.key -> "false", - SQLConf.CODEGEN_ENABLED.key -> "false", - SQLConf.TUNGSTEN_ENABLED.key -> "false") { - // Assume the execution plan is - // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> Aggregate(nodeId = 0) - val df = testData2.groupBy().count() // 2 partitions - testSparkPlanMetrics(df, 1, Map( - 2L -> ("Aggregate", Map( - "number of input rows" -> 6L, - "number of output rows" -> 2L)), - 0L -> ("Aggregate", Map( - "number of input rows" -> 2L, - "number of output rows" -> 1L))) - ) - - // 2 partitions and each partition contains 2 keys - val df2 = testData2.groupBy('a).count() - testSparkPlanMetrics(df2, 1, Map( - 2L -> ("Aggregate", Map( - "number of input rows" -> 6L, - "number of output rows" -> 4L)), - 0L -> ("Aggregate", Map( - "number of input rows" -> 4L, - "number of output rows" -> 3L))) - ) - } - } - test("SortBasedAggregate metrics") { // Because SortBasedAggregate may skip different rows if the number of partitions is different, // this test should use the deterministic number of partitions. http://git-wip-us.apache.org/repos/asf/spark/blob/7c4ade0d/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c5f6965..ba62046 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -584,7 +584,6 @@ class HiveContext private[hive]( HiveTableScans, DataSinks, Scripts, - HashAggregation, Aggregation, LeftSemiJoin, EquiJoinSelection, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org