Repository: spark Updated Branches: refs/heads/master 65afd3ce8 -> 008a60dd3
http://git-wip-us.apache.org/repos/asf/spark/blob/008a60dd/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5838955..380e8eb 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -17,353 +17,353 @@ ############ RDD Actions and Transformations ############ -#' @rdname aggregateRDD -#' @seealso reduce -#' @export +# @rdname aggregateRDD +# @seealso reduce +# @export setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) -#' @rdname cache-methods -#' @export +# @rdname cache-methods +# @export setGeneric("cache", function(x) { standardGeneric("cache") }) -#' @rdname coalesce -#' @seealso repartition -#' @export +# @rdname coalesce +# @seealso repartition +# @export setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") }) -#' @rdname checkpoint-methods -#' @export +# @rdname checkpoint-methods +# @export setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") }) -#' @rdname collect-methods -#' @export +# @rdname collect-methods +# @export setGeneric("collect", function(x, ...) { standardGeneric("collect") }) -#' @rdname collect-methods -#' @export +# @rdname collect-methods +# @export setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") }) -#' @rdname collect-methods -#' @export +# @rdname collect-methods +# @export setGeneric("collectPartition", function(x, partitionId) { standardGeneric("collectPartition") }) -#' @rdname count -#' @export +# @rdname count +# @export setGeneric("count", function(x) { standardGeneric("count") }) -#' @rdname countByValue -#' @export +# @rdname countByValue +# @export setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) -#' @rdname distinct -#' @export +# @rdname distinct +# @export setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) -#' @rdname filterRDD -#' @export +# @rdname filterRDD +# @export setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") }) -#' @rdname first -#' @export +# @rdname first +# @export setGeneric("first", function(x) { standardGeneric("first") }) -#' @rdname flatMap -#' @export +# @rdname flatMap +# @export setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") }) -#' @rdname fold -#' @seealso reduce -#' @export +# @rdname fold +# @seealso reduce +# @export setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") }) -#' @rdname foreach -#' @export +# @rdname foreach +# @export setGeneric("foreach", function(x, func) { standardGeneric("foreach") }) -#' @rdname foreach -#' @export +# @rdname foreach +# @export setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") }) # The jrdd accessor function. setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") }) -#' @rdname glom -#' @export +# @rdname glom +# @export setGeneric("glom", function(x) { standardGeneric("glom") }) -#' @rdname keyBy -#' @export +# @rdname keyBy +# @export setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) -#' @rdname lapplyPartition -#' @export +# @rdname lapplyPartition +# @export setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") }) -#' @rdname lapplyPartitionsWithIndex -#' @export +# @rdname lapplyPartitionsWithIndex +# @export setGeneric("lapplyPartitionsWithIndex", function(X, FUN) { standardGeneric("lapplyPartitionsWithIndex") }) -#' @rdname lapply -#' @export +# @rdname lapply +# @export setGeneric("map", function(X, FUN) { standardGeneric("map") }) -#' @rdname lapplyPartition -#' @export +# @rdname lapplyPartition +# @export setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") }) -#' @rdname lapplyPartitionsWithIndex -#' @export +# @rdname lapplyPartitionsWithIndex +# @export setGeneric("mapPartitionsWithIndex", function(X, FUN) { standardGeneric("mapPartitionsWithIndex") }) -#' @rdname maximum -#' @export +# @rdname maximum +# @export setGeneric("maximum", function(x) { standardGeneric("maximum") }) -#' @rdname minimum -#' @export +# @rdname minimum +# @export setGeneric("minimum", function(x) { standardGeneric("minimum") }) -#' @rdname sumRDD -#' @export +# @rdname sumRDD +# @export setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") }) -#' @rdname name -#' @export +# @rdname name +# @export setGeneric("name", function(x) { standardGeneric("name") }) -#' @rdname numPartitions -#' @export +# @rdname numPartitions +# @export setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") }) -#' @rdname persist -#' @export +# @rdname persist +# @export setGeneric("persist", function(x, newLevel) { standardGeneric("persist") }) -#' @rdname pipeRDD -#' @export +# @rdname pipeRDD +# @export setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")}) -#' @rdname reduce -#' @export +# @rdname reduce +# @export setGeneric("reduce", function(x, func) { standardGeneric("reduce") }) -#' @rdname repartition -#' @seealso coalesce -#' @export +# @rdname repartition +# @seealso coalesce +# @export setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") }) -#' @rdname sampleRDD -#' @export +# @rdname sampleRDD +# @export setGeneric("sampleRDD", function(x, withReplacement, fraction, seed) { standardGeneric("sampleRDD") }) -#' @rdname saveAsObjectFile -#' @seealso objectFile -#' @export +# @rdname saveAsObjectFile +# @seealso objectFile +# @export setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") }) -#' @rdname saveAsTextFile -#' @export +# @rdname saveAsTextFile +# @export setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") }) -#' @rdname setName -#' @export +# @rdname setName +# @export setGeneric("setName", function(x, name) { standardGeneric("setName") }) -#' @rdname sortBy -#' @export +# @rdname sortBy +# @export setGeneric("sortBy", function(x, func, ascending = TRUE, numPartitions = 1) { standardGeneric("sortBy") }) -#' @rdname take -#' @export +# @rdname take +# @export setGeneric("take", function(x, num) { standardGeneric("take") }) -#' @rdname takeOrdered -#' @export +# @rdname takeOrdered +# @export setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") }) -#' @rdname takeSample -#' @export +# @rdname takeSample +# @export setGeneric("takeSample", function(x, withReplacement, num, seed) { standardGeneric("takeSample") }) -#' @rdname top -#' @export +# @rdname top +# @export setGeneric("top", function(x, num) { standardGeneric("top") }) -#' @rdname unionRDD -#' @export +# @rdname unionRDD +# @export setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") }) -#' @rdname unpersist-methods -#' @export +# @rdname unpersist-methods +# @export setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") }) -#' @rdname zipRDD -#' @export +# @rdname zipRDD +# @export setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") }) -#' @rdname zipRDD -#' @export +# @rdname zipRDD +# @export setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") }, signature = "...") -#' @rdname zipWithIndex -#' @seealso zipWithUniqueId -#' @export +# @rdname zipWithIndex +# @seealso zipWithUniqueId +# @export setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") }) -#' @rdname zipWithUniqueId -#' @seealso zipWithIndex -#' @export +# @rdname zipWithUniqueId +# @seealso zipWithIndex +# @export setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }) ############ Binary Functions ############# -#' @rdname cartesian -#' @export +# @rdname cartesian +# @export setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") }) -#' @rdname countByKey -#' @export +# @rdname countByKey +# @export setGeneric("countByKey", function(x) { standardGeneric("countByKey") }) -#' @rdname flatMapValues -#' @export +# @rdname flatMapValues +# @export setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") }) -#' @rdname intersection -#' @export +# @rdname intersection +# @export setGeneric("intersection", function(x, other, numPartitions = 1) { standardGeneric("intersection") }) -#' @rdname keys -#' @export +# @rdname keys +# @export setGeneric("keys", function(x) { standardGeneric("keys") }) -#' @rdname lookup -#' @export +# @rdname lookup +# @export setGeneric("lookup", function(x, key) { standardGeneric("lookup") }) -#' @rdname mapValues -#' @export +# @rdname mapValues +# @export setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") }) -#' @rdname sampleByKey -#' @export +# @rdname sampleByKey +# @export setGeneric("sampleByKey", function(x, withReplacement, fractions, seed) { standardGeneric("sampleByKey") }) -#' @rdname values -#' @export +# @rdname values +# @export setGeneric("values", function(x) { standardGeneric("values") }) ############ Shuffle Functions ############ -#' @rdname aggregateByKey -#' @seealso foldByKey, combineByKey -#' @export +# @rdname aggregateByKey +# @seealso foldByKey, combineByKey +# @export setGeneric("aggregateByKey", function(x, zeroValue, seqOp, combOp, numPartitions) { standardGeneric("aggregateByKey") }) -#' @rdname cogroup -#' @export +# @rdname cogroup +# @export setGeneric("cogroup", function(..., numPartitions) { standardGeneric("cogroup") }, signature = "...") -#' @rdname combineByKey -#' @seealso groupByKey, reduceByKey -#' @export +# @rdname combineByKey +# @seealso groupByKey, reduceByKey +# @export setGeneric("combineByKey", function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { standardGeneric("combineByKey") }) -#' @rdname foldByKey -#' @seealso aggregateByKey, combineByKey -#' @export +# @rdname foldByKey +# @seealso aggregateByKey, combineByKey +# @export setGeneric("foldByKey", function(x, zeroValue, func, numPartitions) { standardGeneric("foldByKey") }) -#' @rdname join-methods -#' @export +# @rdname join-methods +# @export setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") }) -#' @rdname groupByKey -#' @seealso reduceByKey -#' @export +# @rdname groupByKey +# @seealso reduceByKey +# @export setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") }) -#' @rdname join-methods -#' @export +# @rdname join-methods +# @export setGeneric("join", function(x, y, ...) { standardGeneric("join") }) -#' @rdname join-methods -#' @export +# @rdname join-methods +# @export setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") }) -#' @rdname partitionBy -#' @export +# @rdname partitionBy +# @export setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") }) -#' @rdname reduceByKey -#' @seealso groupByKey -#' @export +# @rdname reduceByKey +# @seealso groupByKey +# @export setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")}) -#' @rdname reduceByKeyLocally -#' @seealso reduceByKey -#' @export +# @rdname reduceByKeyLocally +# @seealso reduceByKey +# @export setGeneric("reduceByKeyLocally", function(x, combineFunc) { standardGeneric("reduceByKeyLocally") }) -#' @rdname join-methods -#' @export +# @rdname join-methods +# @export setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") }) -#' @rdname sortByKey -#' @export +# @rdname sortByKey +# @export setGeneric("sortByKey", function(x, ascending = TRUE, numPartitions = 1) { standardGeneric("sortByKey") }) -#' @rdname subtract -#' @export +# @rdname subtract +# @export setGeneric("subtract", function(x, other, numPartitions = 1) { standardGeneric("subtract") }) -#' @rdname subtractByKey -#' @export +# @rdname subtractByKey +# @export setGeneric("subtractByKey", function(x, other, numPartitions = 1) { standardGeneric("subtractByKey") @@ -372,8 +372,8 @@ setGeneric("subtractByKey", ################### Broadcast Variable Methods ################# -#' @rdname broadcast -#' @export +# @rdname broadcast +# @export setGeneric("value", function(bcast) { standardGeneric("value") }) @@ -477,8 +477,8 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") }) #' @export setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") }) -#' @rdname tojson -#' @export +# @rdname tojson +# @export setGeneric("toJSON", function(x) { standardGeneric("toJSON") }) #' @rdname DataFrame http://git-wip-us.apache.org/repos/asf/spark/blob/008a60dd/R/pkg/R/pairRDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index edeb8d9..7694652 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -21,23 +21,23 @@ NULL ############ Actions and Transformations ############ -#' Look up elements of a key in an RDD -#' -#' @description -#' \code{lookup} returns a list of values in this RDD for key key. -#' -#' @param x The RDD to collect -#' @param key The key to look up for -#' @return a list of values in this RDD for key key -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(c(1, 1), c(2, 2), c(1, 3)) -#' rdd <- parallelize(sc, pairs) -#' lookup(rdd, 1) # list(1, 3) -#'} -#' @rdname lookup -#' @aliases lookup,RDD-method +# Look up elements of a key in an RDD +# +# @description +# \code{lookup} returns a list of values in this RDD for key key. +# +# @param x The RDD to collect +# @param key The key to look up for +# @return a list of values in this RDD for key key +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(c(1, 1), c(2, 2), c(1, 3)) +# rdd <- parallelize(sc, pairs) +# lookup(rdd, 1) # list(1, 3) +#} +# @rdname lookup +# @aliases lookup,RDD-method setMethod("lookup", signature(x = "RDD", key = "ANY"), function(x, key) { @@ -49,21 +49,21 @@ setMethod("lookup", collect(valsRDD) }) -#' Count the number of elements for each key, and return the result to the -#' master as lists of (key, count) pairs. -#' -#' Same as countByKey in Spark. -#' -#' @param x The RDD to count keys. -#' @return list of (key, count) pairs, where count is number of each key in rdd. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1))) -#' countByKey(rdd) # ("a", 2L), ("b", 1L) -#'} -#' @rdname countByKey -#' @aliases countByKey,RDD-method +# Count the number of elements for each key, and return the result to the +# master as lists of (key, count) pairs. +# +# Same as countByKey in Spark. +# +# @param x The RDD to count keys. +# @return list of (key, count) pairs, where count is number of each key in rdd. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1))) +# countByKey(rdd) # ("a", 2L), ("b", 1L) +#} +# @rdname countByKey +# @aliases countByKey,RDD-method setMethod("countByKey", signature(x = "RDD"), function(x) { @@ -71,17 +71,17 @@ setMethod("countByKey", countByValue(keys) }) -#' Return an RDD with the keys of each tuple. -#' -#' @param x The RDD from which the keys of each tuple is returned. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) -#' collect(keys(rdd)) # list(1, 3) -#'} -#' @rdname keys -#' @aliases keys,RDD +# Return an RDD with the keys of each tuple. +# +# @param x The RDD from which the keys of each tuple is returned. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) +# collect(keys(rdd)) # list(1, 3) +#} +# @rdname keys +# @aliases keys,RDD setMethod("keys", signature(x = "RDD"), function(x) { @@ -91,17 +91,17 @@ setMethod("keys", lapply(x, func) }) -#' Return an RDD with the values of each tuple. -#' -#' @param x The RDD from which the values of each tuple is returned. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) -#' collect(values(rdd)) # list(2, 4) -#'} -#' @rdname values -#' @aliases values,RDD +# Return an RDD with the values of each tuple. +# +# @param x The RDD from which the values of each tuple is returned. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(1, 2), list(3, 4))) +# collect(values(rdd)) # list(2, 4) +#} +# @rdname values +# @aliases values,RDD setMethod("values", signature(x = "RDD"), function(x) { @@ -111,23 +111,23 @@ setMethod("values", lapply(x, func) }) -#' Applies a function to all values of the elements, without modifying the keys. -#' -#' The same as `mapValues()' in Spark. -#' -#' @param X The RDD to apply the transformation. -#' @param FUN the transformation to apply on the value of each element. -#' @return a new RDD created by the transformation. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, 1:10) -#' makePairs <- lapply(rdd, function(x) { list(x, x) }) -#' collect(mapValues(makePairs, function(x) { x * 2) }) -#' Output: list(list(1,2), list(2,4), list(3,6), ...) -#'} -#' @rdname mapValues -#' @aliases mapValues,RDD,function-method +# Applies a function to all values of the elements, without modifying the keys. +# +# The same as `mapValues()' in Spark. +# +# @param X The RDD to apply the transformation. +# @param FUN the transformation to apply on the value of each element. +# @return a new RDD created by the transformation. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, 1:10) +# makePairs <- lapply(rdd, function(x) { list(x, x) }) +# collect(mapValues(makePairs, function(x) { x * 2) }) +# Output: list(list(1,2), list(2,4), list(3,6), ...) +#} +# @rdname mapValues +# @aliases mapValues,RDD,function-method setMethod("mapValues", signature(X = "RDD", FUN = "function"), function(X, FUN) { @@ -137,23 +137,23 @@ setMethod("mapValues", lapply(X, func) }) -#' Pass each value in the key-value pair RDD through a flatMap function without -#' changing the keys; this also retains the original RDD's partitioning. -#' -#' The same as 'flatMapValues()' in Spark. -#' -#' @param X The RDD to apply the transformation. -#' @param FUN the transformation to apply on the value of each element. -#' @return a new RDD created by the transformation. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) -#' collect(flatMapValues(rdd, function(x) { x })) -#' Output: list(list(1,1), list(1,2), list(2,3), list(2,4)) -#'} -#' @rdname flatMapValues -#' @aliases flatMapValues,RDD,function-method +# Pass each value in the key-value pair RDD through a flatMap function without +# changing the keys; this also retains the original RDD's partitioning. +# +# The same as 'flatMapValues()' in Spark. +# +# @param X The RDD to apply the transformation. +# @param FUN the transformation to apply on the value of each element. +# @return a new RDD created by the transformation. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4)))) +# collect(flatMapValues(rdd, function(x) { x })) +# Output: list(list(1,1), list(1,2), list(2,3), list(2,4)) +#} +# @rdname flatMapValues +# @aliases flatMapValues,RDD,function-method setMethod("flatMapValues", signature(X = "RDD", FUN = "function"), function(X, FUN) { @@ -165,30 +165,30 @@ setMethod("flatMapValues", ############ Shuffle Functions ############ -#' Partition an RDD by key -#' -#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' For each element of this RDD, the partitioner is used to compute a hash -#' function and the RDD is partitioned using this hash value. -#' -#' @param x The RDD to partition. Should be an RDD where each element is -#' list(K, V) or c(K, V). -#' @param numPartitions Number of partitions to create. -#' @param ... Other optional arguments to partitionBy. -#' -#' @param partitionFunc The partition function to use. Uses a default hashCode -#' function if not provided -#' @return An RDD partitioned using the specified partitioner. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -#' rdd <- parallelize(sc, pairs) -#' parts <- partitionBy(rdd, 2L) -#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4) -#'} -#' @rdname partitionBy -#' @aliases partitionBy,RDD,integer-method +# Partition an RDD by key +# +# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). +# For each element of this RDD, the partitioner is used to compute a hash +# function and the RDD is partitioned using this hash value. +# +# @param x The RDD to partition. Should be an RDD where each element is +# list(K, V) or c(K, V). +# @param numPartitions Number of partitions to create. +# @param ... Other optional arguments to partitionBy. +# +# @param partitionFunc The partition function to use. Uses a default hashCode +# function if not provided +# @return An RDD partitioned using the specified partitioner. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) +# rdd <- parallelize(sc, pairs) +# parts <- partitionBy(rdd, 2L) +# collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4) +#} +# @rdname partitionBy +# @aliases partitionBy,RDD,integer-method setMethod("partitionBy", signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions, partitionFunc = hashCode) { @@ -234,27 +234,27 @@ setMethod("partitionBy", RDD(r, serializedMode = "byte") }) -#' Group values by key -#' -#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' and group values for each key in the RDD into a single sequence. -#' -#' @param x The RDD to group. Should be an RDD where each element is -#' list(K, V) or c(K, V). -#' @param numPartitions Number of partitions to create. -#' @return An RDD where each element is list(K, list(V)) -#' @seealso reduceByKey -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -#' rdd <- parallelize(sc, pairs) -#' parts <- groupByKey(rdd, 2L) -#' grouped <- collect(parts) -#' grouped[[1]] # Should be a list(1, list(2, 4)) -#'} -#' @rdname groupByKey -#' @aliases groupByKey,RDD,integer-method +# Group values by key +# +# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). +# and group values for each key in the RDD into a single sequence. +# +# @param x The RDD to group. Should be an RDD where each element is +# list(K, V) or c(K, V). +# @param numPartitions Number of partitions to create. +# @return An RDD where each element is list(K, list(V)) +# @seealso reduceByKey +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) +# rdd <- parallelize(sc, pairs) +# parts <- groupByKey(rdd, 2L) +# grouped <- collect(parts) +# grouped[[1]] # Should be a list(1, list(2, 4)) +#} +# @rdname groupByKey +# @aliases groupByKey,RDD,integer-method setMethod("groupByKey", signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions) { @@ -292,28 +292,28 @@ setMethod("groupByKey", lapplyPartition(shuffled, groupVals) }) -#' Merge values by key -#' -#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' and merges the values for each key using an associative reduce function. -#' -#' @param x The RDD to reduce by key. Should be an RDD where each element is -#' list(K, V) or c(K, V). -#' @param combineFunc The associative reduce function to use. -#' @param numPartitions Number of partitions to create. -#' @return An RDD where each element is list(K, V') where V' is the merged -#' value -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -#' rdd <- parallelize(sc, pairs) -#' parts <- reduceByKey(rdd, "+", 2L) -#' reduced <- collect(parts) -#' reduced[[1]] # Should be a list(1, 6) -#'} -#' @rdname reduceByKey -#' @aliases reduceByKey,RDD,integer-method +# Merge values by key +# +# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). +# and merges the values for each key using an associative reduce function. +# +# @param x The RDD to reduce by key. Should be an RDD where each element is +# list(K, V) or c(K, V). +# @param combineFunc The associative reduce function to use. +# @param numPartitions Number of partitions to create. +# @return An RDD where each element is list(K, V') where V' is the merged +# value +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) +# rdd <- parallelize(sc, pairs) +# parts <- reduceByKey(rdd, "+", 2L) +# reduced <- collect(parts) +# reduced[[1]] # Should be a list(1, 6) +#} +# @rdname reduceByKey +# @aliases reduceByKey,RDD,integer-method setMethod("reduceByKey", signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"), function(x, combineFunc, numPartitions) { @@ -333,27 +333,27 @@ setMethod("reduceByKey", lapplyPartition(shuffled, reduceVals) }) -#' Merge values by key locally -#' -#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V). -#' and merges the values for each key using an associative reduce function, but return the -#' results immediately to the driver as an R list. -#' -#' @param x The RDD to reduce by key. Should be an RDD where each element is -#' list(K, V) or c(K, V). -#' @param combineFunc The associative reduce function to use. -#' @return A list of elements of type list(K, V') where V' is the merged value for each key -#' @seealso reduceByKey -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -#' rdd <- parallelize(sc, pairs) -#' reduced <- reduceByKeyLocally(rdd, "+") -#' reduced # list(list(1, 6), list(1.1, 3)) -#'} -#' @rdname reduceByKeyLocally -#' @aliases reduceByKeyLocally,RDD,integer-method +# Merge values by key locally +# +# This function operates on RDDs where every element is of the form list(K, V) or c(K, V). +# and merges the values for each key using an associative reduce function, but return the +# results immediately to the driver as an R list. +# +# @param x The RDD to reduce by key. Should be an RDD where each element is +# list(K, V) or c(K, V). +# @param combineFunc The associative reduce function to use. +# @return A list of elements of type list(K, V') where V' is the merged value for each key +# @seealso reduceByKey +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) +# rdd <- parallelize(sc, pairs) +# reduced <- reduceByKeyLocally(rdd, "+") +# reduced # list(list(1, 6), list(1.1, 3)) +#} +# @rdname reduceByKeyLocally +# @aliases reduceByKeyLocally,RDD,integer-method setMethod("reduceByKeyLocally", signature(x = "RDD", combineFunc = "ANY"), function(x, combineFunc) { @@ -385,41 +385,41 @@ setMethod("reduceByKeyLocally", convertEnvsToList(merged[[1]], merged[[2]]) }) -#' Combine values by key -#' -#' Generic function to combine the elements for each key using a custom set of -#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], -#' for a "combined type" C. Note that V and C can be different -- for example, one -#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). - -#' Users provide three functions: -#' \itemize{ -#' \item createCombiner, which turns a V into a C (e.g., creates a one-element list) -#' \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) - -#' \item mergeCombiners, to combine two C's into a single one (e.g., concatentates -#' two lists). -#' } -#' -#' @param x The RDD to combine. Should be an RDD where each element is -#' list(K, V) or c(K, V). -#' @param createCombiner Create a combiner (C) given a value (V) -#' @param mergeValue Merge the given value (V) with an existing combiner (C) -#' @param mergeCombiners Merge two combiners and return a new combiner -#' @param numPartitions Number of partitions to create. -#' @return An RDD where each element is list(K, C) where C is the combined type -#' -#' @seealso groupByKey, reduceByKey -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) -#' rdd <- parallelize(sc, pairs) -#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L) -#' combined <- collect(parts) -#' combined[[1]] # Should be a list(1, 6) -#'} -#' @rdname combineByKey -#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method +# Combine values by key +# +# Generic function to combine the elements for each key using a custom set of +# aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], +# for a "combined type" C. Note that V and C can be different -- for example, one +# might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]). + +# Users provide three functions: +# \itemize{ +# \item createCombiner, which turns a V into a C (e.g., creates a one-element list) +# \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list) - +# \item mergeCombiners, to combine two C's into a single one (e.g., concatentates +# two lists). +# } +# +# @param x The RDD to combine. Should be an RDD where each element is +# list(K, V) or c(K, V). +# @param createCombiner Create a combiner (C) given a value (V) +# @param mergeValue Merge the given value (V) with an existing combiner (C) +# @param mergeCombiners Merge two combiners and return a new combiner +# @param numPartitions Number of partitions to create. +# @return An RDD where each element is list(K, C) where C is the combined type +# +# @seealso groupByKey, reduceByKey +# @examples +#\dontrun{ +# sc <- sparkR.init() +# pairs <- list(list(1, 2), list(1.1, 3), list(1, 4)) +# rdd <- parallelize(sc, pairs) +# parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L) +# combined <- collect(parts) +# combined[[1]] # Should be a list(1, 6) +#} +# @rdname combineByKey +# @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method setMethod("combineByKey", signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY", mergeCombiners = "ANY", numPartitions = "numeric"), @@ -451,36 +451,36 @@ setMethod("combineByKey", lapplyPartition(shuffled, mergeAfterShuffle) }) -#' Aggregate a pair RDD by each key. -#' -#' Aggregate the values of each key in an RDD, using given combine functions -#' and a neutral "zero value". This function can return a different result type, -#' U, than the type of the values in this RDD, V. Thus, we need one operation -#' for merging a V into a U and one operation for merging two U's, The former -#' operation is used for merging values within a partition, and the latter is -#' used for merging values between partitions. To avoid memory allocation, both -#' of these functions are allowed to modify and return their first argument -#' instead of creating a new U. -#' -#' @param x An RDD. -#' @param zeroValue A neutral "zero value". -#' @param seqOp A function to aggregate the values of each key. It may return -#' a different result type from the type of the values. -#' @param combOp A function to aggregate results of seqOp. -#' @return An RDD containing the aggregation result. -#' @seealso foldByKey, combineByKey -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) -#' zeroValue <- list(0, 0) -#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } -#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } -#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) -#' # list(list(1, list(3, 2)), list(2, list(7, 2))) -#'} -#' @rdname aggregateByKey -#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method +# Aggregate a pair RDD by each key. +# +# Aggregate the values of each key in an RDD, using given combine functions +# and a neutral "zero value". This function can return a different result type, +# U, than the type of the values in this RDD, V. Thus, we need one operation +# for merging a V into a U and one operation for merging two U's, The former +# operation is used for merging values within a partition, and the latter is +# used for merging values between partitions. To avoid memory allocation, both +# of these functions are allowed to modify and return their first argument +# instead of creating a new U. +# +# @param x An RDD. +# @param zeroValue A neutral "zero value". +# @param seqOp A function to aggregate the values of each key. It may return +# a different result type from the type of the values. +# @param combOp A function to aggregate results of seqOp. +# @return An RDD containing the aggregation result. +# @seealso foldByKey, combineByKey +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) +# zeroValue <- list(0, 0) +# seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } +# combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } +# aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L) +# # list(list(1, list(3, 2)), list(2, list(7, 2))) +#} +# @rdname aggregateByKey +# @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method setMethod("aggregateByKey", signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY", numPartitions = "numeric"), @@ -492,26 +492,26 @@ setMethod("aggregateByKey", combineByKey(x, createCombiner, seqOp, combOp, numPartitions) }) -#' Fold a pair RDD by each key. -#' -#' Aggregate the values of each key in an RDD, using an associative function "func" -#' and a neutral "zero value" which may be added to the result an arbitrary -#' number of times, and must not change the result (e.g., 0 for addition, or -#' 1 for multiplication.). -#' -#' @param x An RDD. -#' @param zeroValue A neutral "zero value". -#' @param func An associative function for folding values of each key. -#' @return An RDD containing the aggregation result. -#' @seealso aggregateByKey, combineByKey -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) -#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7)) -#'} -#' @rdname foldByKey -#' @aliases foldByKey,RDD,ANY,ANY,integer-method +# Fold a pair RDD by each key. +# +# Aggregate the values of each key in an RDD, using an associative function "func" +# and a neutral "zero value" which may be added to the result an arbitrary +# number of times, and must not change the result (e.g., 0 for addition, or +# 1 for multiplication.). +# +# @param x An RDD. +# @param zeroValue A neutral "zero value". +# @param func An associative function for folding values of each key. +# @return An RDD containing the aggregation result. +# @seealso aggregateByKey, combineByKey +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) +# foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7)) +#} +# @rdname foldByKey +# @aliases foldByKey,RDD,ANY,ANY,integer-method setMethod("foldByKey", signature(x = "RDD", zeroValue = "ANY", func = "ANY", numPartitions = "numeric"), @@ -521,28 +521,28 @@ setMethod("foldByKey", ############ Binary Functions ############# -#' Join two RDDs -#' -#' @description -#' \code{join} This function joins two RDDs where every element is of the form list(K, V). -#' The key types of the two RDDs should be the same. -#' -#' @param x An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param y An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param numPartitions Number of partitions to create. -#' @return a new RDD containing all pairs of elements with matching keys in -#' two input RDDs. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) -#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) -#' join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)) -#'} -#' @rdname join-methods -#' @aliases join,RDD,RDD-method +# Join two RDDs +# +# @description +# \code{join} This function joins two RDDs where every element is of the form list(K, V). +# The key types of the two RDDs should be the same. +# +# @param x An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param y An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param numPartitions Number of partitions to create. +# @return a new RDD containing all pairs of elements with matching keys in +# two input RDDs. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) +# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) +# join(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)) +#} +# @rdname join-methods +# @aliases join,RDD,RDD-method setMethod("join", signature(x = "RDD", y = "RDD"), function(x, y, numPartitions) { @@ -557,30 +557,30 @@ setMethod("join", doJoin) }) -#' Left outer join two RDDs -#' -#' @description -#' \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V). -#' The key types of the two RDDs should be the same. -#' -#' @param x An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param y An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param numPartitions Number of partitions to create. -#' @return For each element (k, v) in x, the resulting RDD will either contain -#' all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) -#' if no elements in rdd2 have key k. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) -#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) -#' leftOuterJoin(rdd1, rdd2, 2L) -#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL))) -#'} -#' @rdname join-methods -#' @aliases leftOuterJoin,RDD,RDD-method +# Left outer join two RDDs +# +# @description +# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V). +# The key types of the two RDDs should be the same. +# +# @param x An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param y An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param numPartitions Number of partitions to create. +# @return For each element (k, v) in x, the resulting RDD will either contain +# all pairs (k, (v, w)) for (k, w) in rdd2, or the pair (k, (v, NULL)) +# if no elements in rdd2 have key k. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) +# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) +# leftOuterJoin(rdd1, rdd2, 2L) +# # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL))) +#} +# @rdname join-methods +# @aliases leftOuterJoin,RDD,RDD-method setMethod("leftOuterJoin", signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { @@ -594,30 +594,30 @@ setMethod("leftOuterJoin", joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) -#' Right outer join two RDDs -#' -#' @description -#' \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V). -#' The key types of the two RDDs should be the same. -#' -#' @param x An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param y An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param numPartitions Number of partitions to create. -#' @return For each element (k, w) in y, the resulting RDD will either contain -#' all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w)) -#' if no elements in x have key k. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3))) -#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) -#' rightOuterJoin(rdd1, rdd2, 2L) -#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4))) -#'} -#' @rdname join-methods -#' @aliases rightOuterJoin,RDD,RDD-method +# Right outer join two RDDs +# +# @description +# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V). +# The key types of the two RDDs should be the same. +# +# @param x An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param y An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param numPartitions Number of partitions to create. +# @return For each element (k, w) in y, the resulting RDD will either contain +# all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w)) +# if no elements in x have key k. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3))) +# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) +# rightOuterJoin(rdd1, rdd2, 2L) +# # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4))) +#} +# @rdname join-methods +# @aliases rightOuterJoin,RDD,RDD-method setMethod("rightOuterJoin", signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { @@ -631,33 +631,33 @@ setMethod("rightOuterJoin", joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) -#' Full outer join two RDDs -#' -#' @description -#' \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V). -#' The key types of the two RDDs should be the same. -#' -#' @param x An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param y An RDD to be joined. Should be an RDD where each element is -#' list(K, V). -#' @param numPartitions Number of partitions to create. -#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD -#' will contain all pairs (k, (v, w)) for both (k, v) in x and -#' (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements -#' in x/y have key k. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3))) -#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) -#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)), -#' # list(1, list(3, 1)), -#' # list(2, list(NULL, 4))) -#' # list(3, list(3, NULL)), -#'} -#' @rdname join-methods -#' @aliases fullOuterJoin,RDD,RDD-method +# Full outer join two RDDs +# +# @description +# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V). +# The key types of the two RDDs should be the same. +# +# @param x An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param y An RDD to be joined. Should be an RDD where each element is +# list(K, V). +# @param numPartitions Number of partitions to create. +# @return For each element (k, v) in x and (k, w) in y, the resulting RDD +# will contain all pairs (k, (v, w)) for both (k, v) in x and +# (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements +# in x/y have key k. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3))) +# rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4))) +# fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)), +# # list(1, list(3, 1)), +# # list(2, list(NULL, 4))) +# # list(3, list(3, NULL)), +#} +# @rdname join-methods +# @aliases fullOuterJoin,RDD,RDD-method setMethod("fullOuterJoin", signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { @@ -671,23 +671,23 @@ setMethod("fullOuterJoin", joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) -#' For each key k in several RDDs, return a resulting RDD that -#' whose values are a list of values for the key in all RDDs. -#' -#' @param ... Several RDDs. -#' @param numPartitions Number of partitions to create. -#' @return a new RDD containing all pairs of elements with values in a list -#' in all RDDs. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) -#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) -#' cogroup(rdd1, rdd2, numPartitions = 2L) -#' # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list())) -#'} -#' @rdname cogroup -#' @aliases cogroup,RDD-method +# For each key k in several RDDs, return a resulting RDD that +# whose values are a list of values for the key in all RDDs. +# +# @param ... Several RDDs. +# @param numPartitions Number of partitions to create. +# @return a new RDD containing all pairs of elements with values in a list +# in all RDDs. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) +# rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) +# cogroup(rdd1, rdd2, numPartitions = 2L) +# # list(list(1, list(1, list(2, 3))), list(2, list(list(4), list())) +#} +# @rdname cogroup +# @aliases cogroup,RDD-method setMethod("cogroup", "RDD", function(..., numPartitions) { @@ -723,20 +723,20 @@ setMethod("cogroup", group.func) }) -#' Sort a (k, v) pair RDD by k. -#' -#' @param x A (k, v) pair RDD to be sorted. -#' @param ascending A flag to indicate whether the sorting is ascending or descending. -#' @param numPartitions Number of partitions to create. -#' @return An RDD where all (k, v) pair elements are sorted. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3))) -#' collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1)) -#'} -#' @rdname sortByKey -#' @aliases sortByKey,RDD,RDD-method +# Sort a (k, v) pair RDD by k. +# +# @param x A (k, v) pair RDD to be sorted. +# @param ascending A flag to indicate whether the sorting is ascending or descending. +# @param numPartitions Number of partitions to create. +# @return An RDD where all (k, v) pair elements are sorted. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3))) +# collect(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1)) +#} +# @rdname sortByKey +# @aliases sortByKey,RDD,RDD-method setMethod("sortByKey", signature(x = "RDD"), function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) { @@ -785,25 +785,25 @@ setMethod("sortByKey", lapplyPartition(newRDD, partitionFunc) }) -#' Subtract a pair RDD with another pair RDD. -#' -#' Return an RDD with the pairs from x whose keys are not in other. -#' -#' @param x An RDD. -#' @param other An RDD. -#' @param numPartitions Number of the partitions in the result RDD. -#' @return An RDD with the pairs from x whose keys are not in other. -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4), -#' list("b", 5), list("a", 2))) -#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1))) -#' collect(subtractByKey(rdd1, rdd2)) -#' # list(list("b", 4), list("b", 5)) -#'} -#' @rdname subtractByKey -#' @aliases subtractByKey,RDD +# Subtract a pair RDD with another pair RDD. +# +# Return an RDD with the pairs from x whose keys are not in other. +# +# @param x An RDD. +# @param other An RDD. +# @param numPartitions Number of the partitions in the result RDD. +# @return An RDD with the pairs from x whose keys are not in other. +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4), +# list("b", 5), list("a", 2))) +# rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1))) +# collect(subtractByKey(rdd1, rdd2)) +# # list(list("b", 4), list("b", 5)) +#} +# @rdname subtractByKey +# @aliases subtractByKey,RDD setMethod("subtractByKey", signature(x = "RDD", other = "RDD"), function(x, other, numPartitions = SparkR:::numPartitions(x)) { @@ -819,41 +819,41 @@ setMethod("subtractByKey", function (v) { v[[1]] }) }) -#' Return a subset of this RDD sampled by key. -#' -#' @description -#' \code{sampleByKey} Create a sample of this RDD using variable sampling rates -#' for different keys as specified by fractions, a key to sampling rate map. -#' -#' @param x The RDD to sample elements by key, where each element is -#' list(K, V) or c(K, V). -#' @param withReplacement Sampling with replacement or not -#' @param fraction The (rough) sample target fraction -#' @param seed Randomness seed value -#' @examples -#'\dontrun{ -#' sc <- sparkR.init() -#' rdd <- parallelize(sc, 1:3000) -#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x) -#' else { if (x %% 3 == 1) list("b", x) else list("c", x) }}) -#' fractions <- list(a = 0.2, b = 0.1, c = 0.3) -#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) -#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE -#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE -#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE -#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE -#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE -#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE -#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE -#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE -#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE -#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4) -#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored -#' fractions <- list(a = 0.2, b = 0.1) -#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c" -#'} -#' @rdname sampleByKey -#' @aliases sampleByKey,RDD-method +# Return a subset of this RDD sampled by key. +# +# @description +# \code{sampleByKey} Create a sample of this RDD using variable sampling rates +# for different keys as specified by fractions, a key to sampling rate map. +# +# @param x The RDD to sample elements by key, where each element is +# list(K, V) or c(K, V). +# @param withReplacement Sampling with replacement or not +# @param fraction The (rough) sample target fraction +# @param seed Randomness seed value +# @examples +#\dontrun{ +# sc <- sparkR.init() +# rdd <- parallelize(sc, 1:3000) +# pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x) +# else { if (x %% 3 == 1) list("b", x) else list("c", x) }}) +# fractions <- list(a = 0.2, b = 0.1, c = 0.3) +# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) +# 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE +# 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE +# 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE +# lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE +# lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE +# lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE +# lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE +# lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE +# lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE +# fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4) +# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored +# fractions <- list(a = 0.2, b = 0.1) +# sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c" +#} +# @rdname sampleByKey +# @aliases sampleByKey,RDD-method setMethod("sampleByKey", signature(x = "RDD", withReplacement = "logical", fractions = "vector", seed = "integer"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org