Repository: spark
Updated Branches:
  refs/heads/master 1a22cf1e9 -> 926a93e54


[SPARK-14088][SQL] Some Dataset API touch-up

## What changes were proposed in this pull request?
1. Deprecated unionAll. It is confusing to have both "union" and "unionAll" 
when the two do the same thing in Spark but differ in SQL.
2. Renamed reduce in KeyValueGroupedDataset to reduceGroups so it is more 
consistent with the rest of the functions in KeyValueGroupedDataset, and so it 
is more obvious what "reduce" and "reduceGroups" mean. Previously it was 
confusing because "reduce" could mean reducing a whole Dataset or just 
reducing groups.
3. Added a "name" function, which is a more natural way to name columns than 
"as" for non-SQL users.
4. Removed the "subtract" function, since it is just an alias for "except". A 
short sketch of the resulting API follows this list.

## How was this patch tested?
All changes should be covered by existing tests. Also added a couple of test 
cases to cover "name".

Author: Reynold Xin <r...@databricks.com>

Closes #11908 from rxin/SPARK-14088.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/926a93e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/926a93e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/926a93e5

Branch: refs/heads/master
Commit: 926a93e54b83f1ee596096f3301fef015705b627
Parents: 1a22cf1
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Mar 22 23:43:09 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Mar 22 23:43:09 2016 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  1 +
 python/pyspark/sql/column.py                    |  2 ++
 python/pyspark/sql/dataframe.py                 | 14 +++++++--
 .../scala/org/apache/spark/sql/Column.scala     | 29 ++++++++++++++-----
 .../scala/org/apache/spark/sql/Dataset.scala    | 30 +++++++-------------
 .../spark/sql/KeyValueGroupedDataset.scala      | 11 ++-----
 .../org/apache/spark/sql/JavaDatasetSuite.java  |  4 +--
 .../spark/sql/ColumnExpressionSuite.scala       |  3 +-
 .../org/apache/spark/sql/DatasetSuite.scala     |  2 +-
 9 files changed, 56 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 68e9c50..42eafcb 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -317,6 +317,7 @@ object MimaExcludes {
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.longRddToDataFrameHolder"),
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.intRddToDataFrameHolder"),
         ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.GroupedDataset"),
+        ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.Dataset.subtract"),
 
         ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
         ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/python/pyspark/sql/column.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 19ec6fc..43e9bae 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -315,6 +315,8 @@ class Column(object):
             sc = SparkContext._active_spark_context
             return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))
 
+    name = copy_func(alias, sinceversion=2.0, doc=":func:`name` is an alias for :func:`alias`.")
+
     @ignore_unicode_prefix
     @since(1.3)
     def cast(self, dataType):

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 7e1854c..5cfc348 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -911,14 +911,24 @@ class DataFrame(object):
         """
         return self.groupBy().agg(*exprs)
 
+    @since(2.0)
+    def union(self, other):
+        """ Return a new :class:`DataFrame` containing union of rows in this
+        frame and another frame.
+
+        This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
+        (that does deduplication of elements), use this function followed by a distinct.
+        """
+        return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
+
     @since(1.3)
     def unionAll(self, other):
         """ Return a new :class:`DataFrame` containing union of rows in this
         frame and another frame.
 
-        This is equivalent to `UNION ALL` in SQL.
+        .. note:: Deprecated in 2.0, use union instead.
         """
-        return DataFrame(self._jdf.unionAll(other._jdf), self.sql_ctx)
+        return self.union(other)
 
     @since(1.3)
     def intersect(self, other):

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 622a62a..d64736e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -856,7 +856,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * @group expr_ops
    * @since 1.4.0
    */
-  def alias(alias: String): Column = as(alias)
+  def alias(alias: String): Column = name(alias)
 
   /**
    * Gives the column an alias.
@@ -871,12 +871,7 @@ class Column(protected[sql] val expr: Expression) extends Logging {
    * @group expr_ops
    * @since 1.3.0
    */
-  def as(alias: String): Column = withExpr {
-    expr match {
-      case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
-      case other => Alias(other, alias)()
-    }
-  }
+  def as(alias: String): Column = name(alias)
 
   /**
   * (Scala-specific) Assigns the given aliases to the results of a table generating function.
@@ -937,6 +932,26 @@ class Column(protected[sql] val expr: Expression) extends Logging {
   }
 
   /**
+   * Gives the column a name (alias).
+   * {{{
+   *   // Renames colA to colB in select output.
+   *   df.select($"colA".name("colB"))
+   * }}}
+   *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column.  If this is not desired, use `as` with explicitly empty metadata.
+   *
+   * @group expr_ops
+   * @since 2.0.0
+   */
+  def name(alias: String): Column = withExpr {
+    expr match {
+      case ne: NamedExpression => Alias(expr, alias)(explicitMetadata = Some(ne.metadata))
+      case other => Alias(other, alias)()
+    }
+  }
+
+  /**
    * Casts the column to a different data type.
    * {{{
    *   // Casts colA to IntegerType.
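
For reference, a minimal sketch of the new `name` method and its metadata 
propagation, assuming a Spark 2.0 session; the `NameDemo` scaffold and sample 
data are illustrative, not from this patch:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.MetadataBuilder

    object NameDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("name-demo").getOrCreate()
        import spark.implicits._

        val df = Seq((1, "a")).toDF("id", "value")

        // name behaves like as/alias for a plain rename.
        assert(df.select($"id".name("user_id")).columns.head == "user_id")

        // Metadata attached to a column survives a rename through name.
        val meta = new MetadataBuilder().putString("comment", "primary key").build()
        val renamed = df.select($"id".as("key", meta).name("user_id"))
        assert(renamed.schema.head.metadata.getString("comment") == "primary key")

        spark.stop()
      }
    }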

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index be0dfe7..31864d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1350,20 +1350,24 @@ class Dataset[T] private[sql](
    * @group typedrel
    * @since 2.0.0
    */
-  def unionAll(other: Dataset[T]): Dataset[T] = withTypedPlan {
-    // This breaks caching, but it's usually ok because it addresses a very specific use case:
-    // using union to union many files or partitions.
-    CombineUnions(Union(logicalPlan, other.logicalPlan))
-  }
+  @deprecated("use union()", "2.0.0")
+  def unionAll(other: Dataset[T]): Dataset[T] = union(other)
 
   /**
   * Returns a new [[Dataset]] containing union of rows in this Dataset and another Dataset.
    * This is equivalent to `UNION ALL` in SQL.
    *
+   * To do a SQL-style set union (that does deduplication of elements), use this function followed
+   * by a [[distinct]].
+   *
    * @group typedrel
    * @since 2.0.0
    */
-  def union(other: Dataset[T]): Dataset[T] = unionAll(other)
+  def union(other: Dataset[T]): Dataset[T] = withTypedPlan {
+    // This breaks caching, but it's usually ok because it addresses a very specific use case:
+    // using union to union many files or partitions.
+    CombineUnions(Union(logicalPlan, other.logicalPlan))
+  }
 
   /**
   * Returns a new [[Dataset]] containing rows only in both this Dataset and another Dataset.
@@ -1394,18 +1398,6 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Returns a new [[Dataset]] containing rows in this Dataset but not in another Dataset.
-   * This is equivalent to `EXCEPT` in SQL.
-   *
-   * Note that, equality checking is performed directly on the encoded representation of the data
-   * and thus is not affected by a custom `equals` function defined on `T`.
-   *
-   * @group typedrel
-   * @since 2.0.0
-   */
-  def subtract(other: Dataset[T]): Dataset[T] = except(other)
-
-  /**
    * Returns a new [[Dataset]] by sampling a fraction of rows.
    *
    * @param withReplacement Sample with replacement or not.
@@ -1756,7 +1748,7 @@ class Dataset[T] private[sql](
        outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
       }
 
-      val row = agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
+      val row = groupBy().agg(aggExprs.head, aggExprs.tail: _*).head().toSeq
 
       // Pivot the data so each summary is one row
       row.grouped(outputCols.size).toSeq.zip(statistics).map { case (aggregation, (statistic, _)) =>
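
The "union many files or partitions" comment above is the pattern CombineUnions 
targets; a minimal sketch of it, with an illustrative `ManyUnionsDemo` scaffold 
that is not part of the patch:

    import org.apache.spark.sql.{Dataset, SparkSession}

    object ManyUnionsDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("many-unions").getOrCreate()
        import spark.implicits._

        // Simulate unioning many small per-partition Datasets.
        val parts: Seq[Dataset[Int]] = (1 to 10).map(i => Seq(i).toDS())

        // union applies CombineUnions eagerly, so the chained result keeps a
        // single flat Union node instead of a ten-deep nested plan.
        val all = parts.reduce(_ union _)
        all.explain(true)
        assert(all.count() == 10)

        spark.stop()
      }
    }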

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index f0f9682..8bb75bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -190,7 +190,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
    *
    * @since 1.6.0
    */
-  def reduce(f: (V, V) => V): Dataset[(K, V)] = {
+  def reduceGroups(f: (V, V) => V): Dataset[(K, V)] = {
     val func = (key: K, it: Iterator[V]) => Iterator((key, it.reduce(f)))
 
     implicit val resultEncoder = ExpressionEncoder.tuple(unresolvedKEncoder, unresolvedVEncoder)
@@ -203,15 +203,10 @@ class KeyValueGroupedDataset[K, V] private[sql](
    *
    * @since 1.6.0
    */
-  def reduce(f: ReduceFunction[V]): Dataset[(K, V)] = {
-    reduce(f.call _)
+  def reduceGroups(f: ReduceFunction[V]): Dataset[(K, V)] = {
+    reduceGroups(f.call _)
   }
 
-  // This is here to prevent us from adding overloads that would be ambiguous.
-  @scala.annotation.varargs
-  private def agg(exprs: Column*): DataFrame =
-    groupedData.agg(withEncoder(exprs.head), exprs.tail.map(withEncoder): _*)
-
   private def withEncoder(c: Column): Column = c match {
     case tc: TypedColumn[_, _] =>
       tc.withInputType(resolvedVEncoder.bind(dataAttributes), dataAttributes)
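
A minimal sketch of the renamed reduceGroups, mirroring the updated 
DatasetSuite test further below; the `ReduceGroupsDemo` scaffold is 
illustrative, not part of the patch:

    import org.apache.spark.sql.SparkSession

    object ReduceGroupsDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("reduce-groups").getOrCreate()
        import spark.implicits._

        val ds = Seq("abc", "xyz", "hello").toDS()

        // reduceGroups reduces the values within each key group, yielding
        // one (key, reducedValue) pair per group.
        val agged = ds.groupByKey(_.length).reduceGroups(_ + _)
        agged.show() // (3, abcxyz), (5, hello)

        spark.stop()
      }
    }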

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 3bff129..18f17a8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -204,7 +204,7 @@ public class JavaDatasetSuite implements Serializable {
 
     Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped.collectAsList()));
 
-    Dataset<Tuple2<Integer, String>> reduced = grouped.reduce(new ReduceFunction<String>() {
+    Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups(new ReduceFunction<String>() {
       @Override
       public String call(String v1, String v2) throws Exception {
         return v1 + v2;
@@ -300,7 +300,7 @@ public class JavaDatasetSuite implements Serializable {
       Arrays.asList("abc", "abc", "xyz", "xyz", "foo", "foo", "abc", "abc", 
"xyz"),
       unioned.collectAsList());
 
-    Dataset<String> subtracted = ds.subtract(ds2);
+    Dataset<String> subtracted = ds.except(ds2);
     Assert.assertEquals(Arrays.asList("abc", "abc"), 
subtracted.collectAsList());
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index c2434e4..351b03b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -105,10 +105,11 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
       Row("a") :: Nil)
   }
 
-  test("alias") {
+  test("alias and name") {
     val df = Seq((1, Seq(1, 2, 3))).toDF("a", "intList")
     assert(df.select(df("a").as("b")).columns.head === "b")
     assert(df.select(df("a").alias("b")).columns.head === "b")
+    assert(df.select(df("a").name("b")).columns.head === "b")
   }
 
   test("as propagates metadata") {

http://git-wip-us.apache.org/repos/asf/spark/blob/926a93e5/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 677f84e..0bcc512 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -305,7 +305,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
 
   test("groupBy function, reduce") {
     val ds = Seq("abc", "xyz", "hello").toDS()
-    val agged = ds.groupByKey(_.length).reduce(_ + _)
+    val agged = ds.groupByKey(_.length).reduceGroups(_ + _)
 
     checkDataset(
       agged,

