http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index ed810a1..0290faf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -68,7 +68,7 @@ class FilterPushdownSuite extends PlanTest {
   test("column pruning for group") {
     val originalQuery =
       testRelation
-        .groupBy('a)('a, Count('b))
+        .groupBy('a)('a, count('b))
         .select('a)
 
     val optimized = Optimize.execute(originalQuery.analyze)
@@ -84,7 +84,7 @@ class FilterPushdownSuite extends PlanTest {
   test("column pruning for group with alias") {
     val originalQuery =
       testRelation
-        .groupBy('a)('a as 'c, Count('b))
+        .groupBy('a)('a as 'c, count('b))
         .select('c)
 
     val optimized = Optimize.execute(originalQuery.analyze)
@@ -656,7 +656,7 @@ class FilterPushdownSuite extends PlanTest {
 
   test("aggregate: push down filter when filter on group by expression") {
     val originalQuery = testRelation
-                        .groupBy('a)('a, Count('b) as 'c)
+                        .groupBy('a)('a, count('b) as 'c)
                         .select('a, 'c)
                         .where('a === 2)
 
@@ -664,7 +664,7 @@ class FilterPushdownSuite extends PlanTest {
 
     val correctAnswer = testRelation
                         .where('a === 2)
-                        .groupBy('a)('a, Count('b) as 'c)
+                        .groupBy('a)('a, count('b) as 'c)
                         .analyze
     comparePlans(optimized, correctAnswer)
   }
@@ -672,7 +672,7 @@ class FilterPushdownSuite extends PlanTest {
   test("aggregate: don't push down filter when filter not on group by expression") {
     val originalQuery = testRelation
                         .select('a, 'b)
-                        .groupBy('a)('a, Count('b) as 'c)
+                        .groupBy('a)('a, count('b) as 'c)
                         .where('c === 2L)
 
     val optimized = Optimize.execute(originalQuery.analyze)
@@ -683,7 +683,7 @@ class FilterPushdownSuite extends PlanTest {
   test("aggregate: push down filters partially which are subset of group by expressions") {
     val originalQuery = testRelation
                         .select('a, 'b)
-                        .groupBy('a)('a, Count('b) as 'c)
+                        .groupBy('a)('a, count('b) as 'c)
                         .where('c === 2L && 'a === 3)
 
     val optimized = Optimize.execute(originalQuery.analyze)
@@ -691,7 +691,7 @@ class FilterPushdownSuite extends PlanTest {
     val correctAnswer = testRelation
                         .select('a, 'b)
                         .where('a === 3)
-                        .groupBy('a)('a, Count('b) as 'c)
+                        .groupBy('a)('a, count('b) as 'c)
                         .where('c === 2L)
                         .analyze
 

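Context for the test changes above: the tests swap the bare `Count` constructor
for the lowercase `count` DSL helper because, after this patch, an
AggregateFunction can no longer appear directly in a projection; it must first
be wrapped in an AggregateExpression carrying its mode and distinctness. A
minimal sketch of what such a helper is assumed to do (hypothetical stand-in
for the DSL method, Spark 1.6-era catalyst API):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count}

    // Wrap the bare aggregate function in an AggregateExpression
    // (non-distinct), as the analyzed plans now expect.
    def count(e: Expression): AggregateExpression =
      Count(e).toAggregateExpression(isDistinct = false)
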
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index d25807c..3b69247 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.encoders.Encoder
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -1338,7 +1339,7 @@ class DataFrame private[sql](
       if (groupColExprIds.contains(attr.exprId)) {
         attr
       } else {
-        Alias(First(attr), attr.name)()
+        Alias(new First(attr).toAggregateExpression(), attr.name)()
       }
     }
     Aggregate(groupCols, aggCols, logicalPlan)
@@ -1381,11 +1382,11 @@ class DataFrame private[sql](
 
     // The list of summary statistics to compute, in the form of expressions.
     val statistics = List[(String, Expression => Expression)](
-      "count" -> Count,
-      "mean" -> Average,
-      "stddev" -> StddevSamp,
-      "min" -> Min,
-      "max" -> Max)
+      "count" -> ((child: Expression) => Count(child).toAggregateExpression()),
+      "mean" -> ((child: Expression) => Average(child).toAggregateExpression()),
+      "stddev" -> ((child: Expression) => StddevSamp(child).toAggregateExpression()),
+      "min" -> ((child: Expression) => Min(child).toAggregateExpression()),
+      "max" -> ((child: Expression) => Max(child).toAggregateExpression()))
 
     val outputCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
 

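The DataFrame changes above follow the same rule: `First(attr)` and the
statistics used by describe() must now be wrapped explicitly, hence the lambdas
replacing bare companion-object references. A small sketch of the wrapping step
(assuming the 1.6-era catalyst API shown in this patch):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.expressions.aggregate._
    import org.apache.spark.sql.types.LongType

    val attr = AttributeReference("x", LongType)()
    // toAggregateExpression() wraps the function with Complete mode and
    // isDistinct = false, which both hunks above rely on.
    val agg: AggregateExpression = new First(attr).toAggregateExpression()
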
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
index f9eab5c..5babf2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -21,8 +21,9 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, Star}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedAlias, UnresolvedAttribute, Star}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Rollup, Cube, Aggregate}
 import org.apache.spark.sql.types.NumericType
 
@@ -70,7 +71,7 @@ class GroupedData protected[sql](
     }
   }
 
-  private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => Expression)
+  private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction)
     : DataFrame = {
 
     val columnExprs = if (colNames.isEmpty) {
@@ -88,30 +89,28 @@ class GroupedData protected[sql](
         namedExpr
       }
     }
-    toDF(columnExprs.map(f))
+    toDF(columnExprs.map(expr => f(expr).toAggregateExpression()))
   }
 
   private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    expr.toLowerCase match {
-      case "avg" | "average" | "mean" => Average
-      case "max" => Max
-      case "min" => Min
-      case "stddev" | "std" => StddevSamp
-      case "stddev_pop" => StddevPop
-      case "stddev_samp" => StddevSamp
-      case "variance" => VarianceSamp
-      case "var_pop" => VariancePop
-      case "var_samp" => VarianceSamp
-      case "sum" => Sum
-      case "skewness" => Skewness
-      case "kurtosis" => Kurtosis
-      case "count" | "size" =>
-        // Turn count(*) into count(1)
-        (inputExpr: Expression) => inputExpr match {
-          case s: Star => Count(Literal(1))
-          case _ => Count(inputExpr)
-        }
+    val exprToFunc: (Expression => Expression) = {
+      (inputExpr: Expression) => expr.toLowerCase match {
+        // We specially handle a few cases that have aliases that are not in the function registry.
+        case "avg" | "average" | "mean" =>
+          UnresolvedFunction("avg", inputExpr :: Nil, isDistinct = false)
+        case "stddev" | "std" =>
+          UnresolvedFunction("stddev", inputExpr :: Nil, isDistinct = false)
+        // Also handle count specially because we need to take care of count(*).
+        case "count" | "size" =>
+          // Turn count(*) into count(1)
+          inputExpr match {
+            case s: Star => Count(Literal(1)).toAggregateExpression()
+            case _ => Count(inputExpr).toAggregateExpression()
+          }
+        case name => UnresolvedFunction(name, inputExpr :: Nil, isDistinct = false)
+      }
     }
+    (inputExpr: Expression) => exprToFunc(inputExpr)
   }
 
   /**
@@ -213,7 +212,7 @@ class GroupedData protected[sql](
    *
    * @since 1.3.0
    */
-  def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)), "count")()))
+  def count(): DataFrame = toDF(Seq(Alias(Count(Literal(1)).toAggregateExpression(), "count")()))
 
   /**
    * Compute the average value for each numeric column for each group. This is an alias for `avg`.

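For context, strToExpr backs the public string-based aggregation API, so after
this change most function names route through UnresolvedFunction and are
resolved by the analyzer; only the aliases above are special-cased. Illustrative
usage (assumes a DataFrame `df` with these columns):

    // "avg" takes the special-cased alias branch; "count" is rewritten to
    // count(1) for star input; other names resolve via the function registry.
    df.groupBy("department").agg("salary" -> "avg", "id" -> "count")
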
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index ed8b634..b731418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -448,15 +448,24 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     isPublic = false)
 
-  val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
-    defaultValue = Some(true), doc = "<TODO>")
-
   val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
     defaultValue = Some(true),
     isPublic = false,
     doc = "When true, we could use `datasource`.`path` as table in SQL query"
   )
 
+  val SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING =
+    booleanConf("spark.sql.specializeSingleDistinctAggPlanning",
+      defaultValue = Some(true),
+      isPublic = false,
+      doc = "When true, if a query only has a single distinct column and it has " +
+        "grouping expressions, we will use our planner rule to handle this distinct " +
+        "column (other cases are handled by DistinctAggregationRewriter). " +
+        "When false, we will always use DistinctAggregationRewriter to plan " +
+        "aggregation queries with DISTINCT keyword. This is an internal flag that is " +
+        "used to benchmark the performance impact of using DistinctAggregationRewriter to " +
+        "plan aggregation queries with a single distinct column.")
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -532,8 +541,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))
 
-  private[spark] def useSqlAggregate2: Boolean = getConf(USE_SQL_AGGREGATE2)
-
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   private[spark] def defaultSizeInBytes: Long =
@@ -575,6 +582,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
+  protected[spark] override def specializeSingleDistinctAggPlanning: Boolean =
+    getConf(SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

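Although the new flag is registered with isPublic = false, it can still be set
like any other key when benchmarking the two distinct-aggregation strategies,
for example:

    // Force DistinctAggregationRewriter even for single-distinct-column queries:
    sqlContext.setConf("spark.sql.specializeSingleDistinctAggPlanning", "false")
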
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
deleted file mode 100644
index 6f3f1bd..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.util.HashMap
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.SQLMetrics
-
-/**
- * Groups input data by `groupingExpressions` and computes the `aggregateExpressions` for each
- * group.
- *
- * @param partial if true then aggregation is done partially on local data without shuffling to
- *                ensure all values where `groupingExpressions` are equal are present.
- * @param groupingExpressions expressions that are evaluated to determine grouping.
- * @param aggregateExpressions expressions that are computed for each group.
- * @param child the input data source.
- */
-case class Aggregate(
-    partial: Boolean,
-    groupingExpressions: Seq[Expression],
-    aggregateExpressions: Seq[NamedExpression],
-    child: SparkPlan)
-  extends UnaryNode {
-
-  override private[sql] lazy val metrics = Map(
-    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
-
-  override def requiredChildDistribution: List[Distribution] = {
-    if (partial) {
-      UnspecifiedDistribution :: Nil
-    } else {
-      if (groupingExpressions == Nil) {
-        AllTuples :: Nil
-      } else {
-        ClusteredDistribution(groupingExpressions) :: Nil
-      }
-    }
-  }
-
-  override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)
-
-  /**
-   * An aggregate that needs to be computed for each row in a group.
-   *
-   * @param unbound Unbound version of this aggregate, used for result substitution.
-   * @param aggregate A bound copy of this aggregate used to create a new aggregation buffer.
-   * @param resultAttribute An attribute used to refer to the result of this aggregate in the final
-   *                        output.
-   */
-  case class ComputedAggregate(
-      unbound: AggregateExpression1,
-      aggregate: AggregateExpression1,
-      resultAttribute: AttributeReference)
-
-  /** A list of aggregates that need to be computed for each group. */
-  private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
-    agg.collect {
-      case a: AggregateExpression1 =>
-        ComputedAggregate(
-          a,
-          BindReferences.bindReference(a, child.output),
-          AttributeReference(s"aggResult:$a", a.dataType, a.nullable)())
-    }
-  }.toArray
-
-  /** The schema of the result of all aggregate evaluations */
-  private[this] val computedSchema = computedAggregates.map(_.resultAttribute)
-
-  /** Creates a new aggregate buffer for a group. */
-  private[this] def newAggregateBuffer(): Array[AggregateFunction1] = {
-    val buffer = new Array[AggregateFunction1](computedAggregates.length)
-    var i = 0
-    while (i < computedAggregates.length) {
-      buffer(i) = computedAggregates(i).aggregate.newInstance()
-      i += 1
-    }
-    buffer
-  }
-
-  /** Named attributes used to substitute grouping attributes into the final result. */
-  private[this] val namedGroups = groupingExpressions.map {
-    case ne: NamedExpression => ne -> ne.toAttribute
-    case e => e -> Alias(e, s"groupingExpr:$e")().toAttribute
-  }
-
-  /**
-   * A map of substitutions that are used to insert the aggregate expressions and grouping
-   * expression into the final result expression.
-   */
-  private[this] val resultMap =
-    (computedAggregates.map { agg => agg.unbound -> agg.resultAttribute } ++ namedGroups).toMap
-
-  /**
-   * Substituted version of aggregateExpressions expressions which are used to compute final
-   * output rows given a group and the result of all aggregate computations.
-   */
-  private[this] val resultExpressions = aggregateExpressions.map { agg =>
-    agg.transform {
-      case e: Expression if resultMap.contains(e) => resultMap(e)
-    }
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    val numInputRows = longMetric("numInputRows")
-    val numOutputRows = longMetric("numOutputRows")
-    if (groupingExpressions.isEmpty) {
-      child.execute().mapPartitions { iter =>
-        val buffer = newAggregateBuffer()
-        var currentRow: InternalRow = null
-        while (iter.hasNext) {
-          currentRow = iter.next()
-          numInputRows += 1
-          var i = 0
-          while (i < buffer.length) {
-            buffer(i).update(currentRow)
-            i += 1
-          }
-        }
-        val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
-        val aggregateResults = new GenericMutableRow(computedAggregates.length)
-
-        var i = 0
-        while (i < buffer.length) {
-          aggregateResults(i) = buffer(i).eval(EmptyRow)
-          i += 1
-        }
-
-        numOutputRows += 1
-        Iterator(resultProjection(aggregateResults))
-      }
-    } else {
-      child.execute().mapPartitions { iter =>
-        val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]]
-        val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
-
-        var currentRow: InternalRow = null
-        while (iter.hasNext) {
-          currentRow = iter.next()
-          numInputRows += 1
-          val currentGroup = groupingProjection(currentRow)
-          var currentBuffer = hashTable.get(currentGroup)
-          if (currentBuffer == null) {
-            currentBuffer = newAggregateBuffer()
-            hashTable.put(currentGroup.copy(), currentBuffer)
-          }
-
-          var i = 0
-          while (i < currentBuffer.length) {
-            currentBuffer(i).update(currentRow)
-            i += 1
-          }
-        }
-
-        new Iterator[InternalRow] {
-          private[this] val hashTableIter = hashTable.entrySet().iterator()
-          private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
-          private[this] val resultProjection =
-            new InterpretedMutableProjection(
-              resultExpressions, computedSchema ++ namedGroups.map(_._2))
-          private[this] val joinedRow = new JoinedRow
-
-          override final def hasNext: Boolean = hashTableIter.hasNext
-
-          override final def next(): InternalRow = {
-            val currentEntry = hashTableIter.next()
-            val currentGroup = currentEntry.getKey
-            val currentBuffer = currentEntry.getValue
-            numOutputRows += 1
-
-            var i = 0
-            while (i < currentBuffer.length) {
-              // Evaluating an aggregate buffer returns the result.  No row is required since we
-              // already added all rows in the group using update.
-              aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
-              i += 1
-            }
-            resultProjection(joinedRow(aggregateResults, currentGroup))
-          }
-        }
-      }
-    }
-  }
-}

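The deleted file above was the old hash-based Aggregate operator. Its core
loop, reduced to a plain-Scala sketch (not Spark code) for reference: project
each row to a grouping key and fold the row into that group's buffer of
aggregate functions.

    import scala.collection.mutable

    // Generic shape of the deleted doExecute loop: one buffer per group key,
    // updated once per input row.
    def hashAggregate[R, K, B](rows: Iterator[R])(
        key: R => K, newBuffer: () => B, update: (B, R) => Unit): mutable.Map[K, B] = {
      val table = mutable.Map.empty[K, B]
      for (row <- rows) {
        val buffer = table.getOrElseUpdate(key(row), newBuffer())
        update(buffer, row)
      }
      table
    }
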
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 55e9576..91530bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -45,6 +45,9 @@ case class Expand(
   override def canProcessUnsafeRows: Boolean = true
   override def canProcessSafeRows: Boolean = true
 
+  override def references: AttributeSet =
+    AttributeSet(projections.flatten.flatMap(_.references))
+
   private[this] val projection = {
     if (outputsUnsafeRows) {
       (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

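The references override matters because column pruning consults references to
decide which child columns must be kept, and Expand's projections may mention
attributes that appear nowhere else in the node. An illustrative query shape
that plans an Expand (assumes a DataFrame `df` with columns a, b, c):

    import org.apache.spark.sql.functions.sum

    // cube produces grouping sets, which are planned with an Expand node:
    df.cube("a", "b").agg(sum("c"))
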
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 0f98fe8..a10d1ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -38,7 +38,6 @@ class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
       DataSourceStrategy ::
       DDLStrategy ::
       TakeOrderedAndProject ::
-      HashAggregation ::
       Aggregation ::
       LeftSemiJoin ::
       EquiJoinSelection ::

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index dd3bb33..d65cb1b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, Utils}
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
@@ -146,148 +146,104 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
-  object HashAggregation extends Strategy {
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      // Aggregations that can be performed in two phases, before and after the shuffle.
-      case PartialAggregation(
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
-          groupingExpressions,
-          partialComputation,
-          child) if !canBeConvertedToNewAggregation(plan) =>
-        execution.Aggregate(
-          partial = false,
-          namedGroupingAttributes,
-          rewrittenAggregateExpressions,
-          execution.Aggregate(
-            partial = true,
-            groupingExpressions,
-            partialComputation,
-            planLater(child))) :: Nil
-
-      case _ => Nil
-    }
-
-    def canBeConvertedToNewAggregation(plan: LogicalPlan): Boolean = plan match {
-      case a: logical.Aggregate =>
-        if (sqlContext.conf.useSqlAggregate2 && sqlContext.conf.codegenEnabled) {
-          a.newAggregation.isDefined
-        } else {
-          Utils.checkInvalidAggregateFunction2(a)
-          false
-        }
-      case _ => false
-    }
-
-    def allAggregates(exprs: Seq[Expression]): Seq[AggregateExpression1] =
-      exprs.flatMap(_.collect { case a: AggregateExpression1 => a })
-  }
-
   /**
    * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
    */
   object Aggregation extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case p: logical.Aggregate if sqlContext.conf.useSqlAggregate2 &&
-          sqlContext.conf.codegenEnabled =>
-        val converted = p.newAggregation
-        converted match {
-          case None => Nil // Cannot convert to new aggregation code path.
-          case Some(logical.Aggregate(groupingExpressions, resultExpressions, child)) =>
-            // A single aggregate expression might appear multiple times in resultExpressions.
-            // In order to avoid evaluating an individual aggregate function multiple times, we'll
-            // build a set of the distinct aggregate expressions and build a function which can
-            // be used to re-write expressions so that they reference the single copy of the
-            // aggregate function which actually gets computed.
-            val aggregateExpressions = resultExpressions.flatMap { expr =>
-              expr.collect {
-                case agg: AggregateExpression2 => agg
-              }
-            }.distinct
-            // For those distinct aggregate expressions, we create a map from the
-            // aggregate function to the corresponding attribute of the function.
-            val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
-              val aggregateFunction = agg.aggregateFunction
-              val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute
-              (aggregateFunction, agg.isDistinct) -> attribute
-            }.toMap
-
-            val (functionsWithDistinct, functionsWithoutDistinct) =
-              aggregateExpressions.partition(_.isDistinct)
-            if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
-              // This is a sanity check. We should not reach here when we have multiple distinct
-              // column sets (aggregate.NewAggregation will not match).
-              sys.error(
-                "Multiple distinct column sets are not supported by the new aggregation" +
-                  "code path.")
-            }
+      case logical.Aggregate(groupingExpressions, resultExpressions, child) =>
+        // A single aggregate expression might appear multiple times in resultExpressions.
+        // In order to avoid evaluating an individual aggregate function multiple times, we'll
+        // build a set of the distinct aggregate expressions and build a function which can
+        // be used to re-write expressions so that they reference the single copy of the
+        // aggregate function which actually gets computed.
+        val aggregateExpressions = resultExpressions.flatMap { expr =>
+          expr.collect {
+            case agg: AggregateExpression => agg
+          }
+        }.distinct
+        // For those distinct aggregate expressions, we create a map from the
+        // aggregate function to the corresponding attribute of the function.
+        val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
+          val aggregateFunction = agg.aggregateFunction
+          val attribute = Alias(aggregateFunction, aggregateFunction.toString)().toAttribute
+          (aggregateFunction, agg.isDistinct) -> attribute
+        }.toMap
+
+        val (functionsWithDistinct, functionsWithoutDistinct) =
+          aggregateExpressions.partition(_.isDistinct)
+        if (functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
+          // This is a sanity check. We should not reach here when we have multiple distinct
+          // column sets. Our MultipleDistinctRewriter should take care of this case.
+          sys.error("You hit a query analyzer bug. Please report your query to the " +
+            "Spark user mailing list.")
+        }
 
-            val namedGroupingExpressions = groupingExpressions.map {
-              case ne: NamedExpression => ne -> ne
-              // If the expression is not a NamedExpressions, we add an alias.
-              // So, when we generate the result of the operator, the Aggregate Operator
-              // can directly get the Seq of attributes representing the grouping expressions.
-              case other =>
-                val withAlias = Alias(other, other.toString)()
-                other -> withAlias
-            }
-            val groupExpressionMap = namedGroupingExpressions.toMap
-
-            // The original `resultExpressions` are a set of expressions which may reference
-            // aggregate expressions, grouping column values, and constants. When aggregate operator
-            // emits output rows, we will use `resultExpressions` to generate an output projection
-            // which takes the grouping columns and final aggregate result buffer as input.
-            // Thus, we must re-write the result expressions so that their attributes match up with
-            // the attributes of the final result projection's input row:
-            val rewrittenResultExpressions = resultExpressions.map { expr =>
-              expr.transformDown {
-                case AggregateExpression2(aggregateFunction, _, isDistinct) =>
-                  // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
-                  // so replace each aggregate expression by its corresponding attribute in the set:
-                  aggregateFunctionToAttribute(aggregateFunction, isDistinct)
-                case expression =>
-                  // Since we're using `namedGroupingAttributes` to extract the grouping key
-                  // columns, we need to replace grouping key expressions with their corresponding
-                  // attributes. We do not rely on the equality check at here since attributes may
-                  // differ cosmetically. Instead, we use semanticEquals.
-                  groupExpressionMap.collectFirst {
-                    case (expr, ne) if expr semanticEquals expression => ne.toAttribute
-                  }.getOrElse(expression)
-              }.asInstanceOf[NamedExpression]
+        val namedGroupingExpressions = groupingExpressions.map {
+          case ne: NamedExpression => ne -> ne
+          // If the expression is not a NamedExpressions, we add an alias.
+          // So, when we generate the result of the operator, the Aggregate Operator
+          // can directly get the Seq of attributes representing the grouping expressions.
+          case other =>
+            val withAlias = Alias(other, other.toString)()
+            other -> withAlias
+        }
+        val groupExpressionMap = namedGroupingExpressions.toMap
+
+        // The original `resultExpressions` are a set of expressions which may reference
+        // aggregate expressions, grouping column values, and constants. When aggregate operator
+        // emits output rows, we will use `resultExpressions` to generate an output projection
+        // which takes the grouping columns and final aggregate result buffer as input.
+        // Thus, we must re-write the result expressions so that their attributes match up with
+        // the attributes of the final result projection's input row:
+        val rewrittenResultExpressions = resultExpressions.map { expr =>
+          expr.transformDown {
+            case AggregateExpression(aggregateFunction, _, isDistinct) =>
+              // The final aggregation buffer's attributes will be `finalAggregationAttributes`,
+              // so replace each aggregate expression by its corresponding attribute in the set:
+              aggregateFunctionToAttribute(aggregateFunction, isDistinct)
+            case expression =>
+              // Since we're using `namedGroupingAttributes` to extract the grouping key
+              // columns, we need to replace grouping key expressions with their corresponding
+              // attributes. We do not rely on the equality check here since attributes may
+              // differ cosmetically. Instead, we use semanticEquals.
+              groupExpressionMap.collectFirst {
+                case (expr, ne) if expr semanticEquals expression => ne.toAttribute
+              }.getOrElse(expression)
+          }.asInstanceOf[NamedExpression]
+        }
+
+        val aggregateOperator =
+          if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
+            if (functionsWithDistinct.nonEmpty) {
+              sys.error("Distinct columns cannot exist in Aggregate operator containing " +
+                "aggregate functions which don't support partial aggregation.")
+            } else {
+              aggregate.Utils.planAggregateWithoutPartial(
+                namedGroupingExpressions.map(_._2),
+                aggregateExpressions,
+                aggregateFunctionToAttribute,
+                rewrittenResultExpressions,
+                planLater(child))
+            }
+          } else if (functionsWithDistinct.isEmpty) {
+            aggregate.Utils.planAggregateWithoutDistinct(
+              namedGroupingExpressions.map(_._2),
+              aggregateExpressions,
+              aggregateFunctionToAttribute,
+              rewrittenResultExpressions,
+              planLater(child))
+          } else {
+            aggregate.Utils.planAggregateWithOneDistinct(
+              namedGroupingExpressions.map(_._2),
+              functionsWithDistinct,
+              functionsWithoutDistinct,
+              aggregateFunctionToAttribute,
+              rewrittenResultExpressions,
+              planLater(child))
+          }
 
-            val aggregateOperator =
-              if (aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
-                if (functionsWithDistinct.nonEmpty) {
-                  sys.error("Distinct columns cannot exist in Aggregate operator containing " +
-                    "aggregate functions which don't support partial aggregation.")
-                } else {
-                  aggregate.Utils.planAggregateWithoutPartial(
-                    namedGroupingExpressions.map(_._2),
-                    aggregateExpressions,
-                    aggregateFunctionToAttribute,
-                    rewrittenResultExpressions,
-                    planLater(child))
-                }
-              } else if (functionsWithDistinct.isEmpty) {
-                aggregate.Utils.planAggregateWithoutDistinct(
-                  namedGroupingExpressions.map(_._2),
-                  aggregateExpressions,
-                  aggregateFunctionToAttribute,
-                  rewrittenResultExpressions,
-                  planLater(child))
-              } else {
-                aggregate.Utils.planAggregateWithOneDistinct(
-                  namedGroupingExpressions.map(_._2),
-                  functionsWithDistinct,
-                  functionsWithoutDistinct,
-                  aggregateFunctionToAttribute,
-                  rewrittenResultExpressions,
-                  planLater(child))
-              }
-
-            aggregateOperator
-        }
+        aggregateOperator
 
       case _ => Nil
     }
@@ -422,18 +378,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Filter(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.Expand(e.projections, e.output, planLater(child)) :: Nil
-      case a @ logical.Aggregate(group, agg, child) => {
-        val useNewAggregation = sqlContext.conf.useSqlAggregate2 && sqlContext.conf.codegenEnabled
-        if (useNewAggregation && a.newAggregation.isDefined) {
-          // If this logical.Aggregate can be planned to use new aggregation code path
-          // (i.e. it can be planned by the Strategy Aggregation), we will not use the old
-          // aggregation code path.
-          Nil
-        } else {
-          Utils.checkInvalidAggregateFunction2(a)
-          execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
-        }
-      }
       case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>
         execution.Window(
           projectList, windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil

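The rewritten Aggregation strategy ends by choosing one of three planner
helpers. Illustrative query shapes for each branch (assumes a DataFrame `df`
and org.apache.spark.sql.functions._):

    df.groupBy("a").agg(max("b"))            // no distinct: planAggregateWithoutDistinct
    df.groupBy("a").agg(countDistinct("b"))  // one distinct set: planAggregateWithOneDistinct
    // an aggregate whose function reports supportsPartial == false instead
    // goes through planAggregateWithoutPartial
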
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 99fb7a4..008478a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -35,9 +35,9 @@ import scala.collection.mutable.ArrayBuffer
 abstract class AggregationIterator(
     groupingKeyAttributes: Seq[Attribute],
     valueAttributes: Seq[Attribute],
-    nonCompleteAggregateExpressions: Seq[AggregateExpression2],
+    nonCompleteAggregateExpressions: Seq[AggregateExpression],
     nonCompleteAggregateAttributes: Seq[Attribute],
-    completeAggregateExpressions: Seq[AggregateExpression2],
+    completeAggregateExpressions: Seq[AggregateExpression],
     completeAggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
@@ -76,14 +76,14 @@ abstract class AggregationIterator(
 
   // Initialize all AggregateFunctions by binding references if necessary,
   // and set inputBufferOffset and mutableBufferOffset.
-  protected val allAggregateFunctions: Array[AggregateFunction2] = {
+  protected val allAggregateFunctions: Array[AggregateFunction] = {
     var mutableBufferOffset = 0
     var inputBufferOffset: Int = initialInputBufferOffset
-    val functions = new Array[AggregateFunction2](allAggregateExpressions.length)
+    val functions = new Array[AggregateFunction](allAggregateExpressions.length)
     var i = 0
     while (i < allAggregateExpressions.length) {
       val func = allAggregateExpressions(i).aggregateFunction
-      val funcWithBoundReferences: AggregateFunction2 = allAggregateExpressions(i).mode match {
+      val funcWithBoundReferences: AggregateFunction = allAggregateExpressions(i).mode match {
         case Partial | Complete if func.isInstanceOf[ImperativeAggregate] =>
           // We need to create BoundReferences if the function is not an
           // expression-based aggregate function (it does not support code-gen) and the mode of
@@ -135,7 +135,7 @@ abstract class AggregationIterator(
   }
 
   // All AggregateFunctions with mode Partial, PartialMerge, or Final.
-  private[this] val nonCompleteAggregateFunctions: Array[AggregateFunction2] =
+  private[this] val nonCompleteAggregateFunctions: Array[AggregateFunction] =
     allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
 
   // All imperative aggregate functions with mode Partial, PartialMerge, or Final.
@@ -172,7 +172,7 @@ abstract class AggregationIterator(
       case (Some(Partial), None) =>
         val updateExpressions = nonCompleteAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.updateExpressions
-          case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val expressionAggUpdateProjection =
           newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
@@ -204,7 +204,7 @@ abstract class AggregationIterator(
         //    allAggregateFunctions.flatMap(_.cloneBufferAttributes)
         val mergeExpressions = nonCompleteAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.mergeExpressions
-          case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         // This projection is used to merge buffer values for all expression-based aggregates.
         val expressionAggMergeProjection =
@@ -225,7 +225,7 @@ abstract class AggregationIterator(
 
       // Final-Complete
       case (Some(Final), Some(Complete)) =>
-        val completeAggregateFunctions: Array[AggregateFunction2] =
+        val completeAggregateFunctions: Array[AggregateFunction] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
         // All imperative aggregate functions with mode Complete.
         val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
@@ -248,7 +248,7 @@ abstract class AggregationIterator(
         val mergeExpressions =
           nonCompleteAggregateFunctions.flatMap {
             case ae: DeclarativeAggregate => ae.mergeExpressions
-            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
           } ++ completeOffsetExpressions
         val finalExpressionAggMergeProjection =
           newMutableProjection(mergeExpressions, mergeInputSchema)()
@@ -256,7 +256,7 @@ abstract class AggregationIterator(
         val updateExpressions =
           finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
             case ae: DeclarativeAggregate => ae.updateExpressions
-            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
           }
         val completeExpressionAggUpdateProjection =
           newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
@@ -282,7 +282,7 @@ abstract class AggregationIterator(
 
       // Complete-only
       case (None, Some(Complete)) =>
-        val completeAggregateFunctions: Array[AggregateFunction2] =
+        val completeAggregateFunctions: Array[AggregateFunction] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
         // All imperative aggregate functions with mode Complete.
         val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
@@ -291,7 +291,7 @@ abstract class AggregationIterator(
         val updateExpressions =
           completeAggregateFunctions.flatMap {
             case ae: DeclarativeAggregate => ae.updateExpressions
-            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
           }
         val completeExpressionAggUpdateProjection =
           newMutableProjection(updateExpressions, aggregationBufferSchema ++ valueAttributes)()
@@ -353,7 +353,7 @@ abstract class AggregationIterator(
           allAggregateFunctions.flatMap(_.aggBufferAttributes)
         val evalExpressions = allAggregateFunctions.map {
           case ae: DeclarativeAggregate => ae.evaluateExpression
-          case agg: AggregateFunction2 => NoOp
+          case agg: AggregateFunction => NoOp
         }
         val expressionAggEvalProjection = newMutableProjection(evalExpressions, bufferSchemata)()
         val aggregateResultSchema = nonCompleteAggregateAttributes ++ completeAggregateAttributes

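A pattern worth noting in the iterator above: declarative aggregates contribute
real update/merge expressions, while imperative ones contribute NoOp
placeholders so the combined projection keeps every buffer slot aligned (the
imperative functions mutate their slots directly). Reduced sketch of the
recurring match:

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.expressions.aggregate._

    def updateExpressionsFor(functions: Seq[AggregateFunction]): Seq[Expression] =
      functions.flatMap {
        case ae: DeclarativeAggregate => ae.updateExpressions
        // Imperative aggregates update their buffers themselves; NoOp keeps
        // the projection's slot layout aligned.
        case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
      }
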
http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index 4d37106..fb7f30c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 case class SortBasedAggregate(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
     groupingExpressions: Seq[NamedExpression],
-    nonCompleteAggregateExpressions: Seq[AggregateExpression2],
+    nonCompleteAggregateExpressions: Seq[AggregateExpression],
     nonCompleteAggregateAttributes: Seq[Attribute],
-    completeAggregateExpressions: Seq[AggregateExpression2],
+    completeAggregateExpressions: Seq[AggregateExpression],
     completeAggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 64c6730..fe5c319 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.aggregate
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression2, AggregateFunction2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction}
 import org.apache.spark.sql.execution.metric.LongSQLMetric
 
 /**
- * An iterator used to evaluate [[AggregateFunction2]]. It assumes the input rows have been
+ * An iterator used to evaluate [[AggregateFunction]]. It assumes the input rows have been
  * sorted by values of [[groupingKeyAttributes]].
  */
 class SortBasedAggregationIterator(
@@ -31,9 +31,9 @@ class SortBasedAggregationIterator(
     groupingKeyAttributes: Seq[Attribute],
     valueAttributes: Seq[Attribute],
     inputIterator: Iterator[InternalRow],
-    nonCompleteAggregateExpressions: Seq[AggregateExpression2],
+    nonCompleteAggregateExpressions: Seq[AggregateExpression],
     nonCompleteAggregateAttributes: Seq[Attribute],
-    completeAggregateExpressions: Seq[AggregateExpression2],
+    completeAggregateExpressions: Seq[AggregateExpression],
     completeAggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 1561691..1edde1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap}
@@ -30,9 +30,9 @@ import org.apache.spark.sql.types.StructType
 case class TungstenAggregate(
     requiredChildDistributionExpressions: Option[Seq[Expression]],
     groupingExpressions: Seq[NamedExpression],
-    nonCompleteAggregateExpressions: Seq[AggregateExpression2],
+    nonCompleteAggregateExpressions: Seq[AggregateExpression],
     nonCompleteAggregateAttributes: Seq[Attribute],
-    completeAggregateExpressions: Seq[AggregateExpression2],
+    completeAggregateExpressions: Seq[AggregateExpression],
     completeAggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index ce8d592..0439144 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -64,12 +64,12 @@ import org.apache.spark.sql.types.StructType
  * @param groupingExpressions
  *   expressions for grouping keys
  * @param nonCompleteAggregateExpressions
- *   [[AggregateExpression2]] containing [[AggregateFunction2]]s with mode [[Partial]],
- *   [[PartialMerge]], or [[Final]].
+ * [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Partial]],
+ * [[PartialMerge]], or [[Final]].
  * @param nonCompleteAggregateAttributes the attributes of the nonCompleteAggregateExpressions'
  *   outputs when they are stored in the final aggregation buffer.
  * @param completeAggregateExpressions
- *   [[AggregateExpression2]] containing [[AggregateFunction2]]s with mode [[Complete]].
+ * [[AggregateExpression]] containing [[AggregateFunction]]s with mode [[Complete]].
  * @param completeAggregateAttributes the attributes of completeAggregateExpressions' outputs
  *   when they are stored in the final aggregation buffer.
  * @param resultExpressions
@@ -83,9 +83,9 @@ import org.apache.spark.sql.types.StructType
  */
 class TungstenAggregationIterator(
     groupingExpressions: Seq[NamedExpression],
-    nonCompleteAggregateExpressions: Seq[AggregateExpression2],
+    nonCompleteAggregateExpressions: Seq[AggregateExpression],
     nonCompleteAggregateAttributes: Seq[Attribute],
-    completeAggregateExpressions: Seq[AggregateExpression2],
+    completeAggregateExpressions: Seq[AggregateExpression],
     completeAggregateAttributes: Seq[Attribute],
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
@@ -106,7 +106,7 @@ class TungstenAggregationIterator(
   // A Seq containing all AggregateExpressions.
   // It is important that all AggregateExpressions with the mode Partial, PartialMerge or Final
   // are at the beginning of the allAggregateExpressions.
-  private[this] val allAggregateExpressions: Seq[AggregateExpression2] =
+  private[this] val allAggregateExpressions: Seq[AggregateExpression] =
     nonCompleteAggregateExpressions ++ completeAggregateExpressions
 
   // Check to make sure we do not have more than three modes in our AggregateExpressions.
@@ -150,10 +150,10 @@ class TungstenAggregationIterator(
   // Initialize all AggregateFunctions by binding references, if necessary,
   // and setting inputBufferOffset and mutableBufferOffset.
   private def initializeAllAggregateFunctions(
-      startingInputBufferOffset: Int): Array[AggregateFunction2] = {
+      startingInputBufferOffset: Int): Array[AggregateFunction] = {
     var mutableBufferOffset = 0
     var inputBufferOffset: Int = startingInputBufferOffset
-    val functions = new Array[AggregateFunction2](allAggregateExpressions.length)
+    val functions = new Array[AggregateFunction](allAggregateExpressions.length)
     var i = 0
     while (i < allAggregateExpressions.length) {
       val func = allAggregateExpressions(i).aggregateFunction
@@ -195,7 +195,7 @@ class TungstenAggregationIterator(
     functions
   }
 
-  private[this] var allAggregateFunctions: Array[AggregateFunction2] =
+  private[this] var allAggregateFunctions: Array[AggregateFunction] =
     initializeAllAggregateFunctions(initialInputBufferOffset)
 
   // Positions of those imperative aggregate functions in allAggregateFunctions.
@@ -263,7 +263,7 @@ class TungstenAggregationIterator(
       case (Some(Partial), None) =>
         val updateExpressions = allAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.updateExpressions
-          case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val imperativeAggregateFunctions: Array[ImperativeAggregate] =
           allAggregateFunctions.collect { case func: ImperativeAggregate => func}
@@ -286,7 +286,7 @@ class TungstenAggregationIterator(
       case (Some(PartialMerge), None) | (Some(Final), None) =>
         val mergeExpressions = allAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.mergeExpressions
-          case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val imperativeAggregateFunctions: Array[ImperativeAggregate] =
           allAggregateFunctions.collect { case func: ImperativeAggregate => func}
@@ -307,11 +307,11 @@ class TungstenAggregationIterator(
 
       // Final-Complete
       case (Some(Final), Some(Complete)) =>
-        val completeAggregateFunctions: Array[AggregateFunction2] =
+        val completeAggregateFunctions: Array[AggregateFunction] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
         val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
           completeAggregateFunctions.collect { case func: ImperativeAggregate => func }
-        val nonCompleteAggregateFunctions: Array[AggregateFunction2] =
+        val nonCompleteAggregateFunctions: Array[AggregateFunction] =
           allAggregateFunctions.take(nonCompleteAggregateExpressions.length)
         val nonCompleteImperativeAggregateFunctions: Array[ImperativeAggregate] =
           nonCompleteAggregateFunctions.collect { case func: ImperativeAggregate => func }
@@ -321,7 +321,7 @@ class TungstenAggregationIterator(
         val mergeExpressions =
           nonCompleteAggregateFunctions.flatMap {
             case ae: DeclarativeAggregate => ae.mergeExpressions
-            case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+            case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
           } ++ completeOffsetExpressions
         val finalMergeProjection =
           newMutableProjection(mergeExpressions, aggregationBufferAttributes ++ inputAttributes)()
@@ -331,7 +331,7 @@ class TungstenAggregationIterator(
           Seq.fill(nonCompleteAggregateFunctions.map(_.aggBufferAttributes.length).sum)(NoOp)
         val updateExpressions = finalOffsetExpressions ++ completeAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.updateExpressions
-          case agg: AggregateFunction2 => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val completeUpdateProjection =
           newMutableProjection(updateExpressions, aggregationBufferAttributes ++ inputAttributes)()
@@ -358,7 +358,7 @@ class TungstenAggregationIterator(
 
       // Complete-only
       case (None, Some(Complete)) =>
-        val completeAggregateFunctions: Array[AggregateFunction2] =
+        val completeAggregateFunctions: Array[AggregateFunction] =
           allAggregateFunctions.takeRight(completeAggregateExpressions.length)
         // All imperative aggregate functions with mode Complete.
         val completeImperativeAggregateFunctions: Array[ImperativeAggregate] =
@@ -366,7 +366,7 @@ class TungstenAggregationIterator(
 
         val updateExpressions = completeAggregateFunctions.flatMap {
           case ae: DeclarativeAggregate => ae.updateExpressions
-          case agg: AggregateFunction2 => 
Seq.fill(agg.aggBufferAttributes.length)(NoOp)
+          case agg: AggregateFunction => 
Seq.fill(agg.aggBufferAttributes.length)(NoOp)
         }
         val completeExpressionAggUpdateProjection =
           newMutableProjection(updateExpressions, aggregationBufferAttributes 
++ inputAttributes)()
@@ -414,7 +414,7 @@ class TungstenAggregationIterator(
         val joinedRow = new JoinedRow()
         val evalExpressions = allAggregateFunctions.map {
           case ae: DeclarativeAggregate => ae.evaluateExpression
-          case agg: AggregateFunction2 => NoOp
+          case agg: AggregateFunction => NoOp
         }
         val expressionAggEvalProjection = 
newMutableProjection(evalExpressions, bufferAttributes)()
         // These are the attributes of the row produced by 
`expressionAggEvalProjection`
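A note on the pattern repeated throughout this file: declarative aggregates expose their update/merge/evaluate logic as Catalyst expressions, while imperative aggregates manage their own buffer slots, so the shared projection is padded with NoOp placeholders at the imperative functions' positions. A minimal, self-contained sketch of that padding idea (the types below are simplified stand-ins, not the real Catalyst classes):

    // Simplified stand-ins for DeclarativeAggregate / ImperativeAggregate.
    sealed trait AggFunction { def bufferWidth: Int }
    case class Declarative(updateExprs: Seq[String]) extends AggFunction {
      def bufferWidth: Int = updateExprs.length
    }
    case class Imperative(bufferWidth: Int) extends AggFunction

    object NoOpPaddingSketch {
      val NoOp = "NoOp" // placeholder meaning "do not touch this buffer slot"

      // Declarative functions contribute real update expressions; imperative
      // ones get one NoOp per buffer slot because they mutate the buffer
      // themselves via update(), outside the generated projection.
      def updateExpressions(funcs: Seq[AggFunction]): Seq[String] =
        funcs.flatMap {
          case d: Declarative => d.updateExprs
          case i: Imperative  => Seq.fill(i.bufferWidth)(NoOp)
        }

      def main(args: Array[String]): Unit = {
        val funcs = Seq(Declarative(Seq("sum + input")), Imperative(bufferWidth = 2))
        // Prints List(sum + input, NoOp, NoOp): slots align with the shared buffer.
        println(updateExpressions(funcs))
      }
    }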

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index d2f56e0..20359c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.catalyst.expressions.{MutableRow, InterpretedMutableProjection, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, AggregateFunction2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, AggregateFunction}
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
index eaafd83..79abf2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/utils.scala
@@ -28,8 +28,8 @@ object Utils {
 
   def planAggregateWithoutPartial(
       groupingExpressions: Seq[NamedExpression],
-      aggregateExpressions: Seq[AggregateExpression2],
-      aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute],
+      aggregateExpressions: Seq[AggregateExpression],
+      aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute],
       resultExpressions: Seq[NamedExpression],
       child: SparkPlan): Seq[SparkPlan] = {
 
@@ -54,8 +54,8 @@ object Utils {
 
   def planAggregateWithoutDistinct(
       groupingExpressions: Seq[NamedExpression],
-      aggregateExpressions: Seq[AggregateExpression2],
-      aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute],
+      aggregateExpressions: Seq[AggregateExpression],
+      aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute],
       resultExpressions: Seq[NamedExpression],
       child: SparkPlan): Seq[SparkPlan] = {
     // Check if we can use TungstenAggregate.
@@ -137,9 +137,9 @@ object Utils {
 
   def planAggregateWithOneDistinct(
       groupingExpressions: Seq[NamedExpression],
-      functionsWithDistinct: Seq[AggregateExpression2],
-      functionsWithoutDistinct: Seq[AggregateExpression2],
-      aggregateFunctionToAttribute: Map[(AggregateFunction2, Boolean), Attribute],
+      functionsWithDistinct: Seq[AggregateExpression],
+      functionsWithoutDistinct: Seq[AggregateExpression],
+      aggregateFunctionToAttribute: Map[(AggregateFunction, Boolean), Attribute],
       resultExpressions: Seq[NamedExpression],
       child: SparkPlan): Seq[SparkPlan] = {
 
@@ -253,16 +253,16 @@ object Utils {
         // Children of an AggregateFunction with the DISTINCT keyword have already
         // been evaluated. Here, we need to replace the original children
         // with AttributeReferences.
-        case agg @ AggregateExpression2(aggregateFunction, mode, true) =>
+        case agg @ AggregateExpression(aggregateFunction, mode, true) =>
           val rewrittenAggregateFunction = aggregateFunction.transformDown {
             case expr if expr == distinctColumnExpression => distinctColumnAttribute
-          }.asInstanceOf[AggregateFunction2]
+          }.asInstanceOf[AggregateFunction]
           // We rewrite the aggregate function to a non-distinct aggregation because
           // its input will have distinct arguments.
           // We just keep the isDistinct setting to true, so when users look at the query plan,
           // they can still see distinct aggregations.
           val rewrittenAggregateExpression =
-            AggregateExpression2(rewrittenAggregateFunction, Complete, isDistinct = true)
+            AggregateExpression(rewrittenAggregateFunction, Complete, isDistinct = true)
 
           val aggregateFunctionAttribute = aggregateFunctionToAttribute(agg.aggregateFunction, true)
           (rewrittenAggregateExpression, aggregateFunctionAttribute)
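The hunk above is the heart of planAggregateWithOneDistinct: the aggregate's original child is swapped for the attribute that already carries the evaluated distinct values, and the function is re-planned in Complete mode. A toy sketch of that substitution, using stand-in case classes rather than the real Catalyst tree:

    object DistinctRewriteSketch {
      sealed trait Expr
      case class ColumnRef(name: String) extends Expr
      case class AttrRef(name: String) extends Expr
      case class CountFn(child: Expr) extends Expr

      // Stand-in for transformDown: swap the evaluated distinct expression
      // for the attribute referencing its already-deduplicated output.
      def rewrite(fn: CountFn, distinctExpr: Expr, distinctAttr: AttrRef): CountFn =
        if (fn.child == distinctExpr) CountFn(distinctAttr) else fn

      def main(args: Array[String]): Unit = {
        // count(DISTINCT b) becomes count(b#1) in Complete mode; isDistinct is
        // kept true only so the flag still shows up when users inspect the plan.
        println(rewrite(CountFn(ColumnRef("b")), ColumnRef("b"), AttrRef("b#1")))
      }
    }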

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 0b3192a..8cc25c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.expressions
 
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, Encoder}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete}
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
 
@@ -70,7 +70,7 @@ abstract class Aggregator[-A, B, C] {
       implicit bEncoder: Encoder[B],
       cEncoder: Encoder[C]): TypedColumn[A, C] = {
     val expr =
-      new AggregateExpression2(
+      new AggregateExpression(
         TypedAggregateExpression(this),
         Complete,
         false)
@@ -78,4 +78,3 @@ abstract class Aggregator[-A, B, C] {
     new TypedColumn[A, C](expr, encoderFor[C])
   }
 }
-
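For context, the constructor swap above sits on the path every typed aggregator takes through toColumn. A hypothetical aggregator written against the 1.6-era zero/reduce/merge/finish contract (the method names and the Dataset it would be applied to are assumptions, not part of the patch):

    // A hypothetical typed sum aggregator; ds.select(sumLong.toColumn) now wraps
    // it as AggregateExpression(TypedAggregateExpression(...), Complete, false).
    val sumLong = new Aggregator[Long, Long, Long] {
      def zero: Long = 0L
      def reduce(b: Long, a: Long): Long = b + a
      def merge(b1: Long, b2: Long): Long = b1 + b2
      def finish(b: Long): Long = b
    }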

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
index 8b9247a..fc873c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.expressions
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.types.BooleanType
 import org.apache.spark.sql.{Column, catalyst}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 
 
 /**
@@ -141,40 +141,56 @@ class WindowSpec private[sql](
    */
   private[sql] def withAggregate(aggregate: Column): Column = {
     val windowExpr = aggregate.expr match {
-      case Average(child) => WindowExpression(
-        UnresolvedWindowFunction("avg", child :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case Sum(child) => WindowExpression(
-        UnresolvedWindowFunction("sum", child :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case Count(child) => WindowExpression(
-        UnresolvedWindowFunction("count", child :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case First(child, ignoreNulls) => WindowExpression(
-        // TODO this is a hack for Hive UDAF first_value
-        UnresolvedWindowFunction(
-          "first_value",
-          child :: ignoreNulls :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case Last(child, ignoreNulls) => WindowExpression(
-        // TODO this is a hack for Hive UDAF last_value
-        UnresolvedWindowFunction(
-          "last_value",
-          child :: ignoreNulls :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case Min(child) => WindowExpression(
-        UnresolvedWindowFunction("min", child :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case Max(child) => WindowExpression(
-        UnresolvedWindowFunction("max", child :: Nil),
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
-      case wf: WindowFunction => WindowExpression(
-        wf,
-        WindowSpecDefinition(partitionSpec, orderSpec, frame))
+      // First, we check if we get an aggregate function without the DISTINCT keyword.
+      // Right now, we do not support using a DISTINCT aggregate function as a
+      // window function.
+      case AggregateExpression(aggregateFunction, _, isDistinct) if !isDistinct =>
+        aggregateFunction match {
+          case Average(child) => WindowExpression(
+            UnresolvedWindowFunction("avg", child :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case Sum(child) => WindowExpression(
+            UnresolvedWindowFunction("sum", child :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case Count(child) => WindowExpression(
+            UnresolvedWindowFunction("count", child :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case First(child, ignoreNulls) => WindowExpression(
+            // TODO this is a hack for Hive UDAF first_value
+            UnresolvedWindowFunction(
+              "first_value",
+              child :: ignoreNulls :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case Last(child, ignoreNulls) => WindowExpression(
+            // TODO this is a hack for Hive UDAF last_value
+            UnresolvedWindowFunction(
+              "last_value",
+              child :: ignoreNulls :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case Min(child) => WindowExpression(
+            UnresolvedWindowFunction("min", child :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case Max(child) => WindowExpression(
+            UnresolvedWindowFunction("max", child :: Nil),
+            WindowSpecDefinition(partitionSpec, orderSpec, frame))
+          case x =>
+            throw new UnsupportedOperationException(s"$x is not supported in a window operation.")
+        }
+
+      case AggregateExpression(aggregateFunction, _, isDistinct) if isDistinct =>
+        throw new UnsupportedOperationException(
+          s"Distinct aggregate function ${aggregateFunction} is not supported " +
+            s"in a window operation.")
+
+      case wf: WindowFunction =>
+        WindowExpression(
+          wf,
+          WindowSpecDefinition(partitionSpec, orderSpec, frame))
+
       case x =>
-        throw new UnsupportedOperationException(s"$x is not supported in window operation.")
+        throw new UnsupportedOperationException(s"$x is not supported in a window operation.")
     }
+
     new Column(windowExpr)
   }
-
 }
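The net effect of the rewritten match is easiest to see at the call site. Illustrative usage, where df and the window definition are assumed setup rather than part of the patch:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val w = Window.partitionBy("dept").orderBy("salary")

    df.select(avg("salary").over(w))   // matched as a non-distinct AggregateExpression,
                                       // rewritten to UnresolvedWindowFunction("avg", ...)
    df.select(countDistinct("salary").over(w)) // now fails fast with
                                               // UnsupportedOperationException: distinct
                                               // aggregates are unsupported over windows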

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
index 258afad..11dbf39 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.expressions
 
-import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression}
 import org.apache.spark.sql.execution.aggregate.ScalaUDAF
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.types._
@@ -109,7 +109,7 @@ abstract class UserDefinedAggregateFunction extends Serializable {
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
     val aggregateExpression =
-      AggregateExpression2(
+      AggregateExpression(
         ScalaUDAF(exprs.map(_.expr), this),
         Complete,
         isDistinct = false)
@@ -123,7 +123,7 @@ abstract class UserDefinedAggregateFunction extends Serializable {
   @scala.annotation.varargs
   def distinct(exprs: Column*): Column = {
     val aggregateExpression =
-      AggregateExpression2(
+      AggregateExpression(
         ScalaUDAF(exprs.map(_.expr), this),
         Complete,
         isDistinct = true)
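Both entry points changed above feed the same ScalaUDAF wrapper and differ only in the isDistinct flag. A hypothetical UDAF at the call site (GeometricMean is an assumed user-defined class, not part of the patch):

    val gm = new GeometricMean  // extends UserDefinedAggregateFunction
    df.groupBy("group").agg(gm(col("value")))          // AggregateExpression(..., isDistinct = false)
    df.groupBy("group").agg(gm.distinct(col("value"))) // AggregateExpression(..., isDistinct = true)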

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 6d56542..22104e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, Star}
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, Encoder}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -76,6 +77,12 @@ object functions extends LegacyFunctions {
 
   private def withExpr(expr: Expression): Column = Column(expr)
 
+  private def withAggregateFunction(
+    func: AggregateFunction,
+    isDistinct: Boolean = false): Column = {
+    Column(func.toAggregateExpression(isDistinct))
+  }
+
   private implicit def newLongEncoder: Encoder[Long] = ExpressionEncoder[Long](flat = true)
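Every aggregate builder below now funnels through this helper, so each one reduces to a one-liner over toAggregateExpression. Sketch of the equivalence (the final expansion is an assumption inferred from the call sites in this patch, not quoted source):

    // Sketch: with the helper above,
    //   def sum(e: Column): Column = withAggregateFunction { Sum(e.expr) }
    // reduces to
    //   Column(Sum(e.expr).toAggregateExpression(false))
    // i.e., presumably AggregateExpression(Sum(e.expr), Complete, isDistinct = false),
    // with the distinct variants passing true instead.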
 
 
@@ -154,7 +161,9 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def approxCountDistinct(e: Column): Column = withExpr { ApproxCountDistinct(e.expr) }
+  def approxCountDistinct(e: Column): Column = withAggregateFunction {
+    HyperLogLogPlusPlus(e.expr)
+  }
 
   /**
   * Aggregate function: returns the approximate number of distinct items in a group.
@@ -170,8 +179,8 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def approxCountDistinct(e: Column, rsd: Double): Column = withExpr {
-    ApproxCountDistinct(e.expr, rsd)
+  def approxCountDistinct(e: Column, rsd: Double): Column = withAggregateFunction {
+    HyperLogLogPlusPlus(e.expr, rsd, 0, 0)
   }
 
   /**
@@ -190,7 +199,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def avg(e: Column): Column = withExpr { Average(e.expr) }
+  def avg(e: Column): Column = withAggregateFunction { Average(e.expr) }
 
   /**
    * Aggregate function: returns the average of the values in a group.
@@ -226,7 +235,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def corr(column1: Column, column2: Column): Column = withExpr {
+  def corr(column1: Column, column2: Column): Column = withAggregateFunction {
     Corr(column1.expr, column2.expr)
   }
 
@@ -246,7 +255,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def count(e: Column): Column = withExpr {
+  def count(e: Column): Column = withAggregateFunction {
     e.expr match {
       // Turn count(*) into count(1)
       case s: Star => Count(Literal(1))
@@ -269,8 +278,8 @@ object functions extends LegacyFunctions {
    * @since 1.3.0
    */
   @scala.annotation.varargs
-  def countDistinct(expr: Column, exprs: Column*): Column = withExpr {
-    CountDistinct((expr +: exprs).map(_.expr))
+  def countDistinct(expr: Column, exprs: Column*): Column = {
+    withAggregateFunction(Count.apply((expr +: exprs).map(_.expr)), isDistinct = true)
   }
 
   /**
@@ -289,7 +298,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def first(e: Column): Column = withExpr { First(e.expr) }
+  def first(e: Column): Column = withAggregateFunction { new First(e.expr) }
 
   /**
    * Aggregate function: returns the first value of a column in a group.
@@ -305,7 +314,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def kurtosis(e: Column): Column = withExpr { Kurtosis(e.expr) }
+  def kurtosis(e: Column): Column = withAggregateFunction { Kurtosis(e.expr) }
 
   /**
    * Aggregate function: returns the last value in a group.
@@ -313,7 +322,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def last(e: Column): Column = withExpr { Last(e.expr) }
+  def last(e: Column): Column = withAggregateFunction { new Last(e.expr) }
 
   /**
    * Aggregate function: returns the last value of the column in a group.
@@ -329,7 +338,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def max(e: Column): Column = withExpr { Max(e.expr) }
+  def max(e: Column): Column = withAggregateFunction { Max(e.expr) }
 
   /**
    * Aggregate function: returns the maximum value of the column in a group.
@@ -363,7 +372,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def min(e: Column): Column = withExpr { Min(e.expr) }
+  def min(e: Column): Column = withAggregateFunction { Min(e.expr) }
 
   /**
    * Aggregate function: returns the minimum value of the column in a group.
@@ -379,7 +388,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def skewness(e: Column): Column = withExpr { Skewness(e.expr) }
+  def skewness(e: Column): Column = withAggregateFunction { Skewness(e.expr) }
 
   /**
    * Aggregate function: alias for [[stddev_samp]].
@@ -387,7 +396,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def stddev(e: Column): Column = withExpr { StddevSamp(e.expr) }
+  def stddev(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) }
 
   /**
    * Aggregate function: returns the unbiased sample standard deviation of
@@ -396,7 +405,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def stddev_samp(e: Column): Column = withExpr { StddevSamp(e.expr) }
+  def stddev_samp(e: Column): Column = withAggregateFunction { StddevSamp(e.expr) }
 
   /**
    * Aggregate function: returns the population standard deviation of
@@ -405,7 +414,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def stddev_pop(e: Column): Column = withExpr { StddevPop(e.expr) }
+  def stddev_pop(e: Column): Column = withAggregateFunction { StddevPop(e.expr) }
 
   /**
    * Aggregate function: returns the sum of all values in the expression.
@@ -413,7 +422,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def sum(e: Column): Column = withExpr { Sum(e.expr) }
+  def sum(e: Column): Column = withAggregateFunction { Sum(e.expr) }
 
   /**
    * Aggregate function: returns the sum of all values in the given column.
@@ -429,7 +438,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.3.0
    */
-  def sumDistinct(e: Column): Column = withExpr { SumDistinct(e.expr) }
+  def sumDistinct(e: Column): Column = withAggregateFunction(Sum(e.expr), isDistinct = true)
 
   /**
    * Aggregate function: returns the sum of distinct values in the expression.
@@ -445,7 +454,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def variance(e: Column): Column = withExpr { VarianceSamp(e.expr) }
+  def variance(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) }
 
   /**
    * Aggregate function: returns the unbiased variance of the values in a 
group.
@@ -453,7 +462,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def var_samp(e: Column): Column = withExpr { VarianceSamp(e.expr) }
+  def var_samp(e: Column): Column = withAggregateFunction { VarianceSamp(e.expr) }
 
   /**
    * Aggregate function: returns the population variance of the values in a 
group.
@@ -461,7 +470,7 @@ object functions extends LegacyFunctions {
    * @group agg_funcs
    * @since 1.6.0
    */
-  def var_pop(e: Column): Column = withExpr { VariancePop(e.expr) }
+  def var_pop(e: Column): Column = withAggregateFunction { VariancePop(e.expr) }
 
   
//////////////////////////////////////////////////////////////////////////////////////////////
   // Window functions
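Taken together, the changes in this file make the distinct variants plain flags on the ordinary aggregate functions, and retarget approxCountDistinct at the new HyperLogLog++ implementation. Rough equivalences at the API surface (df is an assumed DataFrame):

    df.agg(countDistinct(col("a")))  // ~ AggregateExpression(Count(a), Complete, isDistinct = true)
    df.agg(sumDistinct(col("a")))    // ~ AggregateExpression(Sum(a), Complete, isDistinct = true)
    df.agg(approxCountDistinct(col("a"), 0.05)) // ~ HyperLogLogPlusPlus(a, rsd = 0.05, 0, 0),
                                                // replacing the old ApproxCountDistinct expression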

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 3de277a..441a0c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -237,34 +237,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-8828 sum should return null if all input values are null") {
-    withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "true") {
-      withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-        checkAnswer(
-          sql("select sum(a), avg(a) from allNulls"),
-          Seq(Row(null, null))
-        )
-      }
-      withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
-        checkAnswer(
-          sql("select sum(a), avg(a) from allNulls"),
-          Seq(Row(null, null))
-        )
-      }
-    }
-    withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "false") {
-      withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
-        checkAnswer(
-          sql("select sum(a), avg(a) from allNulls"),
-          Seq(Row(null, null))
-        )
-      }
-      withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "false") {
-        checkAnswer(
-          sql("select sum(a), avg(a) from allNulls"),
-          Seq(Row(null, null))
-        )
-      }
-    }
+    checkAnswer(
+      sql("select sum(a), avg(a) from allNulls"),
+      Seq(Row(null, null))
+    )
   }
 
   private def testCodeGen(sqlText: String, expectedResults: Seq[Row]): Unit = {
@@ -507,29 +483,22 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("literal in agg grouping expressions") {
-    def literalInAggTest(): Unit = {
-      checkAnswer(
-        sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"),
-        Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
-      checkAnswer(
-        sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"),
-        Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
-
-      checkAnswer(
-        sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"),
-        sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
-      checkAnswer(
-        sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"),
-        sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
-      checkAnswer(
-        sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"),
-        sql("SELECT 1, 2, sum(b) FROM testData2"))
-    }
+    checkAnswer(
+      sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"),
+      Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
+    checkAnswer(
+      sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"),
+      Seq(Row(1, 2), Row(2, 2), Row(3, 2)))
 
-    literalInAggTest()
-    withSQLConf(SQLConf.USE_SQL_AGGREGATE2.key -> "false") {
-      literalInAggTest()
-    }
+    checkAnswer(
+      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"),
+      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
+    checkAnswer(
+      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"),
+      sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a"))
+    checkAnswer(
+      sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"),
+      sql("SELECT 1, 2, sum(b) FROM testData2"))
   }
 
   test("aggregates with nulls") {

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index a229e58..e31c528 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -21,16 +21,13 @@ import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData}
 
 import scala.beans.{BeanInfo, BeanProperty}
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.{OpenHashSetUDT, HyperLogLogUDT}
+import org.apache.spark.sql.catalyst.expressions.OpenHashSetUDT
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.OpenHashSet
 
 
@@ -134,16 +131,6 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
     df.orderBy('int).limit(1).groupBy('int).agg(first('vec)).collect()(0).getAs[MyDenseVector](0)
   }
 
-  test("HyperLogLogUDT") {
-    val hyperLogLogUDT = HyperLogLogUDT
-    val hyperLogLog = new HyperLogLog(0.4)
-    (1 to 10).foreach(i => hyperLogLog.offer(Row(i)))
-
-    val actual = hyperLogLogUDT.deserialize(hyperLogLogUDT.serialize(hyperLogLog))
-    assert(actual.cardinality() === hyperLogLog.cardinality())
-    assert(java.util.Arrays.equals(actual.getBytes, hyperLogLog.getBytes))
-  }
-
   test("OpenHashSetUDT") {
     val openHashSetUDT = new OpenHashSetUDT(IntegerType)
     val set = new OpenHashSet[Int]

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 2076c57..44634da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -38,7 +38,7 @@ class PlannerSuite extends SharedSQLContext {
   private def testPartialAggregationPlan(query: LogicalPlan): Unit = {
     val planner = sqlContext.planner
     import planner._
-    val plannedOption = HashAggregation(query).headOption.orElse(Aggregation(query).headOption)
+    val plannedOption = Aggregation(query).headOption
     val planned =
       plannedOption.getOrElse(
         fail(s"Could query play aggregation query $query. Is it an aggregation 
query?"))

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index cdd885b..4b4f5c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -152,36 +152,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     )
   }
 
-  test("Aggregate metrics") {
-    withSQLConf(
-      SQLConf.UNSAFE_ENABLED.key -> "false",
-      SQLConf.CODEGEN_ENABLED.key -> "false",
-      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
-      // Assume the execution plan is
-      // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> Aggregate(nodeId = 0)
-      val df = testData2.groupBy().count() // 2 partitions
-      testSparkPlanMetrics(df, 1, Map(
-        2L -> ("Aggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 2L)),
-        0L -> ("Aggregate", Map(
-          "number of input rows" -> 2L,
-          "number of output rows" -> 1L)))
-      )
-
-      // 2 partitions and each partition contains 2 keys
-      val df2 = testData2.groupBy('a).count()
-      testSparkPlanMetrics(df2, 1, Map(
-        2L -> ("Aggregate", Map(
-          "number of input rows" -> 6L,
-          "number of output rows" -> 4L)),
-        0L -> ("Aggregate", Map(
-          "number of input rows" -> 4L,
-          "number of output rows" -> 3L)))
-      )
-    }
-  }
-
   test("SortBasedAggregate metrics") {
    // Because SortBasedAggregate may skip different rows if the number of partitions is different,
     // this test should use the deterministic number of partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/e0701c75/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c5f6965..ba62046 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -584,7 +584,6 @@ class HiveContext private[hive](
       HiveTableScans,
       DataSinks,
       Scripts,
-      HashAggregation,
       Aggregation,
       LeftSemiJoin,
       EquiJoinSelection,

