Repository: spark
Updated Branches:
  refs/heads/master fd7d141d8 -> ea0a5eef2


[SPARK-22924][SPARKR] R API for sortWithinPartitions

## What changes were proposed in this pull request?

Add a `withinPartitions` option to `arrange` to sort only within each partition.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #20118 from felixcheung/rsortwithinpartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea0a5eef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea0a5eef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea0a5eef

Branch: refs/heads/master
Commit: ea0a5eef2238daa68a15b60a6f1a74c361216140
Parents: fd7d141
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Sun Dec 31 02:50:00 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Sun Dec 31 02:50:00 2017 +0900

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                   | 14 ++++++++++----
 R/pkg/tests/fulltests/test_sparkSQL.R |  5 +++++
 2 files changed, 15 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ea0a5eef/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index ace49da..fe238f6 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2297,6 +2297,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' @param ... additional sorting fields
 #' @param decreasing a logical argument indicating sorting order for columns when
 #'                   a character vector is specified for col
+#' @param withinPartitions a logical argument indicating whether to sort only within each partition
 #' @return A SparkDataFrame where all elements are sorted.
 #' @family SparkDataFrame functions
 #' @aliases arrange,SparkDataFrame,Column-method
@@ -2312,16 +2313,21 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' arrange(df, asc(df$col1), desc(abs(df$col2)))
 #' arrange(df, "col1", decreasing = TRUE)
 #' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
+#' arrange(df, "col1", "col2", withinPartitions = TRUE)
 #' }
 #' @note arrange(SparkDataFrame, Column) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "Column"),
-          function(x, col, ...) {
+          function(x, col, ..., withinPartitions = FALSE) {
               jcols <- lapply(list(col, ...), function(c) {
                 c@jc
               })
 
-            sdf <- callJMethod(x@sdf, "sort", jcols)
+            if (withinPartitions) {
+              sdf <- callJMethod(x@sdf, "sortWithinPartitions", jcols)
+            } else {
+              sdf <- callJMethod(x@sdf, "sort", jcols)
+            }
             dataFrame(sdf)
           })
 
@@ -2332,7 +2338,7 @@ setMethod("arrange",
 #' @note arrange(SparkDataFrame, character) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "character"),
-          function(x, col, ..., decreasing = FALSE) {
+          function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {
 
             # all sorting columns
             by <- list(col, ...)
@@ -2356,7 +2362,7 @@ setMethod("arrange",
               }
             })
 
-            do.call("arrange", c(x, jcols))
+            do.call("arrange", c(x, jcols, withinPartitions = withinPartitions))
           })
 
 #' @rdname arrange

http://git-wip-us.apache.org/repos/asf/spark/blob/ea0a5eef/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 1b7d53f..5197838 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2130,6 +2130,11 @@ test_that("arrange() and orderBy() on a DataFrame", {
 
   sorted7 <- arrange(df, "name", decreasing = FALSE)
   expect_equal(collect(sorted7)[2, "age"], 19)
+
+  df <- createDataFrame(cars, numPartitions = 10)
+  expect_equal(getNumPartitions(df), 10)
+  sorted8 <- arrange(df, "dist", withinPartitions = TRUE)
+  expect_equal(collect(sorted8)[5:6, "dist"], c(22, 10))
 })
 
 test_that("filter() on a DataFrame", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to