Repository: spark
Updated Branches:
  refs/heads/master de289bf27 -> abf5e4285


[SPARK-11504][SQL] API audit for distributeBy and localSort

1. Renamed localSort -> sortWithinPartitions to avoid ambiguity in "local"
2. distributeBy -> repartition to match the existing repartition.

Author: Reynold Xin <r...@databricks.com>

Closes #9470 from rxin/SPARK-11504.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abf5e428
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abf5e428
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abf5e428

Branch: refs/heads/master
Commit: abf5e4285d97b148a32cf22f5287511198175cb6
Parents: de289bf
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Nov 4 12:33:47 2015 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Nov 4 12:33:47 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 132 +++++++++++--------
 .../org/apache/spark/sql/CachedTableSuite.scala |  20 ++-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  44 +++----
 3 files changed, 113 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abf5e428/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 5e9c7ef..d3a2249 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -241,18 +241,6 @@ class DataFrame private[sql](
     sb.toString()
   }
 
-  private[sql] def sortInternal(global: Boolean, sortExprs: Seq[Column]): 
DataFrame = {
-    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
-      col.expr match {
-        case expr: SortOrder =>
-          expr
-        case expr: Expression =>
-          SortOrder(expr, Ascending)
-      }
-    }
-    Sort(sortOrder, global = global, logicalPlan)
-  }
-
   override def toString: String = {
     try {
       schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", 
", ", "]")
@@ -620,6 +608,32 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] with each partition sorted by the given 
expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortCol: String, sortCols: String*): DataFrame = {
+    sortWithinPartitions(sortCol, sortCols : _*)
+  }
+
+  /**
+   * Returns a new [[DataFrame]] with each partition sorted by the given 
expressions.
+   *
+   * This is the same operation as "SORT BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def sortWithinPartitions(sortExprs: Column*): DataFrame = {
+    sortInternal(global = false, sortExprs)
+  }
+
+  /**
    * Returns a new [[DataFrame]] sorted by the specified column, all in 
ascending order.
    * {{{
    *   // The following 3 are equivalent
@@ -645,7 +659,7 @@ class DataFrame private[sql](
    */
   @scala.annotation.varargs
   def sort(sortExprs: Column*): DataFrame = {
-    sortInternal(true, sortExprs)
+    sortInternal(global = true, sortExprs)
   }
 
   /**
@@ -667,44 +681,6 @@ class DataFrame private[sql](
   def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*)
 
   /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning 
expressions into
-   * `numPartitions`. The resulting DataFrame is hash partitioned.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def distributeBy(partitionExprs: Seq[Column], numPartitions: Int): DataFrame 
= {
-    RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, 
Some(numPartitions))
-  }
-
-  /**
-   * Returns a new [[DataFrame]] partitioned by the given partitioning 
expressions preserving
-   * the existing number of partitions. The resulting DataFrame is hash 
partitioned.
-   * @group dfops
-   * @since 1.6.0
-   */
-  def distributeBy(partitionExprs: Seq[Column]): DataFrame = {
-    RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, None)
-  }
-
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given 
expressions.
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def localSort(sortCol: String, sortCols: String*): DataFrame = 
localSort(sortCol, sortCols : _*)
-
-  /**
-   * Returns a new [[DataFrame]] with each partition sorted by the given 
expressions.
-   * @group dfops
-   * @since 1.6.0
-   */
-  @scala.annotation.varargs
-  def localSort(sortExprs: Column*): DataFrame = {
-    sortInternal(false, sortExprs)
-  }
-
-  /**
    * Selects column based on the column name and return it as a [[Column]].
    * Note that the column name can also reference to a nested column like 
`a.b`.
    * @group dfops
@@ -798,7 +774,9 @@ class DataFrame private[sql](
    * SQL expressions.
    *
    * {{{
+   *   // The following are equivalent:
    *   df.selectExpr("colA", "colB as newName", "abs(colC)")
+   *   df.select(expr("colA"), expr("colB as newName"), expr("abs(colC)"))
    * }}}
    * @group dfops
    * @since 1.3.0
@@ -1524,7 +1502,7 @@ class DataFrame private[sql](
 
   /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
-   * @group rdd
+   * @group dfops
    * @since 1.3.0
    */
   def repartition(numPartitions: Int): DataFrame = {
@@ -1532,6 +1510,34 @@ class DataFrame private[sql](
   }
 
   /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning 
expressions into
+   * `numPartitions`. The resulting DataFrame is hash partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(numPartitions: Int, partitionExprs: Column*): DataFrame = {
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, 
Some(numPartitions))
+  }
+
+  /**
+   * Returns a new [[DataFrame]] partitioned by the given partitioning 
expressions preserving
+   * the existing number of partitions. The resulting DataFrame is hash 
partitioned.
+   *
+   * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
+   *
+   * @group dfops
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def repartition(partitionExprs: Column*): DataFrame = {
+    RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, 
numPartitions = None)
+  }
+
+  /**
    * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
    * Similar to coalesce defined on an [[RDD]], this operation results in a 
narrow dependency, e.g.
    * if you go from 1000 partitions to 100 partitions, there will not be a 
shuffle, instead each of
@@ -2016,6 +2022,12 @@ class DataFrame private[sql](
     write.mode(SaveMode.Append).insertInto(tableName)
   }
 
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // End of deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
   /**
    * Wrap a DataFrame action to track all Spark jobs in the body so that we 
can connect them with
    * an execution.
@@ -2045,10 +2057,16 @@ class DataFrame private[sql](
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
-  // End of deprecated methods
-  ////////////////////////////////////////////////////////////////////////////
-  ////////////////////////////////////////////////////////////////////////////
+  private def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame 
= {
+    val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
+      col.expr match {
+        case expr: SortOrder =>
+          expr
+        case expr: Expression =>
+          SortOrder(expr, Ascending)
+      }
+    }
+    Sort(sortOrder, global = global, logicalPlan)
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/abf5e428/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 605954b..dbcb011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -379,8 +379,8 @@ class CachedTableSuite extends QueryTest with 
SharedSQLContext {
     // Set up two tables distributed in the same way. Try this with the data 
distributed into
     // different number of partitions.
     for (numPartitions <- 1 until 10 by 4) {
-      testData.distributeBy(Column("key") :: Nil, 
numPartitions).registerTempTable("t1")
-      testData2.distributeBy(Column("a") :: Nil, 
numPartitions).registerTempTable("t2")
+      testData.repartition(numPartitions, $"key").registerTempTable("t1")
+      testData2.repartition(numPartitions, $"a").registerTempTable("t2")
       sqlContext.cacheTable("t1")
       sqlContext.cacheTable("t2")
 
@@ -401,8 +401,20 @@ class CachedTableSuite extends QueryTest with 
SharedSQLContext {
     }
 
     // Distribute the tables into non-matching number of partitions. Need to 
shuffle.
-    testData.distributeBy(Column("key") :: Nil, 6).registerTempTable("t1")
-    testData2.distributeBy(Column("a") :: Nil, 3).registerTempTable("t2")
+    testData.repartition(6, $"key").registerTempTable("t1")
+    testData2.repartition(3, $"a").registerTempTable("t2")
+    sqlContext.cacheTable("t1")
+    sqlContext.cacheTable("t2")
+
+    verifyNumExchanges(sql("SELECT * FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a"), 
2)
+    sqlContext.uncacheTable("t1")
+    sqlContext.uncacheTable("t2")
+    sqlContext.dropTempTable("t1")
+    sqlContext.dropTempTable("t2")
+
+    // One side of join is not partitioned in the desired way. Need to shuffle.
+    testData.repartition(6, $"value").registerTempTable("t1")
+    testData2.repartition(6, $"a").registerTempTable("t2")
     sqlContext.cacheTable("t1")
     sqlContext.cacheTable("t2")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abf5e428/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a9e6413..84a616d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1044,79 +1044,79 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
   test("distributeBy and localSort") {
     val original = testData.repartition(1)
     assert(original.rdd.partitions.length == 1)
-    val df = original.distributeBy(Column("key") :: Nil, 5)
-    assert(df.rdd.partitions.length  == 5)
+    val df = original.repartition(5, $"key")
+    assert(df.rdd.partitions.length == 5)
     checkAnswer(original.select(), df.select())
 
-    val df2 = original.distributeBy(Column("key") :: Nil, 10)
-    assert(df2.rdd.partitions.length  == 10)
+    val df2 = original.repartition(10, $"key")
+    assert(df2.rdd.partitions.length == 10)
     checkAnswer(original.select(), df2.select())
 
     // Group by the column we are distributed by. This should generate a plan 
with no exchange
     // between the aggregates
-    val df3 = testData.distributeBy(Column("key") :: 
Nil).groupBy("key").count()
+    val df3 = testData.repartition($"key").groupBy("key").count()
     verifyNonExchangingAgg(df3)
-    verifyNonExchangingAgg(testData.distributeBy(Column("key") :: 
Column("value") :: Nil)
+    verifyNonExchangingAgg(testData.repartition($"key", $"value")
       .groupBy("key", "value").count())
 
     // Grouping by just the first distributeBy expr, need to exchange.
-    verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") 
:: Nil)
+    verifyExchangingAgg(testData.repartition($"key", $"value")
       .groupBy("key").count())
 
     val data = sqlContext.sparkContext.parallelize(
       (1 to 100).map(i => TestData2(i % 10, i))).toDF()
 
     // Distribute and order by.
-    val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc)
+    val df4 = data.repartition($"a").sortWithinPartitions($"b".desc)
     // Walk each partition and verify that it is sorted descending and does 
not contain all
     // the values.
-    df4.rdd.foreachPartition(p => {
+    df4.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue < v) throw new SparkException("Partition is not 
ordered.")
           if (v + 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (allSequential) throw new SparkException("Partition should not be 
globally ordered")
-    })
+    }
 
     // Distribute and order by with multiple order bys
-    val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, 
$"a".asc)
+    val df5 = data.repartition(2, $"a").sortWithinPartitions($"b".asc, 
$"a".asc)
     // Walk each partition and verify that it is sorted ascending
-    df5.rdd.foreachPartition(p => {
+    df5.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue > v) throw new SparkException("Partition is not 
ordered.")
           if (v - 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (allSequential) throw new SparkException("Partition should not be all 
sequential")
-    })
+    }
 
     // Distribute into one partition and order by. This partition should 
contain all the values.
-    val df6 = data.distributeBy(Column("a") :: Nil, 1).localSort($"b".asc)
+    val df6 = data.repartition(1, $"a").sortWithinPartitions($"b".asc)
     // Walk each partition and verify that it is sorted descending and not 
globally sorted.
-    df6.rdd.foreachPartition(p => {
+    df6.rdd.foreachPartition { p =>
       var previousValue: Int = -1
       var allSequential: Boolean = true
-      p.foreach(r => {
+      p.foreach { r =>
         val v: Int = r.getInt(1)
         if (previousValue != -1) {
           if (previousValue > v) throw new SparkException("Partition is not 
ordered.")
           if (v - 1 != previousValue) allSequential = false
         }
         previousValue = v
-      })
+      }
       if (!allSequential) throw new SparkException("Partition should contain 
all sequential values")
-    })
+    }
   }
 
   test("fix case sensitivity of partition by") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to