Repository: spark
Updated Branches:
  refs/heads/branch-2.0 567093596 -> 9f18c8f38


[SPARK-16088][SPARKR] update setJobGroup, cancelJobGroup, clearJobGroup

## What changes were proposed in this pull request?

Updated setJobGroup, cancelJobGroup, and clearJobGroup so they no longer require
sc/SparkContext as a parameter.
Also updated the roxygen2 docs and the R programming guide to cover the deprecations.
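
For illustration, a minimal sketch of the new call forms, taken from the updated
roxygen2 examples in this patch (the SparkContext is now looked up internally
from the active session):

  sparkR.session()
  setJobGroup("myJobGroup", "My job group description", TRUE)
  cancelJobGroup("myJobGroup")
  clearJobGroup()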

## How was this patch tested?

Unit tests.

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #13838 from felixcheung/rjobgroup.

(cherry picked from commit b5a997667f4c0e514217da6df5af37b8b849dfdf)
Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f18c8f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f18c8f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f18c8f3

Branch: refs/heads/branch-2.0
Commit: 9f18c8f386af558ed72b88ad372835f25e807e79
Parents: 5670935
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Thu Jun 23 09:45:01 2016 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Thu Jun 23 09:45:11 2016 -0700

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                      |  1 -
 R/pkg/R/context.R                        | 10 +---
 R/pkg/R/sparkR.R                         | 68 ++++++++++++++++++++++-----
 R/pkg/R/utils.R                          |  8 ++++
 R/pkg/inst/tests/testthat/test_context.R | 10 ++--
 docs/sparkr.md                           |  2 +
 6 files changed, 75 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 725cbf2..f856979 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,7 +55,6 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
-#' @rdname SparkDataFrame
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached

http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index dd0ceae..2538bb2 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -264,10 +264,7 @@ setCheckpointDir <- function(sc, dirName) {
 #'}
 #' @note spark.lapply since 2.0.0
 spark.lapply <- function(list, func) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   rdd <- parallelize(sc, list, length(list))
   results <- map(rdd, func)
   local <- collect(results)
@@ -287,9 +284,6 @@ spark.lapply <- function(list, func) {
 #'}
 #' @note setLogLevel since 2.0.0
 setLogLevel <- function(level) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   callJMethod(sc, "setLogLevel", level)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/R/pkg/R/sparkR.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 2b6e124..62659b0 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -392,47 +392,91 @@ sparkR.session <- function(
 #' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
 #' different value or cleared.
 #'
-#' @param sc existing spark context
 #' @param groupid the ID to be assigned to job groups
 #' @param description description for the job group ID
 #' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @rdname setJobGroup
+#' @name setJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#' sparkR.session()
+#' setJobGroup("myJobGroup", "My job group description", TRUE)
 #'}
 #' @note setJobGroup since 1.5.0
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+#' @method setJobGroup default
+setJobGroup.default <- function(groupId, description, interruptOnCancel) {
+  sc <- getSparkContext()
   callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
 }
 
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("setJobGroup(groupId, description, interruptOnCancel)",
+                old = "setJobGroup(sc, groupId, description, interruptOnCancel)")
+    setJobGroup.default(groupId, description, interruptOnCancel)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    descriptionToUse <- groupId
+    interruptOnCancelToUse <- description
+    setJobGroup.default(groupIdToUse, descriptionToUse, interruptOnCancelToUse)
+  }
+}
+
 #' Clear current job group ID and its description
 #'
-#' @param sc existing spark context
+#' @rdname clearJobGroup
+#' @name clearJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
+#' sparkR.session()
+#' clearJobGroup()
 #'}
 #' @note clearJobGroup since 1.5.0
-clearJobGroup <- function(sc) {
+#' @method clearJobGroup default
+clearJobGroup.default <- function() {
+  sc <- getSparkContext()
   callJMethod(sc, "clearJobGroup")
 }
 
+clearJobGroup <- function(sc) {
+  if (!missing(sc) &&
+      class(sc) == "jobj" &&
+      any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("clearJobGroup()", old = "clearJobGroup(sc)")
+  }
+  clearJobGroup.default()
+}
+
+
 #' Cancel active jobs for the specified group
 #'
-#' @param sc existing spark context
 #' @param groupId the ID of job group to be cancelled
+#' @rdname cancelJobGroup
+#' @name cancelJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
+#' sparkR.session()
+#' cancelJobGroup("myJobGroup")
 #'}
 #' @note cancelJobGroup since 1.5.0
-cancelJobGroup <- function(sc, groupId) {
+#' @method cancelJobGroup default
+cancelJobGroup.default <- function(groupId) {
+  sc <- getSparkContext()
   callJMethod(sc, "cancelJobGroup", groupId)
 }
 
+cancelJobGroup <- function(sc, groupId) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("cancelJobGroup(groupId)", old = "cancelJobGroup(sc, groupId)")
+    cancelJobGroup.default(groupId)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    cancelJobGroup.default(groupIdToUse)
+  }
+}
+
 sparkConfToSubmitOps <- new.env()
 sparkConfToSubmitOps[["spark.driver.memory"]]           <- "--driver-memory"
 sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- "--driver-class-path"

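The exported setJobGroup, cancelJobGroup, and clearJobGroup wrappers above keep
the old sc-first signatures working: when the first argument is a JavaSparkContext
jobj they call .Deprecated and delegate to the new .default implementations,
otherwise the arguments are shifted one position to the left. A sketch of the
deprecated path, mirroring the updated test_context.R below (suppressWarnings
only silences the deprecation warnings):

  sc <- sparkR.sparkContext()
  suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
  suppressWarnings(cancelJobGroup(sc, "groupId"))
  suppressWarnings(clearJobGroup(sc))
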
http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index d5c062d..e75bfbf 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -685,3 +685,11 @@ launchScript <- function(script, combinedArgs, capture = FALSE) {
     system2(script, combinedArgs, wait = capture, stdout = capture)
   }
 }
+
+getSparkContext <- function() {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    stop("SparkR has not been initialized. Please call sparkR.session()")
+  }
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/R/pkg/inst/tests/testthat/test_context.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 3d232df..2a1bd61 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -100,9 +100,13 @@ test_that("rdd GC across sparkR.stop", {
 
 test_that("job group functions can be called", {
   sc <- sparkR.sparkContext()
-  setJobGroup(sc, "groupId", "job description", TRUE)
-  cancelJobGroup(sc, "groupId")
-  clearJobGroup(sc)
+  setJobGroup("groupId", "job description", TRUE)
+  cancelJobGroup("groupId")
+  clearJobGroup()
+
+  suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
+  suppressWarnings(cancelJobGroup(sc, "groupId"))
+  suppressWarnings(clearJobGroup(sc))
   sparkR.session.stop()
 })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9f18c8f3/docs/sparkr.md
----------------------------------------------------------------------
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9e74e4a..32ef815 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -428,3 +428,5 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma
  - The `sqlContext` parameter is no longer required for these functions: `createDataFrame`, `as.DataFrame`, `read.json`, `jsonFile`, `read.parquet`, `parquetFile`, `read.text`, `sql`, `tables`, `tableNames`, `cacheTable`, `uncacheTable`, `clearCache`, `dropTempTable`, `read.df`, `loadDF`, `createExternalTable`.
  - The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`.
  - The method `dropTempTable` has been deprecated to be replaced by `dropTempView`.
+ - The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup`
+ 

