[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59717/ Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836 **[Test build #59717 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59717/consoleFull)** for PR 12836 at commit [`10f99d1`](https://github.com/apache/spark/commit/10f99d118e2aa94f65cb0a12ac74650533b60416). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836 **[Test build #59717 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59717/consoleFull)** for PR 12836 at commit [`10f99d1`](https://github.com/apache/spark/commit/10f99d118e2aa94f65cb0a12ac74650533b60416).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836 @NarineK, thanks. I will take a final review pass soon.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222596477 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222596478 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59631/ Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222596375 **[Test build #59631 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59631/consoleFull)** for PR 12836 at commit [`7b5767a`](https://github.com/apache/spark/commit/7b5767ad25aaa1f091c4b2d22d7a99cf3d8ec00b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222595900 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59630/ Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222595899 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222595798 **[Test build #59630 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59630/consoleFull)** for PR 12836 at commit [`a0425c1`](https://github.com/apache/spark/commit/a0425c17906fcd2ea1d8dd6fb33c0fd8a860d4a7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222586936 **[Test build #59631 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59631/consoleFull)** for PR 12836 at commit [`7b5767a`](https://github.com/apache/spark/commit/7b5767ad25aaa1f091c4b2d22d7a99cf3d8ec00b).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120888 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120891 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120675 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { +#' y <- data.frame(x$a[1], x$c[1], mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (sqlContext, iris) --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120665 --- Diff: R/pkg/R/deserialize.R --- @@ -197,6 +197,31 @@ readMultipleObjects <- function(inputCon) { data # this is a list of named lists now } +readMultipleObjectsWithKeys <- function(inputCon) { + # readMultipleObjectsWithKeys will read multiple continuous objects from + # a DataOutputStream. There is no preceding field telling the count + # of the objects, so the number of objects varies, we try to read + # all objects in a loop until the end of the stream. The rows in --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120671

--- Diff: R/pkg/R/DataFrame.R ---
@@ -1268,6 +1268,82 @@ setMethod("dapplyCollect",
             ldf
           })

+#' gapply
+#'
+#' Apply a R function to each group of a DataFrame. The group is defined by an input
+#' grouping column.
+#'
+#' @param x A SparkDataFrame
+#' @param func A function to be applied to each group partition specified by grouping
+#'        column of the SparkDataFrame.
+#'        The output of func is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'        It must match the output of func.
+#' @family SparkDataFrame functions
+#' @rdname gapply
+#' @name gapply
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#' sqlContext,
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#' c("a", "b", "c", "d"))
+#'
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(x) {
+#'     y <- data.frame(x$a[1], x$c[1], mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#'   schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on iris dataset by grouping on the 'Species' column and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame (sqlContext, iris)
+#' schema <- structType(structField("(Intercept)", "double"),
+#'   structField("Sepal_Width", "double"), structField("Petal_Length", "double"),
+#'   structField("Petal_Width", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list(df$"Species"),
+#'   function(x) {
+#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'       Sepal_Width + Petal_Length + Petal_Width, x))
+#'     data.frame(t(coef(m)))
+#'   }, schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
+#' 1      0.699883     0.3303370    0.9455356     -0.1697527
+#' 2      1.895540     0.3868576    0.9083370     -0.6792238
+#' 3      2.351890     0.6548350    0.2375602      0.2521257
+#'
+#' }
+setMethod("gapply",
+          signature(x = "SparkDataFrame"),
+          function(x, col, func, schema) {
--- End diff --

done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120653 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,54 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @family agg_funcs --- End diff -- removed "agg func"
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120620 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,54 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @family agg_funcs +#' @examples +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, --- End diff -- done
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120599 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { --- End diff -- I ended up leaving mode as is. I also think that one variable is less confusing.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120565 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) + if (mode == 2) { +key <- SparkR:::readObject(inputCon) + } } isEmpty <- SparkR:::readInt(inputCon) if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen - data <- SparkR:::readDeserialize(inputCon) + dataList <- list(SparkR:::readDeserialize(inputCon)) } else if (deserializer == "string") { - data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { - data <- SparkR:::readMultipleObjects(inputCon) + dataList <- list(as.list(readLines(inputCon))) +} else if (deserializer == "row" && mode == 2) { + dataList <- SparkR:::readMultipleObjectsWithKeys(inputCon) +} else if (deserializer == "row"){ + dataList <- list(SparkR:::readMultipleObjects(inputCon)) } # Timing reading input data for execution inputElap <- elapsedSecs() - -if (isDataFrame) { - if (deserializer == "row") { -# Transform the list of rows into a data.frame -# Note that the optional argument stringsAsFactors for rbind is -# available since R 3.2.4. So we set the global option here. -oldOpt <- getOption("stringsAsFactors") -options(stringsAsFactors = FALSE) -data <- do.call(rbind.data.frame, data) -options(stringsAsFactors = oldOpt) - -names(data) <- colNames +for (i in 1:length(dataList)) { --- End diff -- done! I called it computeHelper; I thought compute might be too generic for this specific use case. I can still rename it to compute if you think that it's a better name.
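For reference, a minimal sketch of the worker-side dispatch discussed in the diff above, assuming the mode convention from this PR (0 = RDD, 1 = dapply, 2 = gapply) and showing only the row-deserializer path. The SparkR:::readInt/readObject/readMultipleObjects* helpers are the internals quoted in the diff; `inputCon`, `func`, and `computeHelper` (the helper name mentioned in the comment, with an assumed signature) are illustrative placeholders, not the actual worker.R code:

```r
# Sketch only: mirrors the mode dispatch described above; placeholder names
# are assumptions, not the real worker.R implementation.
readAndApply <- function(inputCon, func) {
  # 0 - RDD mode, 1 - dapply mode, 2 - gapply mode
  mode <- SparkR:::readInt(inputCon)
  colNames <- NULL
  key <- NULL
  if (mode > 0) {
    colNames <- SparkR:::readObject(inputCon)   # column names for data.frame modes
    if (mode == 2) {
      key <- SparkR:::readObject(inputCon)      # grouping column names for gapply
    }
  }
  # gapply yields one list element per group; dapply/RDD yield a single element,
  # so the same per-element loop covers all three modes.
  dataList <- if (mode == 2) {
    SparkR:::readMultipleObjectsWithKeys(inputCon)
  } else {
    list(SparkR:::readMultipleObjects(inputCon))
  }
  lapply(dataList, function(rows) {
    computeHelper(rows, colNames, func)         # placeholder: apply the user function per group
  })
}
```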
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120461 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2011,6 +2011,25 @@ class Dataset[T] private[sql]( } /** + * Returns a new [[DataFrame]] which contains the aggregated result of applying --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120457 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -379,6 +383,50 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = { pivot(pivotColumn, values.asScala) } + + /** + * Applies the given serialized R function `func` to each group of data. For each unique group, + * the function will be passed the group key and an iterator that contains all of the elements in + * the group. The function can return an iterator containing elements of an arbitrary type which + * will be returned as a new [[DataFrame]]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [[org.apache.spark.sql.expressions#Aggregator Aggregator]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @since 2.0.0 + */ + def flatMapGroupsInR( --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120450 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -325,6 +330,80 @@ case class MapGroupsExec( } /** + * Groups the input rows together and calls the R function with each group and an iterator + * containing all elements in the group. + * The result of this function is flattened before being output. + */ +case class FlatMapGroupsInRExec( +func: Array[Byte], +packageNames: Array[Byte], +broadcastVars: Array[Broadcast[Object]], +inputSchema: StructType, +outputSchema: StructType, +keyDeserializer: Expression, +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val isDeserializedRData = + if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false +val serializerForR = if (!isDeserializedRData) { + SerializationFormats.ROW +} else { + SerializationFormats.BYTE +} +val (deserializerForR, colNames) = + (SerializationFormats.ROW, inputSchema.fieldNames) + +child.execute().mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes) + val getValue = ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) + val outputObject = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) + val groupNames = groupingAttributes.map(_.name).toArray + + val runner = new RRunner[Array[Byte]]( +func, deserializerForR, serializerForR, packageNames, broadcastVars, +isDataFrame = true, colNames = colNames, key = groupNames) + + val hasGroups = grouped.hasNext --- End diff -- Did some refactoring!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222586065 **[Test build #59630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59630/consoleFull)** for PR 12836 at commit [`a0425c1`](https://github.com/apache/spark/commit/a0425c17906fcd2ea1d8dd6fb33c0fd8a860d4a7).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120403 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r65120399 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { --- End diff -- done!
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222445838 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59609/ Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222445837 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222445630 **[Test build #59609 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59609/consoleFull)** for PR 12836 at commit [`aca5395`](https://github.com/apache/spark/commit/aca539575d192056345c166adb5cf8ee0a814c84). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222425980 **[Test build #59609 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59609/consoleFull)** for PR 12836 at commit [`aca5395`](https://github.com/apache/spark/commit/aca539575d192056345c166adb5cf8ee0a814c84).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222424808 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222424809 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59607/ Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222424682 **[Test build #59607 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59607/consoleFull)** for PR 12836 at commit [`6b91858`](https://github.com/apache/spark/commit/6b918583ba34b5b7514f8bd6ee87247b0256b77a). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222412709 **[Test build #59607 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59607/consoleFull)** for PR 12836 at commit [`6b91858`](https://github.com/apache/spark/commit/6b918583ba34b5b7514f8bd6ee87247b0256b77a).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222412185 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222412184 **[Test build #59606 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59606/consoleFull)** for PR 12836 at commit [`52c9f6d`](https://github.com/apache/spark/commit/52c9f6da1cfb0c520e248e187b27cd616ed14f1d). * This patch **fails R style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222412187 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59606/ Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222411932 **[Test build #59606 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59606/consoleFull)** for PR 12836 at commit [`52c9f6d`](https://github.com/apache/spark/commit/52c9f6da1cfb0c520e248e187b27cd616ed14f1d).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222384540 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222384541 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59598/ Test PASSed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222384486 **[Test build #59598 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59598/consoleFull)** for PR 12836 at commit [`4532102`](https://github.com/apache/spark/commit/453210233b55562a85eff75f2ced79f86e2e3255). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222379286 **[Test build #59598 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59598/consoleFull)** for PR 12836 at commit [`4532102`](https://github.com/apache/spark/commit/453210233b55562a85eff75f2ced79f86e2e3255).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222379022 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378750 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59593/ Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378755 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378756 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59597/ Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378743 **[Test build #59597 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59597/consoleFull)** for PR 12836 at commit [`4532102`](https://github.com/apache/spark/commit/453210233b55562a85eff75f2ced79f86e2e3255). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378749 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378722 **[Test build #59593 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59593/consoleFull)** for PR 12836 at commit [`f8c994f`](https://github.com/apache/spark/commit/f8c994f04375058789f44637c4e84df48c1d5a01). * This patch **fails SparkR unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222378255 **[Test build #59597 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59597/consoleFull)** for PR 12836 at commit [`4532102`](https://github.com/apache/spark/commit/453210233b55562a85eff75f2ced79f86e2e3255).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222373665 **[Test build #59593 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59593/consoleFull)** for PR 12836 at commit [`f8c994f`](https://github.com/apache/spark/commit/f8c994f04375058789f44637c4e84df48c1d5a01).
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64993001 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { --- End diff -- I think having a single variable `mode` is better than having a number of boolean variables.
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64992518 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { --- End diff --

Thanks @sun-rui! Maybe it would be better to have something like:

```
isDataFrame = FALSE
dapply = FALSE
gapply = FALSE
if (mode > 0) {
  isDataFrame = TRUE
  if (mode == 1) {
    dapply = TRUE
  } else {
    gapply = TRUE
  }
}
```

because in your example we'd also need to set gapply = FALSE and dapply = FALSE accordingly if mode > 0. Something like:

```
if (mode == 1) {
  dapply = TRUE
  gapply = FALSE
} else {
  dapply = FALSE
  gapply = TRUE
}
```
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64983139 --- Diff: R/pkg/R/deserialize.R --- @@ -197,6 +197,31 @@ readMultipleObjects <- function(inputCon) { data # this is a list of named lists now } +readMultipleObjectsWithKeys <- function(inputCon) { + # readMultipleObjectsWithKeys will read multiple continuous objects from + # a DataOutputStream. There is no preceding field telling the count + # of the objects, so the number of objects varies, we try to read + # all objects in a loop until the end of the stream. The rows in + # the stream are separated by grouping-key boundary + data <- list() + subData <- list() + while (TRUE) { +# If reaching the end of the stream, type returned should be "". +type <- readType(inputCon) +if (type == "") { + break +} else if (type == "r") { + # key boundary detected + readTypedObject(inputCon, type) --- End diff -- it is actually a boundary separator, not the key we referred to in: https://github.com/NarineK/spark/blob/9cacd4dbfa0e20d2a855e23f2962a258abbba553/R/pkg/inst/worker/worker.R#L94 No, it is not used later; it is a separator (to mark the group boundaries) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
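To illustrate the boundary-separator idea in isolation, here is a small, runnable R sketch that consumes a flat sequence of records in which a special "key" record closes the current group and the key value itself is discarded. This is only an analogy for the stream handling described above; it does not use the real SparkR readType/readTypedObject internals, and the helper names are made up.

```
# Records tagged "row" are data; records tagged "key" mark the end of a group.
readGroups <- function(records) {
  data <- list()      # completed groups
  subData <- list()   # rows accumulated for the current group
  for (rec in records) {
    if (attr(rec, "tag") == "key") {
      # group boundary: flush the accumulated rows; the key value is not kept
      data[[length(data) + 1L]] <- subData
      subData <- list()
    } else {
      subData[[length(subData) + 1L]] <- rec
    }
  }
  data
}

tag <- function(x, t) { attr(x, "tag") <- t; x }
records <- list(tag(1, "row"), tag(2, "row"), tag("a", "key"),
                tag(3, "row"), tag("b", "key"))
str(readGroups(records))   # two groups: (1, 2) and (3)
```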
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64983076 --- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala --- @@ -149,12 +150,23 @@ private[spark] class RRunner[U]( dataOut.writeInt(numPartitions) - dataOut.writeInt(if (isDataFrame) 1 else 0) + val mode = if (isDataFrame && key != null) { +2 // gapply + } else if (isDataFrame) { +1 // dapply + } else { +0 // RDD + } + dataOut.writeInt(mode) if (isDataFrame) { SerDe.writeObject(dataOut, colNames) } + if (key != null) { --- End diff -- It is the grouping columns(key) used for gapply mode. I can also additionally check for gapply mode before writing it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
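For readers following the protocol discussion, here is a hedged sketch of the header layout in the order the Scala side is described as writing it (numPartitions, mode, column names when mode > 0, grouping key names when mode == 2). The mock stream below is a stand-in for SparkR's real connection readers, so the function names are illustrative only.

```
# Read the header fields in the order RRunner writes them (as described above).
readHeader <- function(stream) {
  header <- list()
  header$numPartitions <- stream$readInt()
  header$mode <- stream$readInt()               # 0 = RDD, 1 = dapply, 2 = gapply
  if (header$mode > 0) header$colNames <- stream$readObject()
  if (header$mode == 2) header$key <- stream$readObject()
  header
}

# Toy stream backed by a queue of already-decoded values:
makeStream <- function(values) {
  i <- 0
  nextVal <- function() { i <<- i + 1; values[[i]] }
  list(readInt = nextVal, readObject = nextVal)
}

readHeader(makeStream(list(2L, 2L, c("a", "b", "c"), "a")))
```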
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-222184073 Thanks, sun-rui! Will address those. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64876284 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -325,6 +330,80 @@ case class MapGroupsExec( } /** + * Groups the input rows together and calls the R function with each group and an iterator + * containing all elements in the group. + * The result of this function is flattened before being output. + */ +case class FlatMapGroupsInRExec( +func: Array[Byte], +packageNames: Array[Byte], +broadcastVars: Array[Broadcast[Object]], +inputSchema: StructType, +outputSchema: StructType, +keyDeserializer: Expression, +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val isDeserializedRData = + if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false +val serializerForR = if (!isDeserializedRData) { + SerializationFormats.ROW +} else { + SerializationFormats.BYTE +} +val (deserializerForR, colNames) = + (SerializationFormats.ROW, inputSchema.fieldNames) + +child.execute().mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes) + val getValue = ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) + val outputObject = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) + val groupNames = groupingAttributes.map(_.name).toArray + + val runner = new RRunner[Array[Byte]]( +func, deserializerForR, serializerForR, packageNames, broadcastVars, +isDataFrame = true, colNames = colNames, key = groupNames) + + val hasGroups = grouped.hasNext --- End diff -- the following code can be simplified. Could you refactor it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64874033 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -379,6 +383,50 @@ class RelationalGroupedDataset protected[sql]( def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = { pivot(pivotColumn, values.asScala) } + + /** + * Applies the given serialized R function `func` to each group of data. For each unique group, + * the function will be passed the group key and an iterator that contains all of the elements in + * the group. The function can return an iterator containing elements of an arbitrary type which + * will be returned as a new [[DataFrame]]. + * + * This function does not support partial aggregation, and as a result requires shuffling all + * the data in the [[Dataset]]. If an application intends to perform an aggregation over each + * key, it is best to use the reduce function or an + * [[org.apache.spark.sql.expressions#Aggregator Aggregator]]. + * + * Internally, the implementation will spill to disk if any given group is too large to fit into + * memory. However, users must take care to avoid materializing the whole iterator for a group + * (for example, by calling `toList`) unless they are sure that this is possible given the memory + * constraints of their cluster. + * + * @since 2.0.0 + */ + def flatMapGroupsInR( --- End diff -- private[sql] def --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64873939 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2011,6 +2011,25 @@ class Dataset[T] private[sql]( } /** + * Returns a new [[DataFrame]] which contains the aggregated result of applying --- End diff -- Seems this piece of code can be removed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64873380 --- Diff: core/src/main/scala/org/apache/spark/api/r/RRunner.scala --- @@ -149,12 +150,23 @@ private[spark] class RRunner[U]( dataOut.writeInt(numPartitions) - dataOut.writeInt(if (isDataFrame) 1 else 0) + val mode = if (isDataFrame && key != null) { +2 // gapply + } else if (isDataFrame) { +1 // dapply + } else { +0 // RDD + } + dataOut.writeInt(mode) if (isDataFrame) { SerDe.writeObject(dataOut, colNames) } + if (key != null) { --- End diff -- what is this piece of code for? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64872682 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) + if (mode == 2) { +key <- SparkR:::readObject(inputCon) + } } isEmpty <- SparkR:::readInt(inputCon) if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen - data <- SparkR:::readDeserialize(inputCon) + dataList <- list(SparkR:::readDeserialize(inputCon)) } else if (deserializer == "string") { - data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { - data <- SparkR:::readMultipleObjects(inputCon) + dataList <- list(as.list(readLines(inputCon))) +} else if (deserializer == "row" && mode == 2) { + dataList <- SparkR:::readMultipleObjectsWithKeys(inputCon) +} else if (deserializer == "row"){ + dataList <- list(SparkR:::readMultipleObjects(inputCon)) } # Timing reading input data for execution inputElap <- elapsedSecs() - -if (isDataFrame) { - if (deserializer == "row") { -# Transform the list of rows into a data.frame -# Note that the optional argument stringsAsFactors for rbind is -# available since R 3.2.4. So we set the global option here. -oldOpt <- getOption("stringsAsFactors") -options(stringsAsFactors = FALSE) -data <- do.call(rbind.data.frame, data) -options(stringsAsFactors = oldOpt) - -names(data) <- colNames +for (i in 1:length(dataList)) { --- End diff -- instead of putting all computations in a for loop, it would be better to put the main flow into one function, say, compute(), and then you can do: ``` if (isDataFrame) { if (dapply) { compute(data) } else { for (subdata in data) { compute(subdata) } } } else { compute(data) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
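A self-contained interpretation of the suggested refactor, with compute() as a stand-in for the real per-partition work and plain data.frames in place of the deserialized stream; the structure, not the names, is the point, and none of this is the actual worker.R code.

```
# compute() is a placeholder for "apply the user function to one data.frame".
compute <- function(df) data.frame(n = nrow(df))

runWorker <- function(dataList, isDataFrame, dapply) {
  if (isDataFrame) {
    if (dapply) {
      compute(dataList[[1]])          # dapply: one data.frame for the whole partition
    } else {
      lapply(dataList, compute)       # gapply: one data.frame per group
    }
  } else {
    compute(dataList[[1]])            # RDD mode: placeholder handling
  }
}

groups <- split(iris, iris$Species)   # simulate per-group input
runWorker(groups, isDataFrame = TRUE, dapply = FALSE)
```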
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64871473 --- Diff: R/pkg/inst/worker/worker.R --- @@ -84,67 +84,78 @@ broadcastElap <- elapsedSecs() # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +# If DataFrame - mode = 1 and mode = 2, then read column names +if (mode > 0) { --- End diff -- To improve code readability, it is better not to use mode directly. You can do: ``` if (mode > 0) { isDataFrame = TRUE if (mode == 1) { dapply = TRUE } else { gapply = TRUE } } else { isDataFrame = FALSE dapply = FALSE gapply = FALSE } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64870674 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,54 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @family agg_funcs +#' @examples +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, --- End diff -- remove sqlContext --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
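For reference, a sketch of how the docstring example would read once the sqlContext argument is dropped, assuming the 2.0-style SparkR API in which createDataFrame picks up the active session (started with sparkR.session() or similar); the exact session setup is an assumption here, not something this review thread specifies.

```
# Assuming an active SparkR session, the example no longer passes sqlContext:
df <- createDataFrame(
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))
```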
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64870623 --- Diff: R/pkg/R/group.R --- @@ -142,3 +142,54 @@ createMethods <- function() { } createMethods() + +#' gapply +#' +#' Applies a R function to each group in the input GroupedData +#' +#' @param x a GroupedData +#' @return a SparkDataFrame +#' @rdname gapply +#' @name gapply +#' @family agg_funcs --- End diff -- not sure this can be simply classified as "agg func"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64870145 --- Diff: R/pkg/R/deserialize.R --- @@ -197,6 +197,31 @@ readMultipleObjects <- function(inputCon) { data # this is a list of named lists now } +readMultipleObjectsWithKeys <- function(inputCon) { + # readMultipleObjectsWithKeys will read multiple continuous objects from + # a DataOutputStream. There is no preceding field telling the count + # of the objects, so the number of objects varies, we try to read + # all objects in a loop until the end of the stream. The rows in + # the stream are separated by grouping-key boundary + data <- list() + subData <- list() + while (TRUE) { +# If reaching the end of the stream, type returned should be "". +type <- readType(inputCon) +if (type == "") { + break +} else if (type == "r") { + # key boundary detected + readTypedObject(inputCon, type) --- End diff -- key is not used later? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64869782 --- Diff: R/pkg/R/deserialize.R --- @@ -197,6 +197,31 @@ readMultipleObjects <- function(inputCon) { data # this is a list of named lists now } +readMultipleObjectsWithKeys <- function(inputCon) { + # readMultipleObjectsWithKeys will read multiple continuous objects from + # a DataOutputStream. There is no preceding field telling the count + # of the objects, so the number of objects varies, we try to read + # all objects in a loop until the end of the stream. The rows in --- End diff -- comment: Replace the statement "The rows in ..." with " This function is for use by gapply. Each group of rows is followed by the grouping key for this group which is then followed by next group." --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64867727 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { +#' y <- data.frame(x$a[1], x$c[1], mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (sqlContext, iris) +#' schema <- structType(structField("(Intercept)", "double"), +#' structField("Sepal_Width", "double"),structField("Petal_Length", "double"), +#' structField("Petal_Width", "double")) +#' df1 <- gapply( +#' df, +#' list(df$"Species"), +#' function(x) { +#' m <- suppressWarnings(lm(Sepal_Length ~ +#' Sepal_Width + Petal_Length + Petal_Width, x)) +#' data.frame(t(coef(m))) +#' }, schema) +#' collect(df1) +#' +#' Result +#' -- +#' Model (Intercept) Sepal_Width Petal_Length Petal_Width +#' 1 0.699883 0.3303370 0.9455356 -0.1697527 +#' 2 1.895540 0.3868576 0.9083370 -0.6792238 +#' 3 2.351890 0.6548350 0.2375602 0.2521257 +#' +#'} setMethod("gapply", + signature(x = "SparkDataFrame"), + function(x, col, func, schema) { --- End diff -- "col" is better to be "cols" --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64867086 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { +#' y <- data.frame(x$a[1], x$c[1], mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (sqlContext, iris) --- End diff -- remove sqlContext --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64866906 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, --- End diff -- Remember to remove sqlContext as parameter. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64866724 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input --- End diff -- update comment: Group the SparkDataFrame using the specified columns and apply the R function to each group. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64804537 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping --- End diff -- Yes, the key and the Dataframe with the grouping columns. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64803994 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { --- End diff -- Thanks, will update the doc! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221966464 Thanks for the update @NarineK - The R changes are looking pretty good to me. Had some minor documentation comments. @sun-rui Could you take one more look ? Also @rxin / @davies -- Would be good if one of you can review the SQL/Scala changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64803396 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping --- End diff -- Minor comment: It would be good to say what the function will get as its input. Right now it's the key and a data.frame with the grouping columns? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64803455 --- Diff: R/pkg/R/DataFrame.R --- @@ -1268,6 +1268,82 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Apply a R function to each group of a DataFrame. The group is defined by an input +#' grouping column. +#' +#' @param x A SparkDataFrame +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. +#' The output of func is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' sqlContext, +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(x) { --- End diff -- Do these examples need to get updated now that we are passing the key as well ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user thunterdb commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221930188 That sounds good, we can add `aggregate` later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221804692 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59347/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221804691 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221804455 **[Test build #59347 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59347/consoleFull)** for PR 12836 at commit [`9cacd4d`](https://github.com/apache/spark/commit/9cacd4dbfa0e20d2a855e23f2962a258abbba553). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221802724 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59345/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221802722 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221802453 **[Test build #59345 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59345/consoleFull)** for PR 12836 at commit [`0928740`](https://github.com/apache/spark/commit/09287408137f7d6fbe8f899b12810ab16cbb5c3e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221786353 **[Test build #59347 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59347/consoleFull)** for PR 12836 at commit [`9cacd4d`](https://github.com/apache/spark/commit/9cacd4dbfa0e20d2a855e23f2962a258abbba553). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221785096 **[Test build #59345 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59345/consoleFull)** for PR 12836 at commit [`0928740`](https://github.com/apache/spark/commit/09287408137f7d6fbe8f899b12810ab16cbb5c3e). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221756012 yes, let's discuss this later if necessary --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221669659 Hmm - What is the difference between `dapply_row` and a SQL row UDF? Anyway, this discussion probably belongs in a new JIRA and not in this PR --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221585022 @shivaram, I did not mean SQL row UDF. It's something like below: ``` dapply_row <- function(x, FUN, schema) { dapply(x, function(x) { for (each row in x) FUN(row) } } ``` Not sure if it is really useful. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
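The snippet above is pseudocode ("for (each row in x)" is not valid R), so here is one runnable, purely local interpretation of the dapply_row idea: apply FUN to each single-row slice of the data.frame and bind the results. localDapply is a stand-in for SparkR's dapply on a single partition, and schema is omitted since it only matters for the distributed version.

```
localDapply <- function(df, partFunc) partFunc(df)   # stand-in for dapply on one partition

dapply_row <- function(x, FUN) {
  localDapply(x, function(part) {
    rows <- lapply(seq_len(nrow(part)), function(i) FUN(part[i, , drop = FALSE]))
    do.call(rbind, rows)
  })
}

# Example: derive one column row by row.
dapply_row(head(iris, 3), function(row) {
  data.frame(Sepal.Area = row$Sepal.Length * row$Sepal.Width)
})
```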
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221478783 Let's keep it as `dapply` - the specific choice of applying to a partition as a data frame is built into its semantics. If we do build a single-row UDF then we can make it match the Python / Scala UDF API, or try to see if it matches `apply` in terms of semantics. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221477758 @NarineK, mapply does not apply here, but we can later add something like lapply, which applies a function to each row in a DataFrame. I suddenly realized that it could be better to rename dapply to dapplyPartition @shivaram --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221181147 It seems that many people voted for option 2, so I'll implement that: function(key, df), where key is a list of grouping column values for this group and df is the data.frame of the group, containing the grouping columns. This is similar to the Scala function signature of KeyValueGroupedDataset.flatMapGroups(). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
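Under option 2 the user function receives the key (a list of grouping-column values) and the group's data.frame. As a purely local sketch of that contract, reusing the values from the averaging example in the docs and simulating the grouping with split(); the column names here are taken from that example, everything else is illustrative.

```
# func(key, x): key is a list of grouping values, x is the group's rows (incl. grouping columns).
func <- function(key, x) {
  data.frame(a = key[[1]], c = key[[2]], avg = mean(x$b), stringsAsFactors = FALSE)
}

df <- data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3), c = c("1", "1", "3"),
                 d = c(0.1, 0.2, 0.3), stringsAsFactors = FALSE)

# Simulate grouping on columns a and c, then call func once per group.
groups <- split(df, list(df$a, df$c), drop = TRUE)
do.call(rbind, lapply(groups, function(g) func(list(g$a[1], g$c[1]), g)))
```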
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r64335943 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -21,10 +21,12 @@ import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.function._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer --- End diff -- yes, will do that, thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221180766 Ok, I see, thanks, @sun-rui --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221180426 We can also add an API later, supporting partial aggregation and final aggregation together, as we have done in RDD API. Refer to "aggregateRDD". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221179691 @thunterdb, @NarineK, definitely we can add an API like aggregate() later based on the functionality of the two basic APIs. I can submit a JIRA issue for it later. We can allow passing a user-defined function as FUN. We could support FUN as built-in functions ('mean', 'sum', etc.) by internally creating an R function wrapping it, but it does not seem worth it, as SparkDataFrame already provides such common aggregation functions, which run on the JVM and have better performance than the R worker. However, if any built-in R function has no parity in Spark Core, we can consider supporting it in SparkR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
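As a hedged illustration of the "wrap a built-in function" idea mentioned above: a small local helper that turns a summary function such as mean into a gapply-style group function returning one row per group. The helper and column names are made up for this sketch and are not proposed API.

```
# Wrap a built-in aggregation into a func(key, x) suitable for per-group application.
wrapBuiltin <- function(builtin, valueCol, keyCols) {
  function(key, x) {
    out <- as.data.frame(setNames(key, keyCols), stringsAsFactors = FALSE)
    out$value <- builtin(x[[valueCol]])
    out
  }
}

meanOfB <- wrapBuiltin(mean, valueCol = "b", keyCols = "a")
df <- data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3))
do.call(rbind, lapply(split(df, df$a), function(g) meanOfB(list(g$a[1]), g)))
```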
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221158943 It seems that the generic functions FUN for aggregates have some limitations too: https://stat.ethz.ch/pipermail/r-help/2015-March/426535.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user NarineK commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221145783 Hi @thunterdb, thank you for the comment. I think @sun-rui addressed your comment here as well: https://github.com/apache/spark/pull/12836#issuecomment-219621230 FUN in aggregate is a built-in function such as mean, sum, etc. Can I have something like: ``` func <- function(x) { y <- (data.frame(x$Species, mean(x$Sepal.Length), stringsAsFactors = FALSE)) } aggregate(iris, FUN=func, by=list("Species")) ``` This doesn't seem to work, and the R documentation doesn't show examples of that either. Also, R describes FUN as a "summary statistics function"; for us it can be anything applied to the grouped data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
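The limitation hit here is that base R's aggregate() applies FUN to each column of the group separately, so a function expecting the whole group data.frame does not fit. by() (or split() plus lapply()) passes the full sub-data.frame to the function, which is much closer to what gapply exposes; a small local sketch:

```
# by() hands each Species subset to the function as a complete data.frame.
res <- by(iris, iris$Species, function(x) {
  data.frame(Species = as.character(x$Species[1]),
             meanSepalLength = mean(x$Sepal.Length),
             stringsAsFactors = FALSE)
})
do.call(rbind, res)
```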
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user sun-rui commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221143217 yes, we can support the most general form and add higher level APIs later. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12922][SparkR][WIP] Implement gapply() ...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/12836#issuecomment-221142351 @NarineK @sun-rui Thanks a lot for your work on this PR. I think the second option (of giving key and data.frame) is more intuitive / flexible as well. Would be good to investigate if that matches the `aggregate` use case that @thunterdb brings up. BTW I also think that at the underlying implementation level we should support the most general form and then we can have higher level APIs that ignore the key etc. Also @davies would be great if you could take a look at this updated version. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org