Repository: spark Updated Branches: refs/heads/master e5949c287 -> 8220d5265
[SPARK-6972][SQL] Add Coalesce to DataFrame Author: Michael Armbrust <mich...@databricks.com> Closes #5545 from marmbrus/addCoalesce and squashes the following commits: 9fdf3f6 [Michael Armbrust] [SPARK-6972][SQL] Add Coalesce to DataFrame Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8220d526 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8220d526 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8220d526 Branch: refs/heads/master Commit: 8220d5265f1bbea9dfdaeec4f2d06d7fe24c0bc3 Parents: e5949c2 Author: Michael Armbrust <mich...@databricks.com> Authored: Thu Apr 16 21:49:26 2015 -0500 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Apr 16 21:49:26 2015 -0500 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/DataFrame.scala | 14 ++++++++++++++ .../src/main/scala/org/apache/spark/sql/RDDApi.scala | 2 ++ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ 3 files changed, 24 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 3235f85..17c21f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -909,6 +909,20 @@ class DataFrame private[sql]( } /** + * Returns a new [[DataFrame]] that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. + * @group rdd + */ + override def coalesce(numPartitions: Int): DataFrame = { + sqlContext.createDataFrame( + queryExecution.toRdd.coalesce(numPartitions), + schema, + needsConversion = false) + } + + /** * Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. * @group dfops */ http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala index ba4373f..63dbab1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RDDApi.scala @@ -61,5 +61,7 @@ private[sql] trait RDDApi[T] { def repartition(numPartitions: Int): DataFrame + def coalesce(numPartitions: Int): DataFrame + def distinct: DataFrame } http://git-wip-us.apache.org/repos/asf/spark/blob/8220d526/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 44a7d1e..3250ab4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -178,6 +178,14 @@ class DataFrameSuite extends QueryTest { testData.select('key).collect().toSeq) } + test("coalesce") { + assert(testData.select('key).coalesce(1).rdd.partitions.size === 1) + + checkAnswer( + testData.select('key).coalesce(1).select('key), + testData.select('key).collect().toSeq) + } + test("groupBy") { checkAnswer( testData2.groupBy("a").agg($"a", sum($"b")), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org