Repository: spark Updated Branches: refs/heads/master a3dccd24c -> a481794ca
[SPARK-25007][R] Add array_intersect/array_except/array_union/shuffle to SparkR ## What changes were proposed in this pull request? Add the R version of array_intersect/array_except/array_union/shuffle ## How was this patch tested? Add test in test_sparkSQL.R Author: Huaxin Gao <huax...@us.ibm.com> Closes #22291 from huaxingao/spark-25007. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a481794c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a481794c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a481794c Branch: refs/heads/master Commit: a481794ca9a5edb87982679cd0e95146f668fe78 Parents: a3dccd2 Author: Huaxin Gao <huax...@us.ibm.com> Authored: Sun Sep 2 00:06:19 2018 -0700 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Sep 2 00:06:19 2018 -0700 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 4 ++ R/pkg/R/functions.R | 59 +++++++++++++++++++++++++++++- R/pkg/R/generics.R | 16 ++++++++ R/pkg/tests/fulltests/test_sparkSQL.R | 19 ++++++++++ 4 files changed, 97 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a481794c/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 0fd0848..96ff389 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -204,6 +204,8 @@ exportMethods("%<=>%", "approxQuantile", "array_contains", "array_distinct", + "array_except", + "array_intersect", "array_join", "array_max", "array_min", @@ -212,6 +214,7 @@ exportMethods("%<=>%", "array_repeat", "array_sort", "arrays_overlap", + "array_union", "arrays_zip", "asc", "ascii", @@ -355,6 +358,7 @@ exportMethods("%<=>%", "shiftLeft", "shiftRight", "shiftRightUnsigned", + "shuffle", "sd", "sign", "signum", http://git-wip-us.apache.org/repos/asf/spark/blob/a481794c/R/pkg/R/functions.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 2929a00..d157acc 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -208,7 +208,7 @@ NULL #' # Dataframe used throughout this doc #' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) #' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp)) -#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1))) +#' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1), shuffle(tmp$v1))) #' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1), array_distinct(tmp$v1))) #' head(select(tmp, array_position(tmp$v1, 21), array_repeat(df$mpg, 3), array_sort(tmp$v1))) #' head(select(tmp, flatten(tmp$v1), reverse(tmp$v1), array_remove(tmp$v1, 21))) @@ -223,6 +223,8 @@ NULL #' head(select(tmp3, element_at(tmp3$v3, "Valiant"))) #' tmp4 <- mutate(df, v4 = create_array(df$mpg, df$cyl), v5 = create_array(df$cyl, df$hp)) #' head(select(tmp4, concat(tmp4$v4, tmp4$v5), arrays_overlap(tmp4$v4, tmp4$v5))) +#' head(select(tmp4, array_except(tmp4$v4, tmp4$v5), array_intersect(tmp4$v4, tmp4$v5))) +#' head(select(tmp4, array_union(tmp4$v4, tmp4$v5))) #' head(select(tmp4, arrays_zip(tmp4$v4, tmp4$v5), map_from_arrays(tmp4$v4, tmp4$v5))) #' head(select(tmp, concat(df$mpg, df$cyl, df$hp))) #' tmp5 <- mutate(df, v6 = create_array(df$model, df$model)) @@ -3025,6 +3027,34 @@ setMethod("array_distinct", }) #' @details +#' \code{array_except}: Returns an array of the elements in the first array but not in the second +#' array, without duplicates. The order of elements in the result is not determined. +#' +#' @rdname column_collection_functions +#' @aliases array_except array_except,Column-method +#' @note array_except since 2.4.0 +setMethod("array_except", + signature(x = "Column", y = "Column"), + function(x, y) { + jc <- callJStatic("org.apache.spark.sql.functions", "array_except", x@jc, y@jc) + column(jc) + }) + +#' @details +#' \code{array_intersect}: Returns an array of the elements in the intersection of the given two +#' arrays, without duplicates. +#' +#' @rdname column_collection_functions +#' @aliases array_intersect array_intersect,Column-method +#' @note array_intersect since 2.4.0 +setMethod("array_intersect", + signature(x = "Column", y = "Column"), + function(x, y) { + jc <- callJStatic("org.apache.spark.sql.functions", "array_intersect", x@jc, y@jc) + column(jc) + }) + +#' @details #' \code{array_join}: Concatenates the elements of column using the delimiter. #' Null values are replaced with nullReplacement if set, otherwise they are ignored. #' @@ -3150,6 +3180,20 @@ setMethod("arrays_overlap", }) #' @details +#' \code{array_union}: Returns an array of the elements in the union of the given two arrays, +#' without duplicates. +#' +#' @rdname column_collection_functions +#' @aliases array_union array_union,Column-method +#' @note array_union since 2.4.0 +setMethod("array_union", + signature(x = "Column", y = "Column"), + function(x, y) { + jc <- callJStatic("org.apache.spark.sql.functions", "array_union", x@jc, y@jc) + column(jc) + }) + +#' @details #' \code{arrays_zip}: Returns a merged array of structs in which the N-th struct contains all N-th #' values of input arrays. #' @@ -3168,6 +3212,19 @@ setMethod("arrays_zip", }) #' @details +#' \code{shuffle}: Returns a random permutation of the given array. +#' +#' @rdname column_collection_functions +#' @aliases shuffle shuffle,Column-method +#' @note shuffle since 2.4.0 +setMethod("shuffle", + signature(x = "Column"), + function(x) { + jc <- callJStatic("org.apache.spark.sql.functions", "shuffle", x@jc) + column(jc) + }) + +#' @details #' \code{flatten}: Creates a single array from an array of arrays. #' If a structure of nested arrays is deeper than two levels, only one level of nesting is removed. #' http://git-wip-us.apache.org/repos/asf/spark/blob/a481794c/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index f6f1849..27c1b31 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -769,6 +769,14 @@ setGeneric("array_distinct", function(x) { standardGeneric("array_distinct") }) #' @rdname column_collection_functions #' @name NULL +setGeneric("array_except", function(x, y) { standardGeneric("array_except") }) + +#' @rdname column_collection_functions +#' @name NULL +setGeneric("array_intersect", function(x, y) { standardGeneric("array_intersect") }) + +#' @rdname column_collection_functions +#' @name NULL setGeneric("array_join", function(x, delimiter, ...) { standardGeneric("array_join") }) #' @rdname column_collection_functions @@ -801,6 +809,10 @@ setGeneric("arrays_overlap", function(x, y) { standardGeneric("arrays_overlap") #' @rdname column_collection_functions #' @name NULL +setGeneric("array_union", function(x, y) { standardGeneric("array_union") }) + +#' @rdname column_collection_functions +#' @name NULL setGeneric("arrays_zip", function(x, ...) { standardGeneric("arrays_zip") }) #' @rdname column_string_functions @@ -1220,6 +1232,10 @@ setGeneric("shiftRight", function(y, x) { standardGeneric("shiftRight") }) #' @name NULL setGeneric("shiftRightUnsigned", function(y, x) { standardGeneric("shiftRightUnsigned") }) +#' @rdname column_collection_functions +#' @name NULL +setGeneric("shuffle", function(x) { standardGeneric("shuffle") }) + #' @rdname column_math_functions #' @name NULL setGeneric("signum", function(x) { standardGeneric("signum") }) http://git-wip-us.apache.org/repos/asf/spark/blob/a481794c/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index e1f3cf3..17e4a97 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -1598,6 +1598,25 @@ test_that("column functions", { result <- collect(select(df, element_at(df$map, "y")))[[1]] expect_equal(result, 2) + # Test array_except(), array_intersect() and array_union() + df <- createDataFrame(list(list(list(1L, 2L, 3L), list(3L, 1L)), + list(list(1L, 2L), list(3L, 4L)), + list(list(1L, 2L, 3L), list(3L, 4L)))) + result1 <- collect(select(df, array_except(df[[1]], df[[2]])))[[1]] + expect_equal(result1, list(list(2L), list(1L, 2L), list(1L, 2L))) + + result2 <- collect(select(df, array_intersect(df[[1]], df[[2]])))[[1]] + expect_equal(result2, list(list(1L, 3L), list(), list(3L))) + + result3 <- collect(select(df, array_union(df[[1]], df[[2]])))[[1]] + expect_equal(result3, list(list(1L, 2L, 3L), list(1L, 2L, 3L, 4L), list(1L, 2L, 3L, 4L))) + + # Test shuffle() + df <- createDataFrame(list(list(list(1L, 20L, 3L, 5L)), list(list(4L, 5L, 6L, 7L)))) + result <- collect(select(df, shuffle(df[[1]])))[[1]] + expect_true(setequal(result[[1]], c(1L, 20L, 3L, 5L))) + expect_true(setequal(result[[2]], c(4L, 5L, 6L, 7L))) + # Test that stats::lag is working expect_equal(length(lag(ldeaths, 12)), 72) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org