Repository: spark
Updated Branches:
  refs/heads/master 9a7ce70ea -> 1fbd124b1
[SPARK-5605][SQL][DF] Allow using String to specify column name in DSL aggregate functions

Author: Reynold Xin <r...@databricks.com>

Closes #4376 from rxin/SPARK-5605 and squashes the following commits:

c55f5fa [Reynold Xin] Added a Python test.
f4b8dbb [Reynold Xin] [SPARK-5605][SQL][DF] Allow using String to specify column name in DSL aggregate functions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fbd124b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fbd124b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fbd124b

Branch: refs/heads/master
Commit: 1fbd124b1bd6159086d8e88b139ce0817af02322
Parents: 9a7ce70
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Feb 4 18:35:51 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Feb 4 18:35:51 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql.py                           |  13 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |   8 +-
 .../org/apache/spark/sql/DataFrameImpl.scala    |   8 +-
 .../main/scala/org/apache/spark/sql/Dsl.scala   |  37 ++++
 .../org/apache/spark/sql/GroupedData.scala      | 176 +++++++++++++++++++
 .../org/apache/spark/sql/GroupedDataFrame.scala | 176 -------------------
 .../apache/spark/sql/IncomputableColumn.scala   |   4 +-
 7 files changed, 231 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
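In short, the change adds String-based overloads alongside the existing
Column-based aggregate functions. A minimal sketch of the difference, not part
of the commit itself, assuming a DataFrame `df` with "department", "age", and
"expense" columns:

    import org.apache.spark.sql.Dsl._

    // Column-based form, available before this change:
    df.groupBy($"department").agg(max($"age"), sum($"expense"))

    // String-based form, added by this change:
    df.groupBy("department").agg(max("age"), sum("expense"))
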

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 5b56b36..417db34 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -23,7 +23,7 @@ public classes of Spark SQL:
     - L{DataFrame}
       A Resilient Distributed Dataset (RDD) with Schema information for the data contained. In
       addition to normal RDD operations, DataFrames also support SQL.
-    - L{GroupedDataFrame}
+    - L{GroupedData}
     - L{Column}
       Column is a DataFrame with a single column.
     - L{Row}
@@ -62,7 +62,7 @@ __all__ = [
     "StringType", "BinaryType", "BooleanType", "DateType", "TimestampType", "DecimalType",
     "DoubleType", "FloatType", "ByteType", "IntegerType", "LongType", "ShortType",
     "ArrayType", "MapType", "StructField", "StructType",
-    "SQLContext", "HiveContext", "DataFrame", "GroupedDataFrame", "Column", "Row", "Dsl",
+    "SQLContext", "HiveContext", "DataFrame", "GroupedData", "Column", "Row", "Dsl",
     "SchemaRDD"]
@@ -2231,7 +2231,7 @@ class DataFrame(object):

     def groupBy(self, *cols):
         """ Group the :class:`DataFrame` using the specified columns,
-        so we can run aggregation on them. See :class:`GroupedDataFrame`
+        so we can run aggregation on them. See :class:`GroupedData`
         for all the available aggregate functions.

         >>> df.groupBy().avg().collect()
@@ -2244,7 +2244,7 @@ class DataFrame(object):
         jcols = ListConverter().convert([_to_java_column(c) for c in cols],
                                         self._sc._gateway._gateway_client)
         jdf = self._jdf.groupBy(self.sql_ctx._sc._jvm.PythonUtils.toSeq(jcols))
-        return GroupedDataFrame(jdf, self.sql_ctx)
+        return GroupedData(jdf, self.sql_ctx)

     def agg(self, *exprs):
         """ Aggregate on the entire :class:`DataFrame` without groups
@@ -2308,7 +2308,7 @@ def dfapi(f):
     return _api


-class GroupedDataFrame(object):
+class GroupedData(object):

     """
     A set of methods for aggregations on a :class:`DataFrame`,
@@ -2638,6 +2638,9 @@ class Dsl(object):
         >>> from pyspark.sql import Dsl
         >>> df.agg(Dsl.countDistinct(df.age, df.name).alias('c')).collect()
         [Row(c=2)]
+
+        >>> df.agg(Dsl.countDistinct("age", "name").alias('c')).collect()
+        [Row(c=2)]
         """
         sc = SparkContext._active_spark_context
         jcols = ListConverter().convert([_to_java_column(c) for c in cols],

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a4997fb..92e04ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -290,7 +290,7 @@ trait DataFrame extends RDDApi[Row] {

   /**
    * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   * See [[GroupedData]] for all the available aggregate functions.
    *
    * {{{
    *   // Compute the average for all numeric columns grouped by department.
@@ -304,11 +304,11 @@ trait DataFrame extends RDDApi[Row] {
    * }}}
    */
   @scala.annotation.varargs
-  def groupBy(cols: Column*): GroupedDataFrame
+  def groupBy(cols: Column*): GroupedData

   /**
    * Groups the [[DataFrame]] using the specified columns, so we can run aggregation on them.
-   * See [[GroupedDataFrame]] for all the available aggregate functions.
+   * See [[GroupedData]] for all the available aggregate functions.
    *
    * This is a variant of groupBy that can only group by existing columns using column names
    * (i.e. cannot construct expressions).
@@ -325,7 +325,7 @@ trait DataFrame extends RDDApi[Row] {
    * }}}
    */
   @scala.annotation.varargs
-  def groupBy(col1: String, cols: String*): GroupedDataFrame
+  def groupBy(col1: String, cols: String*): GroupedData

   /**
    * (Scala-specific) Compute aggregates by specifying a map from column name to

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index c702adc..d6df927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,13 @@ private[sql] class DataFrameImpl protected[sql](
     filter(condition)
   }

-  override def groupBy(cols: Column*): GroupedDataFrame = {
-    new GroupedDataFrame(this, cols.map(_.expr))
+  override def groupBy(cols: Column*): GroupedData = {
+    new GroupedData(this, cols.map(_.expr))
   }

-  override def groupBy(col1: String, cols: String*): GroupedDataFrame = {
+  override def groupBy(col1: String, cols: String*): GroupedData = {
     val colNames: Seq[String] = col1 +: cols
-    new GroupedDataFrame(this, colNames.map(colName => resolve(colName)))
+    new GroupedData(this, colNames.map(colName => resolve(colName)))
   }

   override def limit(n: Int): DataFrame = {
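Note the split visible in the implementation above: the Column variant wraps
arbitrary expressions, while the String variant goes through resolve() and can
therefore only name existing columns. A sketch of what each accepts (the
"gender" column and the age bucketing are hypothetical, not from the commit):

    import org.apache.spark.sql.Dsl._

    // Names only; each string must resolve to an existing column:
    df.groupBy("department", "gender").count()

    // Columns; computed expressions are allowed as grouping keys:
    df.groupBy($"department", $"age" / 10).count()
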

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
index 50f442d..9afe496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala
@@ -94,38 +94,75 @@ object Dsl {
   /** Aggregate function: returns the sum of all values in the expression. */
   def sum(e: Column): Column = Sum(e.expr)

+  /** Aggregate function: returns the sum of all values in the given column. */
+  def sum(columnName: String): Column = sum(Column(columnName))
+
   /** Aggregate function: returns the sum of distinct values in the expression. */
   def sumDistinct(e: Column): Column = SumDistinct(e.expr)

+  /** Aggregate function: returns the sum of distinct values in the expression. */
+  def sumDistinct(columnName: String): Column = sumDistinct(Column(columnName))
+
   /** Aggregate function: returns the number of items in a group. */
   def count(e: Column): Column = Count(e.expr)

+  /** Aggregate function: returns the number of items in a group. */
+  def count(columnName: String): Column = count(Column(columnName))
+
   /** Aggregate function: returns the number of distinct items in a group. */
   @scala.annotation.varargs
   def countDistinct(expr: Column, exprs: Column*): Column =
     CountDistinct((expr +: exprs).map(_.expr))

+  /** Aggregate function: returns the number of distinct items in a group. */
+  @scala.annotation.varargs
+  def countDistinct(columnName: String, columnNames: String*): Column =
+    countDistinct(Column(columnName), columnNames.map(Column.apply) :_*)
+
   /** Aggregate function: returns the approximate number of distinct items in a group. */
   def approxCountDistinct(e: Column): Column = ApproxCountDistinct(e.expr)

   /** Aggregate function: returns the approximate number of distinct items in a group. */
+  def approxCountDistinct(columnName: String): Column = approxCountDistinct(column(columnName))
+
+  /** Aggregate function: returns the approximate number of distinct items in a group. */
   def approxCountDistinct(e: Column, rsd: Double): Column = ApproxCountDistinct(e.expr, rsd)

+  /** Aggregate function: returns the approximate number of distinct items in a group. */
+  def approxCountDistinct(columnName: String, rsd: Double): Column = {
+    approxCountDistinct(Column(columnName), rsd)
+  }
+
   /** Aggregate function: returns the average of the values in a group. */
   def avg(e: Column): Column = Average(e.expr)

+  /** Aggregate function: returns the average of the values in a group. */
+  def avg(columnName: String): Column = avg(Column(columnName))
+
   /** Aggregate function: returns the first value in a group. */
   def first(e: Column): Column = First(e.expr)

+  /** Aggregate function: returns the first value of a column in a group. */
+  def first(columnName: String): Column = first(Column(columnName))
+
   /** Aggregate function: returns the last value in a group. */
   def last(e: Column): Column = Last(e.expr)

+  /** Aggregate function: returns the last value of the column in a group. */
+  def last(columnName: String): Column = last(Column(columnName))
+
   /** Aggregate function: returns the minimum value of the expression in a group. */
   def min(e: Column): Column = Min(e.expr)

+  /** Aggregate function: returns the minimum value of the column in a group. */
+  def min(columnName: String): Column = min(Column(columnName))
+
   /** Aggregate function: returns the maximum value of the expression in a group. */
   def max(e: Column): Column = Max(e.expr)

+  /** Aggregate function: returns the maximum value of the column in a group. */
+  def max(columnName: String): Column = max(Column(columnName))
+
   //////////////////////////////////////////////////////////////////////////////////////////////
   //////////////////////////////////////////////////////////////////////////////////////////////
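Each new overload simply wraps the name in a Column and delegates, so the
String and Column forms are interchangeable. For example, with the
countDistinct and approxCountDistinct overloads above (a sketch: "age" and
"name" are assumed columns, and 0.05 is an arbitrary relative standard
deviation, not a value from the commit):

    import org.apache.spark.sql.Dsl._

    // Exact distinct count over two columns, by name:
    df.agg(countDistinct("age", "name"))

    // Approximate distinct count with rsd = 5%:
    df.agg(approxCountDistinct("name", 0.05))
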

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
new file mode 100644
index 0000000..3c20676
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.language.implicitConversions
+import scala.collection.JavaConversions._
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
+import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+
+
+/**
+ * A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]].
+ */
+class GroupedData protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) {
+
+  private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): DataFrame = {
+    val namedGroupingExprs = groupingExprs.map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+    DataFrame(
+      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
+  }
+
+  private[this] def aggregateNumericColumns(f: Expression => Expression): Seq[NamedExpression] = {
+    df.numericColumns.map { c =>
+      val a = f(c)
+      Alias(a, a.toString)()
+    }
+  }
+
+  private[this] def strToExpr(expr: String): (Expression => Expression) = {
+    expr.toLowerCase match {
+      case "avg" | "average" | "mean" => Average
+      case "max" => Max
+      case "min" => Min
+      case "sum" => Sum
+      case "count" | "size" => Count
+    }
+  }
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying a map from column name to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   df.groupBy("department").agg(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   )
+   * }}}
+   */
+  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
+    agg((aggExpr +: aggExprs).toMap)
+  }
+
+  /**
+   * (Scala-specific) Compute aggregates by specifying a map from column name to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   df.groupBy("department").agg(Map(
+   *     "age" -> "max",
+   *     "expense" -> "sum"
+   *   ))
+   * }}}
+   */
+  def agg(exprs: Map[String, String]): DataFrame = {
+    exprs.map { case (colName, expr) =>
+      val a = strToExpr(expr)(df(colName).expr)
+      Alias(a, a.toString)()
+    }.toSeq
+  }
+
+  /**
+   * (Java-specific) Compute aggregates by specifying a map from column name to
+   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
+   *
+   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *   import com.google.common.collect.ImmutableMap;
+   *   df.groupBy("department").agg(ImmutableMap.<String, String>builder()
+   *     .put("age", "max")
+   *     .put("expense", "sum")
+   *     .build());
+   * }}}
+   */
+  def agg(exprs: java.util.Map[String, String]): DataFrame = {
+    agg(exprs.toMap)
+  }
+
+  /**
+   * Compute aggregates by specifying a series of aggregate columns. Unlike other methods in this
+   * class, the resulting [[DataFrame]] won't automatically include the grouping columns.
+   *
+   * The available aggregate methods are defined in [[org.apache.spark.sql.Dsl]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.dsl._
+   *   df.groupBy("department").agg($"department", max($"age"), sum($"expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.Dsl.*;
+   *   df.groupBy("department").agg(col("department"), max(col("age")), sum(col("expense")));
+   * }}}
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame = {
+    val aggExprs = (expr +: exprs).map(_.expr).map {
+      case expr: NamedExpression => expr
+      case expr: Expression => Alias(expr, expr.toString)()
+    }
+    DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
+  }
+
+  /**
+   * Count the number of rows for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
+
+  /**
+   * Compute the average value for each numeric columns for each group. This is an alias for `avg`.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def mean(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the max value for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def max(): DataFrame = aggregateNumericColumns(Max)
+
+  /**
+   * Compute the mean value for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def avg(): DataFrame = aggregateNumericColumns(Average)
+
+  /**
+   * Compute the min value for each numeric column for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def min(): DataFrame = aggregateNumericColumns(Min)
+
+  /**
+   * Compute the sum for each numeric columns for each group.
+   * The resulting [[DataFrame]] will also contain the grouping columns.
+   */
+  def sum(): DataFrame = aggregateNumericColumns(Sum)
+}
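One detail of strToExpr above that the scaladoc does not spell out: the
aggregate names are matched case-insensitively, and a few aliases are accepted
("average" and "mean" for avg, "size" for count). A sketch, assuming the same
hypothetical columns as before:

    df.groupBy("department").agg(Map(
      "age"     -> "mean",  // alias for "avg"
      "expense" -> "SUM",   // lower-cased before matching
      "name"    -> "size"   // alias for "count"
    ))
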

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
deleted file mode 100644
index 7963cb0..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataFrame.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.language.implicitConversions
-import scala.collection.JavaConversions._
-
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Literal => LiteralExpr}
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
-
-
-/**
- * A set of methods for aggregations on a [[DataFrame]], created by [[DataFrame.groupBy]].
- */
-class GroupedDataFrame protected[sql](df: DataFrameImpl, groupingExprs: Seq[Expression]) {
-
-  private[this] implicit def toDataFrame(aggExprs: Seq[NamedExpression]): DataFrame = {
-    val namedGroupingExprs = groupingExprs.map {
-      case expr: NamedExpression => expr
-      case expr: Expression => Alias(expr, expr.toString)()
-    }
-    DataFrame(
-      df.sqlContext, Aggregate(groupingExprs, namedGroupingExprs ++ aggExprs, df.logicalPlan))
-  }
-
-  private[this] def aggregateNumericColumns(f: Expression => Expression): Seq[NamedExpression] = {
-    df.numericColumns.map { c =>
-      val a = f(c)
-      Alias(a, a.toString)()
-    }
-  }
-
-  private[this] def strToExpr(expr: String): (Expression => Expression) = {
-    expr.toLowerCase match {
-      case "avg" | "average" | "mean" => Average
-      case "max" => Max
-      case "min" => Min
-      case "sum" => Sum
-      case "count" | "size" => Count
-    }
-  }
-
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for each department
-   *   df.groupBy("department").agg(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   )
-   * }}}
-   */
-  def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
-    agg((aggExpr +: aggExprs).toMap)
-  }
-
-  /**
-   * (Scala-specific) Compute aggregates by specifying a map from column name to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for each department
-   *   df.groupBy("department").agg(Map(
-   *     "age" -> "max",
-   *     "expense" -> "sum"
-   *   ))
-   * }}}
-   */
-  def agg(exprs: Map[String, String]): DataFrame = {
-    exprs.map { case (colName, expr) =>
-      val a = strToExpr(expr)(df(colName).expr)
-      Alias(a, a.toString)()
-    }.toSeq
-  }
-
-  /**
-   * (Java-specific) Compute aggregates by specifying a map from column name to
-   * aggregate methods. The resulting [[DataFrame]] will also contain the grouping columns.
-   *
-   * The available aggregate methods are `avg`, `max`, `min`, `sum`, `count`.
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for each department
-   *   import com.google.common.collect.ImmutableMap;
-   *   df.groupBy("department").agg(ImmutableMap.<String, String>builder()
-   *     .put("age", "max")
-   *     .put("expense", "sum")
-   *     .build());
-   * }}}
-   */
-  def agg(exprs: java.util.Map[String, String]): DataFrame = {
-    agg(exprs.toMap)
-  }
-
-  /**
-   * Compute aggregates by specifying a series of aggregate columns. Unlike other methods in this
-   * class, the resulting [[DataFrame]] won't automatically include the grouping columns.
-   *
-   * The available aggregate methods are defined in [[org.apache.spark.sql.Dsl]].
-   *
-   * {{{
-   *   // Selects the age of the oldest employee and the aggregate expense for each department
-   *
-   *   // Scala:
-   *   import org.apache.spark.sql.dsl._
-   *   df.groupBy("department").agg($"department", max($"age"), sum($"expense"))
-   *
-   *   // Java:
-   *   import static org.apache.spark.sql.Dsl.*;
-   *   df.groupBy("department").agg(col("department"), max(col("age")), sum(col("expense")));
-   * }}}
-   */
-  @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = {
-    val aggExprs = (expr +: exprs).map(_.expr).map {
-      case expr: NamedExpression => expr
-      case expr: Expression => Alias(expr, expr.toString)()
-    }
-    DataFrame(df.sqlContext, Aggregate(groupingExprs, aggExprs, df.logicalPlan))
-  }
-
-  /**
-   * Count the number of rows for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def count(): DataFrame = Seq(Alias(Count(LiteralExpr(1)), "count")())
-
-  /**
-   * Compute the average value for each numeric columns for each group. This is an alias for `avg`.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def mean(): DataFrame = aggregateNumericColumns(Average)
-
-  /**
-   * Compute the max value for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def max(): DataFrame = aggregateNumericColumns(Max)
-
-  /**
-   * Compute the mean value for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def avg(): DataFrame = aggregateNumericColumns(Average)
-
-  /**
-   * Compute the min value for each numeric column for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def min(): DataFrame = aggregateNumericColumns(Min)
-
-  /**
-   * Compute the sum for each numeric columns for each group.
-   * The resulting [[DataFrame]] will also contain the grouping columns.
-   */
-  def sum(): DataFrame = aggregateNumericColumns(Sum)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1fbd124b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 6b032d3..fedd7f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -90,9 +90,9 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten

   override def apply(condition: Column): DataFrame = err()

-  override def groupBy(cols: Column*): GroupedDataFrame = err()
+  override def groupBy(cols: Column*): GroupedData = err()

-  override def groupBy(col1: String, cols: String*): GroupedDataFrame = err()
+  override def groupBy(col1: String, cols: String*): GroupedData = err()

   override def limit(n: Int): DataFrame = err()