Repository: spark
Updated Branches:
  refs/heads/branch-1.3 47e4d579e -> 478ee3f91


[SPARK-5605][SQL][DF] Allow using String to specify column name in DSL aggregate 
functions

Author: Reynold Xin <r...@databricks.com>

Closes #4376 from rxin/SPARK-5605 and squashes the following commits:

c55f5fa [Reynold Xin] Added a Python test.
f4b8dbb [Reynold Xin] [SPARK-5605][SQL][DF] Allow using String to specify column 
name in DSL aggregate functions.

(cherry picked from commit 1fbd124b1bd6159086d8e88b139ce0817af02322)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/478ee3f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/478ee3f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/478ee3f9

Branch: refs/heads/branch-1.3
Commit: 478ee3f91355263e6dd63d5ad11a7d7c2ef4fc18
Parents: 47e4d57
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Feb 4 18:35:51 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Feb 4 18:36:09 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql.py                           |  13 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |   8 +-
 .../org/apache/spark/sql/DataFrameImpl.scala    |   8 +-
 .../main/scala/org/apache/spark/sql/Dsl.scala   |  37 ++++
 .../org/apache/spark/sql/GroupedData.scala      | 176 +++++++++++++++++++
 .../org/apache/spark/sql/GroupedDataFrame.scala | 176 -------------------
 .../apache/spark/sql/IncomputableColumn.scala   |   4 +-
 7 files changed, 231 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 5b56b36..417db34 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -23,7 +23,7 @@ public classes of Spark SQL:
     - L{DataFrame}
       A Resilient Distributed Dataset (RDD) with Schema information for the 
data contained. In
       addition to normal RDD operations, DataFrames also support SQL.
-    - L{GroupedDataFrame}
+    - L{GroupedData}
     - L{Column}
       Column is a DataFrame with a single column.
     - L{Row}
@@ -62,7 +62,7 @@ __all__ = [
     "StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", 
"DecimalType",
     "DoubleType", "FloatType", "ByteType", "IntegerType", "LongType",
     "ShortType", "ArrayType", "MapType", "StructField", "StructType",
-    "SQLContext", "HiveContext", "DataFrame", "GroupedDataFrame", "Column", 
"Row", "Dsl",
+    "SQLContext", "HiveContext", "DataFrame", "GroupedData", "Column", "Row", 
"Dsl",
     "SchemaRDD"]
 
 
@@ -2231,7 +2231,7 @@ class DataFrame(object):
 
     def groupBy(self, *cols):
         """ Group the :class:`DataFrame` using the specified columns,
-        so we can run aggregation on them. See :class:`GroupedDataFrame`
+        so we can run aggregation on them. See :class:`GroupedData`
         for all the available aggregate functions.
 
         >>> df.groupBy().avg().collect()
@@ -2244,7 +2244,7 @@ class DataFrame(object):
         jcols = ListConverter().convert([_to_java_column(c) for c in cols],
                                         self._sc._gateway._gateway_client)
         jdf = self._jdf.groupBy(self.sql_ctx._sc._jvm.PythonUtils.toSeq(jcols))
-        return GroupedDataFrame(jdf, self.sql_ctx)
+        return GroupedData(jdf, self.sql_ctx)
 
     def agg(self, *exprs):
         """ Aggregate on the entire :class:`DataFrame` without groups
@@ -2308,7 +2308,7 @@ def dfapi(f):
     return _api
 
 
-class GroupedDataFrame(object):
+class GroupedData(object):
 
     """
     A set of methods for aggregations on a :class:`DataFrame`,
@@ -2638,6 +2638,9 @@ class Dsl(object):
         >>> from pyspark.sql import Dsl
         >>> df.agg(Dsl.countDistinct(df.age, df.name).alias('c')).collect()
         [Row(c=2)]
+
+        >>> df.agg(Dsl.countDistinct("age", "name").alias('c')).collect()
+        [Row(c=2)]
         """
         sc = SparkContext._active_spark_context
         jcols = ListConverter().convert([_to_java_column(c) for c in cols],

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a4997fb..92e04ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -290,7 +290,7 @@ trait DataFrame extends RDDApi[Row] {
 
   /**
    * Groups the [[DataFrame]] using the specified columns, so we can run 
aggregation on them.
-   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   * See [[GroupedData]] for all the available aggregate functions.
    *
    * {{{
    *   // Compute the average for all numeric columns grouped by department.
@@ -304,11 +304,11 @@ trait DataFrame extends RDDApi[Row] {
    * }}}
    */
   @scala.annotation.varargs
-  def groupBy(cols: Column*): GroupedDataFrame
+  def groupBy(cols: Column*): GroupedData
 
   /**
    * Groups the [[DataFrame]] using the specified columns, so we can run 
aggregation on them.
-   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   * See [[GroupedData]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns 
using column names
    * (i.e. cannot construct expressions).
@@ -325,7 +325,7 @@ trait DataFrame extends RDDApi[Row] {
    * }}}
    */
   @scala.annotation.varargs
-  def groupBy(col1: String, cols: String*): GroupedDataFrame
+  def groupBy(col1: String, cols: String*): GroupedData
 
   /**
    * (Scala-specific) Compute aggregates by specifying a map from column name 
to

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index c702adc..d6df927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,13 @@ private[sql] class DataFrameImpl protected[sql](
     filter(condition)
   }
 
-  override def groupBy(cols: Column*): GroupedDataFrame = {
-    new GroupedDataFrame(this, cols.map(_.expr))
+  override def groupBy(cols: Column*): GroupedData = {
+    new GroupedData(this, cols.map(_.expr))
   }
 
-  override def groupBy(col1: String, cols: String*): GroupedDataFrame = {
+  override def groupBy(col1: String, cols: String*): GroupedData = {
     val colNames: Seq[String] = col1 +: cols
-    new GroupedDataFrame(this, colNames.map(colName => resolve(colName)))
+    new GroupedData(this, colNames.map(colName => resolve(colName)))
   }
 
   override def limit(n: Int): DataFrame = {

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
index 50f442d..9afe496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
@@ -94,38 +94,75 @@ object Dsl {
   /** Aggregate function: returns the sum of all values in the expression. */
   def sum(e: Column): Column = Sum(e.expr)
 
+  /** Aggregate function: returns the sum of all values in the given column. */
+  def sum(columnName: String): Column = sum(Column(columnName))
+
   /** Aggregate function: returns the sum of distinct values in the 
expression. */
   def sumDistinct(e: Column): Column = SumDistinct(e.expr)
 
+  /** Aggregate function: returns the sum of distinct values in the 
expression. */
+  def sumDistinct(columnName: String): Column = sumDistinct(Column(columnName))
+
   /** Aggregate function: returns the number of items in a group. */
   def count(e: Column): Column = Count(e.expr)
 
+  /** Aggregate function: returns the number of items in a group. */
+  def count(columnName: String): Column = count(Column(columnName))
+
   /** Aggregate function: returns the number of distinct items in a group. */
   @scala.annotation.varargs
   def countDistinct(expr: Column, exprs: Column*): Column =
     CountDistinct((expr +: exprs).map(_.expr))
 
+  /** Aggregate function: returns the number of distinct items in a group. */
+  @scala.annotation.varargs
+  def countDistinct(columnName: String, columnNames: String*): Column =
+    countDistinct(Column(columnName), columnNames.map(Column.apply) :_*)
+
   /** Aggregate function: returns the approximate number of distinct items in 
a group. */
   def approxCountDistinct(e: Column): Column = ApproxCountDistinct(e.expr)
 
   /** Aggregate function: returns the approximate number of distinct items in 
a group. */
+  def approxCountDistinct(columnName: String): Column = 
approxCountDistinct(column(columnName))
+
+  /** Aggregate function: returns the approximate number of distinct items in 
a group. */
   def approxCountDistinct(e: Column, rsd: Double): Column = 
ApproxCountDistinct(e.expr, rsd)
 
+  /** Aggregate function: returns the approximate number of distinct items in 
a group. */
+  def approxCountDistinct(columnName: String, rsd: Double): Column = {
+    approxCountDistinct(Column(columnName), rsd)
+  }
+
   /** Aggregate function: returns the average of the values in a group. */
   def avg(e: Column): Column = Average(e.expr)
 
+  /** Aggregate function: returns the average of the values in a group. */
+  def avg(columnName: String): Column = avg(Column(columnName))
+
   /** Aggregate function: returns the first value in a group. */
   def first(e: Column): Column = First(e.expr)
 
+  /** Aggregate function: returns the first value of a column in a group. */
+  def first(columnName: String): Column = first(Column(columnName))
+
   /** Aggregate function: returns the last value in a group. */
   def last(e: Column): Column = Last(e.expr)
 
+  /** Aggregate function: returns the last value of the column in a group. */
+  def last(columnName: String): Column = last(Column(columnName))
+
   /** Aggregate function: returns the minimum value of the expression in a 
group. */
   def min(e: Column): Column = Min(e.expr)
 
+  /** Aggregate function: returns the minimum value of the column in a group. 
*/
+  def min(columnName: String): Column = min(Column(columnName))
+
   /** Aggregate function: returns the maximum value of the expression in a 
group. */
   def max(e: Column): Column = Max(e.expr)
 
+  /** Aggregate function: returns the maximum value of the column in a group. 
*/
+  def max(columnName: String): Column = max(Column(columnName))
+
   
//////////////////////////////////////////////////////////////////////////////////////////////
   
//////////////////////////////////////////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
new file mode 100644
index 0000000..3c20676
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.language.implicitConversions
+import scala.collection.JavaConversions._
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+
+
+/**
+ * A set of methods for aggregations on a [[DataFrame]], created by 
[[DataFrame.groupBy]].
+ */
+class GroupedData protected[sql](df: DataFrameImpl, groupingExprs: 
Seq[Expression]) {
+
+  private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): 
DataFrame = {
+    val namedGroupingExprs = groupingExprs.map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+    DataFrame(
+      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, 
df.logicalPlan))
+  }
+
+  private[this] def aggregateNumericColumns(f: Expression => Expression): 
Seq[NamedExpression] = {
+    df.numericColumns.map { c =>
+      val a = f(c)
+      Alias(a, a.toString)()
+    }
+  }
+
+  private[this] def strToExpr(expr: String): (Expression => Expression) = {
+    expr.toLowerCase match {
+      case "avg" | "average" | "mean" => Average
+      case "max" => Max
+      case "min" => Min
+      case "sum" => Sum
+      case "count" | "size" => Count
+    }
+  }
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   df.groupBy("department").agg(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   )
+   * }}}
+   */
+  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
+    agg((aggExpr +: aggExprs).toMap)
+  }
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   df.groupBy("department").agg(Map(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   ))
+   * }}}
+   */
+  def agg(exprs: Map[String, String]): DataFrame = {
+    exprs.map { case (colName, expr) =>
+      val a = strToExpr(expr)(df(colName).expr)
+      Alias(a, a.toString)()
+    }.toSeq
+  }
+
+  /**
+   * (Java-specific) Compute aggregates by specifying a map from column name to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *   import com.google.common.collect.ImmutableMap;
+   *   df.groupBy("department").agg(ImmutableMap.<String, String>builder()
+   *     .put("age", "max")
+   *     .put("expense", "sum")
+   *     .build());
+   * }}}
+   */
+  def agg(exprs: java.util.Map[String, String]): DataFrame = {
+    agg(exprs.toMap)
+  }
+
+  /**
+   * Compute aggregates by specifying a series of aggregate columns. Unlike 
other methods in this
+   * class, the resulting [[DataFrame]] won't automatically include the 
grouping columns.
+   *
+   * The available aggregate methods are defined in 
[[org.apache.spark.sql.Dsl]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.Dsl._
+   *   df.groupBy("department").agg($"department", max($"age"), 
sum($"expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.Dsl.*;
+   *   df.groupBy("department").agg(col("department"), max(col("age")), 
sum(col("expense")));
+   * }}}
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame = {
+    val aggExprs = (expr +: exprs).map(_.expr).map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+    DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, 
df.logicalPlan))
+  }
+
+  /**
+   * Count the number of rows for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
+
+  /**
+   * Compute the average value for each numeric columns for each group. This 
is an alias for `avg`.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def mean(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the max value for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def max(): DataFrame = aggregateNumericColumns(Max)
+
+  /**
+   * Compute the mean value for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def avg(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the min value for each numeric column for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def min(): DataFrame = aggregateNumericColumns(Min)
+
+  /**
+   * Compute the sum for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def sum(): DataFrame = aggregateNumericColumns(Sum)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
deleted file mode 100644
index 7963cb0..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.language.implicitConversions
-import scala.collection.JavaConversions._
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
-
-
-/**
- * A set of methods for aggregations on a [[DataFrame]], created by 
[[DataFrame.groupBy]].
- */
-class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: 
Seq[Expression]) {
-
-  private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): 
DataFrame = {
-    val namedGroupingExprs = groupingExprs.map {
-      case expr: NamedExpression => expr
-      case expr: Expression => Alias(expr, expr.toString)()
-    }
-    DataFrame(
-      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, 
df.logicalPlan))
-  }
-
-  private[this] def aggregateNumericColumns(f: Expression => Expression): 
Seq[NamedExpression] = {
-    df.numericColumns.map { c =>
-      val a = f(c)
-      Alias(a, a.toString)()
-    }
-  }
-
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    expr.toLowerCase match {
-      case "avg" | "average" | "mean" => Average
-      case "max" => Max
-      case "min" => Min
-      case "sum" => Sum
-      case "count" | "size" => Count
-    }
-  }
-
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   )
-   * }}}
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = 
{
-    agg((aggExpr +: aggExprs).toMap)
-  }
-
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name 
to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   df.groupBy("department").agg(Map(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   ))
-   * }}}
-   */
-  def agg(exprs: Map[String, String]): DataFrame = {
-    exprs.map { case (colName, expr) =>
-      val a = strToExpr(expr)(df(colName).expr)
-      Alias(a, a.toString)()
-    }.toSeq
-  }
-
-  /**
-   * (Java-specific) Compute aggregates by specifying a map from column name to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the 
grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *   import com.google.common.collect.ImmutableMap;
-   *   df.groupBy("department").agg(ImmutableMap.<String, String>builder()
-   *     .put("age", "max")
-   *     .put("expense", "sum")
-   *     .build());
-   * }}}
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = {
-    agg(exprs.toMap)
-  }
-
-  /**
-   * Compute aggregates by specifying a series of aggregate columns. Unlike 
other methods in this
-   * class, the resulting [[DataFrame]] won't automatically include the 
grouping columns.
-   *
-   * The available aggregate methods are defined in 
[[org.apache.spark.sql.Dsl]].
-   *
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
-   *
-   *   // Scala:
-   *   import org.apache.spark.sql.dsl._
-   *   df.groupBy("department").agg($"department", max($"age"), 
sum($"expense"))
-   *
-   *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
-   *   df.groupBy("department").agg(col("department"), max(col("age")), 
sum(col("expense")));
-   * }}}
-   */
-  @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = {
-    val aggExprs = (expr +: exprs).map(_.expr).map {
-      case expr: NamedExpression => expr
-      case expr: Expression => Alias(expr, expr.toString)()
-    }
-    DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, 
df.logicalPlan))
-  }
-
-  /**
-   * Count the number of rows for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
-
-  /**
-   * Compute the average value for each numeric columns for each group. This 
is an alias for `avg`.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def mean(): DataFrame = aggregateNumericColumns(Average)
-
-  /**
-   * Compute the max value for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def max(): DataFrame = aggregateNumericColumns(Max)
-
-  /**
-   * Compute the mean value for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def avg(): DataFrame = aggregateNumericColumns(Average)
-
-  /**
-   * Compute the min value for each numeric column for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def min(): DataFrame = aggregateNumericColumns(Min)
-
-  /**
-   * Compute the sum for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def sum(): DataFrame = aggregateNumericColumns(Sum)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/478ee3f9/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 6b032d3..fedd7f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -90,9 +90,9 @@ private[sql] class IncomputableColumn(protected[sql] val 
expr: Expression) exten
 
   override def apply(condition: Column): DataFrame = err()
 
-  override def groupBy(cols: Column*): GroupedDataFrame = err()
+  override def groupBy(cols: Column*): GroupedData = err()
 
-  override def groupBy(col1: String, cols: String*): GroupedDataFrame = err()
+  override def groupBy(col1: String, cols: String*): GroupedData = err()
 
   override def limit(n: Int): DataFrame = err()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to