[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 @gatorsmile, I'm able to access `groupingExprs` from `SQLUtils.scala` through `val groupingExprs: Seq[Expression]`, but it seems challenging to extract the name of a column from a raw expression. In RelationalGroupedDataset an alias is used to create a named expression: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L83 What would you suggest as the best way to access a column name from an `Expression`? Thank you, Narine
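For reference, one way to derive a name for each grouping expression, mirroring the aliasing logic linked above, might look like the following sketch. It assumes code that lives somewhere with access to Catalyst's expression types (as `SQLUtils.scala` does); the helper name is illustrative, not part of any actual patch.

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

// Derive a display name for each grouping expression. Plain column
// references are already NamedExpressions and carry their own name;
// anything else is wrapped in an Alias, in the spirit of what
// RelationalGroupedDataset does before aggregation.
def groupingColumnNames(groupingExprs: Seq[Expression]): Seq[String] =
  groupingExprs.map {
    case ne: NamedExpression => ne.name
    case expr                => Alias(expr, expr.toString)().name
  }
```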
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Alright, give me a couple of days to address those cases.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 I think 'prepend' sounds better. What do you think? Yes, the `key` in `function(key, x) { x }` can be useful for some use cases, but I also think the user could easily prepend it to the data.frame when needed, since the `key` is already available.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 I think @falaki's approach is good; I only find the `key` that is passed as an argument together with `x` as an input to the function a little superfluous.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Thank you, @gatorsmile! I'll give it a try.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 @falaki, I'd be fine with a separate `gapplyWithKeys()` method too. @shivaram, @felixcheung, what do you think? Should we add a new `gapplyWithKeys()` method?
[GitHub] spark pull request #14431: [SPARK-16258][SparkR] Automatically append the gr...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14431#discussion_r124327866 --- Diff: R/pkg/R/DataFrame.R --- @@ -1465,10 +1464,10 @@ setMethod("dapplyCollect", #' #' Result #' - -#' Model (Intercept) Sepal_Width Petal_Length Petal_Width -#' 1 0.699883 0.3303370 0.9455356 -0.1697527 -#' 2 1.895540 0.3868576 0.9083370 -0.6792238 -#' 3 2.351890 0.6548350 0.2375602 0.2521257 +#' Model Species (Intercept) Sepal_Width Petal_Length Petal_Width --- End diff -- In the past we had a discussion about backward compatibility with @shivaram: https://github.com/apache/spark/pull/14431#issuecomment-236643502 I think I didn't push the R changes because I wanted to be able to access the grouping columns on the SQL side first. Without access to the grouping columns I couldn't find a way to keep backward compatibility without breaking anything.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 yes, but we only need read access.
[GitHub] spark issue #14742: [SPARK-17177][SQL] Make grouping columns accessible from...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14742 yes, we can close this, but it would be great if you could help us find a way to access the grouping columns from SparkR in #14431.
[GitHub] spark issue #14742: [SPARK-17177][SQL] Make grouping columns accessible from...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14742 Hi @gatorsmile, #14431 depends on this. Is there a way I can access the grouping columns from `RelationalGroupedDataset`?
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Hi everyone, yes, it depends on #14742, which I've been asked to close. For this PR I need to access the grouping columns. If you think there is an alternative way of accessing that information, I'd be happy to make the changes in this PR. Thanks!
[GitHub] spark pull request #10162: [SPARK-11250] [SQL] Generate different alias for ...
Github user NarineK closed the pull request at: https://github.com/apache/spark/pull/10162
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 @HyukjinKwon do you mean closing or fixing the PR? As I understand from @gatorsmile, he wants to close it.
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 I'd propose to have: 1. One input argument: suffixes [left, right] (if you prefer, we can have two, similar to pandas). 2. Default values for the suffixes (I think defaults are more convenient, but we can make them required, as pandas does). 3. The logic can be implemented as in pandas: we add the suffixes regardless of whether the columns are in the join condition. 4. We can have a separate method for join-with-suffixes that sets new column names once the join has happened, without touching the existing join functions (see the sketch below). What do you think, @marmbrus, @holdenk?
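A minimal sketch of option 4 above, assuming a pandas-style suffix pair. The helper name `joinWithSuffixes` and the default suffixes are illustrative only; note that, unlike point 3, this variant renames overlapping columns before the join and leaves the join columns themselves unsuffixed, so a name-based join still works.

```scala
import org.apache.spark.sql.DataFrame

// Rename overlapping non-join columns on each side, then join by name.
// This avoids ambiguous column references without touching the existing
// join APIs.
def joinWithSuffixes(
    left: DataFrame,
    right: DataFrame,
    joinCols: Seq[String],
    lsuffix: String = "_x",
    rsuffix: String = "_y"): DataFrame = {
  val overlap = left.columns.toSet.intersect(right.columns.toSet) -- joinCols
  val renamedLeft  = overlap.foldLeft(left)((df, c) => df.withColumnRenamed(c, c + lsuffix))
  val renamedRight = overlap.foldLeft(right)((df, c) => df.withColumnRenamed(c, c + rsuffix))
  renamedLeft.join(renamedRight, joinCols)
}
```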
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 In pandas it has two arguments, lsuffix='' and rsuffix='', for the left and right sides respectively, and it appends the suffixes to all column names regardless of whether they are in the join condition.
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 Thank you for following up on this, @marmbrus! I looked into two places: R and pandas DataFrames. In R, it seems they give new names to the columns that aren't in the merge/join condition and then merge: https://github.com/talgalili/R-code-snippets/blob/master/merge.data.frame.r#L86 In SparkR I've implemented this for merge: https://github.com/apache/spark/pull/9012 In pandas, DataFrame join has the suffixes too. It creates new labels using the suffixes: https://github.com/pandas-dev/pandas/blob/master/pandas/tools/merge.py#L901 and then uses the new labels as an axis (I assume column names) of the joined DataFrame: https://github.com/pandas-dev/pandas/blob/master/pandas/tools/merge.py#L916 Maybe we can have something similar to pandas?
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 I am trying different ways to solve the problem without renaming the columns, and it seems that a better place to change the column names would be here, by changing the names of the output attributes: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L268 Do you have any preferences, @marmbrus?
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 I see; I can go over the pull request this weekend. Thanks for the feedback.
[GitHub] spark issue #10162: [SPARK-11250] [SQL] Generate different alias for columns...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/10162 I'd be happy to update to the latest master if we want to review this now.
[GitHub] spark issue #14742: [SPARK-17177][SQL] Make grouping columns accessible from...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14742 @liancheng, @rxin, do you think adding `columns` to `RelationalGroupedDataset` is reasonable, or should we find a workaround on the R side?
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Made a pull request for grouping columns: #14742
[GitHub] spark issue #14742: [SPARK-17177][SQL] Make grouping columns accessible from...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14742 cc: @shivaram, @liancheng
[GitHub] spark pull request #14742: [SPARK-17177][SQL] Make grouping columns accessib...
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/14742 [SPARK-17177][SQL] Make grouping columns accessible from `RelationalGroupedDataset` ## What changes were proposed in this pull request? Currently, once we create a `RelationalGroupedDataset`, we cannot access the grouping columns from its instance. Analogous to `Dataset`, we can have a public method which returns the list of grouping columns. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L457 This can be useful, for instance, in SparkR when we want logic associated with the grouping columns to be accessible from `RelationalGroupedDataset`. Similar to `Dataset.columns`, I've added a `RelationalGroupedDataset.columns` method which makes the column names accessible. ## How was this patch tested? Unit tests in `DataFrameAggregateSuite` You can merge this pull request into a Git repository by running: $ git pull https://github.com/NarineK/spark Make_columns_accessible_from_RelationalGroupedDataset Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14742.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14742 commit 29d8a5c6c22202cdf7d6cc44f1d6cbeca5946918 Author: Narine Kokhlikyan Date: 2016-06-20T22:12:11Z Fixed duplicated documentation problem + separated documentation for dapply and dapplyCollect commit 60491b98d5cea46fc752e09fcc8306bcfc9423d0 Author: Narine Kokhlikyan Date: 2016-07-21T05:54:10Z merge with master commit 989df0e9fc411f3e5f25f19e130397f08099795a Author: Narine Kokhlikyan Date: 2016-08-21T02:42:55Z Merge branch 'master' of https://github.com/apache/spark commit 02d6bab047c9ae99f3ef154826a362529d0423be Author: Narine Kokhlikyan Date: 2016-08-21T21:10:07Z Initial commit
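The accessor itself can be small; a sketch of the shape such a method might take inside `RelationalGroupedDataset` (the actual patch may differ), reusing the same naming fallback as the class's own aliasing logic:

```scala
// Names of the grouping columns, analogous to Dataset.columns.
// NamedExpressions (plain column references) carry their own name;
// other expressions fall back to their string form.
def columns: Seq[String] = groupingExprs.map {
  case ne: NamedExpression => ne.name
  case expr                => expr.toString
}
```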
[GitHub] spark pull request #14384: [Spark-16443][SparkR] Alternating Least Squares (...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14384#discussion_r74551161 --- Diff: R/pkg/inst/tests/testthat/test_mllib.R --- @@ -454,4 +454,61 @@ test_that("spark.survreg", { } }) +test_that("spark.als", { + # R code to reproduce the result. + # + #' data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), + #' list(2, 1, 1.0), list(2, 2, 5.0)) + #' df <- createDataFrame(data, c("user", "item", "rating")) + #' model <- spark.als(df, ratingCol = "rating", userCol = "user", itemCol = "item", + #' rank = 10, maxIter = 5, seed = 0) + #' test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item")) + #' predict(model, test) + # + # -- output of 'predict(model, data)' + # + # user item prediction --- End diff -- It would be good to have these examples in the method documentation as well. Also, how do you interpret a negatively predicted rating?
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 yes, @shivaram, that would be one way to do it: adding a new public function to `RelationalGroupedDataset` which returns the column names. If that is fine from the SQL perspective, maybe I can make a separate pull request for it? cc: @liancheng
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 My point is the following. Let's say we have: `val relationalGroupedDataset = df.groupBy("col1", "col2")` Now, given `relationalGroupedDataset`, how can I find out the grouping columns? There is nothing like `relationalGroupedDataset.columns` or `relationalGroupedDataset.groupExpression`, is there?
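For concreteness, this is what the missing accessor would enable; the usage below is hypothetical until such a method exists, and `df` stands for any DataFrame with those columns:

```scala
// Hypothetical usage, assuming the proposed accessor is added:
val relationalGroupedDataset = df.groupBy("col1", "col2")
val groupCols = relationalGroupedDataset.columns  // would yield Seq("col1", "col2")
```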
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Thanks, @shivaram! Yes, we have a handle to the RelationalGroupedDataset, but I couldn't access the column fields of the RelationalGroupedDataset instance. Is there a way to access the columns?
[GitHub] spark pull request #14384: [Spark-16443][SparkR] Alternating Least Squares (...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14384#discussion_r73999377 --- Diff: R/pkg/R/mllib.R --- @@ -632,3 +642,147 @@ setMethod("predict", signature(object = "AFTSurvivalRegressionModel"), function(object, newData) { return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) }) + + +#' Alternating Least Squares (ALS) for Collaborative Filtering +#' +#' \code{spark.als} learns latent factors in collaborative filtering via alternating least +#' squares. Users can call \code{summary} to obtain fitted latent factors, \code{predict} +#' to make predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. +#' +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-collaborative-filtering.html}{MLlib: +#' Collaborative Filtering}. +#' Additional arguments can be passed to the methods. +#' \describe{ +#'\item{nonnegative}{logical value indicating whether to apply nonnegativity constraints. +#' Default: FALSE} +#'\item{implicitPrefs}{logical value indicating whether to use implicit preference. +#' Default: FALSE} +#'\item{alpha}{alpha parameter in the implicit preference formulation (>= 0). Default: 1.0} +#'\item{seed}{integer seed for random number generation. Default: 0} +#'\item{numUserBlocks}{number of user blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{numItemBlocks}{number of item blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{checkpointInterval}{number of checkpoint intervals (>= 1) or disable checkpoint (-1). +#' Default: 10} +#'} +#' +#' @param data A SparkDataFrame for training +#' @param ratingCol column name for ratings +#' @param userCol column name for user ids. Ids must be (or can be coerced into) integers +#' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers +#' @param rank rank of the matrix factorization (> 0) +#' @param reg regularization parameter (>= 0) +#' @param maxIter maximum number of iterations (>= 0) + +#' @return \code{spark.als} returns a fitted ALS model +#' @rdname spark.als +#' @aliases spark.als,SparkDataFrame +#' @name spark.als +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(ratings) +#' model <- spark.als(df, "rating", "user", "item") +#' +#' # extract latent factors +#' stats <- summary(model) +#' userFactors <- stats$userFactors +#' itemFactors <- stats$itemFactors +#' +#' # make predictions +#' predicted <- predict(model, df) +#' showDF(predicted) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' +#' # set other arguments +#' modelS <- spark.als(df, "rating", "user", "item", rank = 20, +#' reg = 0.1, nonnegative = TRUE) +#' statsS <- summary(modelS) +#' } +#' @note spark.als since 2.1.0 +setMethod("spark.als", signature(data = "SparkDataFrame"), + function(data, ratingCol = "rating", userCol = "user", itemCol = "item", + rank = 10, reg = 1.0, maxIter = 10, ...) { + +`%||%` <- function(a, b) if (!is.null(a)) a else b + +args <- list(...)
+numUserBlocks <- args$numUserBlocks %||% 10 +numItemBlocks <- args$numItemBlocks %||% 10 +implicitPrefs <- args$implicitPrefs %||% FALSE +alpha <- args$alpha %||% 1.0 +nonnegative <- args$nonnegative %||% FALSE +checkpointInterval <- args$checkpointInterval %||% 10 +seed <- args$seed %||% 0 + +features <- array(c(ratingCol, userCol, itemCol)) +distParams <- array(as.integer(c(numUserBlocks, numItemBlocks, + checkpointInterval, seed))) + +jobj <- callJStatic("org.apache.spark.ml.r.AL
[GitHub] spark pull request #14384: [Spark-16443][SparkR] Alternating Least Squares (...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14384#discussion_r73999041 --- Diff: R/pkg/R/mllib.R --- @@ -632,3 +642,147 @@ setMethod("predict", signature(object = "AFTSurvivalRegressionModel"), function(object, newData) { return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) }) + + +#' Alternating Least Squares (ALS) for Collaborative Filtering +#' +#' \code{spark.als} learns latent factors in collaborative filtering via alternating least +#' squares. Users can call \code{summary} to obtain fitted latent factors, \code{predict} +#' to make predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. +#' +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-collaborative-filtering.html}{MLlib: +#' Collaborative Filtering}. +#' Additional arguments can be passed to the methods. +#' \describe{ +#'\item{nonnegative}{logical value indicating whether to apply nonnegativity constraints. +#' Default: FALSE} +#'\item{implicitPrefs}{logical value indicating whether to use implicit preference. +#' Default: FALSE} +#'\item{alpha}{alpha parameter in the implicit preference formulation (>= 0). Default: 1.0} +#'\item{seed}{integer seed for random number generation. Default: 0} +#'\item{numUserBlocks}{number of user blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{numItemBlocks}{number of item blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{checkpointInterval}{number of checkpoint intervals (>= 1) or disable checkpoint (-1). +#' Default: 10} +#'} +#' +#' @param data A SparkDataFrame for training +#' @param ratingCol column name for ratings +#' @param userCol column name for user ids. Ids must be (or can be coerced into) integers +#' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers +#' @param rank rank of the matrix factorization (> 0) +#' @param reg regularization parameter (>= 0) +#' @param maxIter maximum number of iterations (>= 0) + +#' @return \code{spark.als} returns a fitted ALS model +#' @rdname spark.als +#' @aliases spark.als,SparkDataFrame +#' @name spark.als +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(ratings) +#' model <- spark.als(df, "rating", "user", "item") +#' +#' # extract latent factors +#' stats <- summary(model) +#' userFactors <- stats$userFactors +#' itemFactors <- stats$itemFactors +#' +#' # make predictions +#' predicted <- predict(model, df) +#' showDF(predicted) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' +#' # set other arguments +#' modelS <- spark.als(df, "rating", "user", "item", rank = 20, +#' reg = 0.1, nonnegative = TRUE) +#' statsS <- summary(modelS) +#' } +#' @note spark.als since 2.1.0 +setMethod("spark.als", signature(data = "SparkDataFrame"), + function(data, ratingCol = "rating", userCol = "user", itemCol = "item", + rank = 10, reg = 1.0, maxIter = 10, ...) { + +`%||%` <- function(a, b) if (!is.null(a)) a else b --- End diff -- Usually, in SparkR we haven't introduced such operators to solve this type of problem. Maybe it would be good to follow the convention...
[GitHub] spark pull request #14384: [Spark-16443][SparkR] Alternating Least Squares (...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14384#discussion_r73998522 --- Diff: R/pkg/R/mllib.R --- @@ -632,3 +642,147 @@ setMethod("predict", signature(object = "AFTSurvivalRegressionModel"), function(object, newData) { return(dataFrame(callJMethod(object@jobj, "transform", newData@sdf))) }) + + +#' Alternating Least Squares (ALS) for Collaborative Filtering +#' +#' \code{spark.als} learns latent factors in collaborative filtering via alternating least +#' squares. Users can call \code{summary} to obtain fitted latent factors, \code{predict} +#' to make predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. +#' +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-collaborative-filtering.html}{MLlib: +#' Collaborative Filtering}. +#' Additional arguments can be passed to the methods. +#' \describe{ +#'\item{nonnegative}{logical value indicating whether to apply nonnegativity constraints. +#' Default: FALSE} +#'\item{implicitPrefs}{logical value indicating whether to use implicit preference. +#' Default: FALSE} +#'\item{alpha}{alpha parameter in the implicit preference formulation (>= 0). Default: 1.0} +#'\item{seed}{integer seed for random number generation. Default: 0} +#'\item{numUserBlocks}{number of user blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{numItemBlocks}{number of item blocks used to parallelize computation (> 0). +#' Default: 10} +#'\item{checkpointInterval}{number of checkpoint intervals (>= 1) or disable checkpoint (-1). +#' Default: 10} +#'} +#' +#' @param data A SparkDataFrame for training +#' @param ratingCol column name for ratings +#' @param userCol column name for user ids. Ids must be (or can be coerced into) integers +#' @param itemCol column name for item ids. Ids must be (or can be coerced into) integers +#' @param rank rank of the matrix factorization (> 0) +#' @param reg regularization parameter (>= 0) +#' @param maxIter maximum number of iterations (>= 0) + +#' @return \code{spark.als} returns a fitted ALS model +#' @rdname spark.als +#' @aliases spark.als,SparkDataFrame +#' @name spark.als +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(ratings) --- End diff -- @junyangq, I think it would be good if you showed in the examples how you create the `ratings` data.frame. Does R have a built-in ratings dataset?
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 It seems that, currently, SparkR's `GroupedData`, which represents Scala's GroupedData object, doesn't have any information about the grouping keys. `RelationalGroupedDataset` has a private attribute `groupingExpr` which contains information about the grouping columns, but it is not accessible from the R side. I was thinking that maybe we could pass the grouping columns to groups.R, like groupedData(sgd, cols). Any thoughts, @shivaram? Thanks!
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 Cool! Let me give that option a try.
[GitHub] spark issue #14431: [SPARK-16258][SparkR] Automatically append the grouping ...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14431 That's a good point, @shivaram: `worker.R` is the component that has the keys and appends them to the output. I don't see an elegant way of doing it in `worker.R` yet. However, I was thinking about the following option: we can still have an optional flag in gapply that states whether the key is required, but we will not pass it over to the Scala side. By default we always prepend the keys in `worker.R`, and in `group.R` we can have a check such as: if (!prependKey) { // detach/remove the prepended key columns } Does this sound reasonable, or is it still hackish?
[GitHub] spark pull request #14431: [SPARK-16258][SparkR] Automatically append the gr...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14431#discussion_r72916339 --- Diff: docs/sparkr.md --- @@ -429,19 +431,19 @@ result <- gapplyCollect( df, "waiting", function(key, x) { -y <- data.frame(key, max(x$eruptions)) -colnames(y) <- c("waiting", "max_eruption") -y +y <- data.frame(max(x$eruptions)) }) +colnames(result) <- c("waiting", "max_eruption") + head(result[order(result$max_eruption, decreasing = TRUE), ]) ##waiting max_eruption -##1 64 5.100 --- End diff -- Previously, there was a typo in the examples. It is easy to see by running:
```
> result <- data.frame(aggregate(faithful$eruptions, by = list(faithful$waiting), FUN = max))
> result <- head(result[order(result$x, decreasing = TRUE), ])
> result
```
[GitHub] spark pull request #14431: [SPARK-16258][SparkR] Automatically append the gr...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14431#discussion_r72916310 --- Diff: docs/sparkr.md --- @@ -398,23 +398,25 @@ and Spark. {% highlight r %} # Determine six waiting times with the largest eruption time in minutes. -schema <- structType(structField("waiting", "double"), structField("max_eruption", "double")) +schema <- structType(structField("max_eruption", "double")) result <- gapply( df, "waiting", function(key, x) { -y <- data.frame(key, max(x$eruptions)) +y <- data.frame(max(x$eruptions)) }, schema) +colnames(result) <- c("waiting", "max_eruption") + head(collect(arrange(result, "max_eruption", decreasing = TRUE))) ##waiting max_eruption -##1 64 5.100 --- End diff -- Previously, there was a typo in the examples. It is easy to see by running:
```
> result <- data.frame(aggregate(faithful$eruptions, by = list(faithful$waiting), FUN = max))
> result <- head(result[order(result$x, decreasing = TRUE), ])
> result
```
[GitHub] spark pull request #14431: [SPARK-16258][SparkR][WIP] Gapply add key attach ...
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/14431 [SPARK-16258][SparkR][WIP] Gapply add key attach option ## What changes were proposed in this pull request? The following pull request addresses the new feature request described in SPARK-16258. It automatically (by default) appends the grouping keys to the output `DataFrame`. I've also tried to solve the problem by adding an optional flag in `gapply` that states whether the key is required. However, the optional flag needs to be passed as an argument through a number of methods, which is not particularly elegant and leads to issues such as "The number of parameters should not exceed 10" in '/logical/object.scala:290'. Since this pull request already appends the grouping key automatically, I was wondering whether we really need to pass 'key' as the R function's input argument, function(key, x) {}. Isn't it superfluous? I'd be happy to hear your thoughts on that. Thanks! ## How was this patch tested? Test cases in R. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NarineK/spark gapply-add-key-attach-option Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14431.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14431 commit 29d8a5c6c22202cdf7d6cc44f1d6cbeca5946918 Author: Narine Kokhlikyan Date: 2016-06-20T22:12:11Z Fixed duplicated documentation problem + separated documentation for dapply and dapplyCollect commit 60491b98d5cea46fc752e09fcc8306bcfc9423d0 Author: Narine Kokhlikyan Date: 2016-07-21T05:54:10Z merge with master commit 575fcf82ff01f14b1186981c4188b6f9094e6bbc Author: Narine Kokhlikyan Date: 2016-08-01T00:40:07Z gapply: prepend key to output
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 @shivaram, @sun-rui, I was wondering if someone created a JIRA for the issue described here: https://github.com/apache/spark/pull/12836#issuecomment-225403054
[GitHub] spark issue #14090: [SPARK-16112][SparkR] Programming guide for gapply/gappl...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14090 Thanks, I've generated the docs the way you suggested, @shivaram, but I'm not sure I see the same thing as you. I still see some '{% highlight r %}' and some formatting issues in general. I also followed this documentation: https://github.com/apache/spark/tree/master/docs#generating-the-documentation-html Please let me know if you still see the issues after my latest commit.
[GitHub] spark issue #14090: [SPARK-16112][SparkR] Programming guide for gapply/gappl...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14090 Thanks @shivaram, @felixcheung for the comments. I'll address those today.
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70923645 --- Diff: docs/sparkr.md --- @@ -316,6 +314,139 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R +and Spark. + + Data type mapping between R and Spark (R type → Spark type): byte → byte; integer → integer; float → float; double → double; numeric → double; character → string; string → string; binary → binary; raw → binary; logical → boolean; timestamp → timestamp; date → date; array → array; list → array; map → map; env → map; struct --- End diff -- Sounds good. For the mapping between 'POSIXct / POSIXlt' to 'timestamp' and 'Date' to 'date', do we need to update the 'getSQLDataType' method? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L91
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70921996 --- Diff: docs/sparkr.md --- @@ -316,6 +314,139 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R +and Spark. + + Data type mapping between R and Spark (R type → Spark type): byte → byte; integer → integer; float → float; double → double; numeric → double; character → string; string → string; binary → binary; raw → binary; logical → boolean; timestamp → timestamp; date → date; array → array; list → array; map → map; env → map; struct --- End diff -- Thanks for the explanation, @shivaram! So I'll remove map, struct and timestamp and leave the rest as is. Does that sound fine?
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70920518 --- Diff: docs/sparkr.md --- @@ -316,6 +314,139 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R +and Spark. + + Data type mapping between R and Spark (R type → Spark type): byte → byte; integer → integer; float → float; double → double; numeric → double; character → string; string → string; binary → binary; raw → binary; logical → boolean; timestamp → timestamp; date → date; array → array; list → array; map → map; env → map; struct --- End diff -- @shivaram, I've looked at the following list: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L92 It is called when creating the schema's fields, and it has map, struct, timestamp, etc.
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70920244 --- Diff: docs/sparkr.md --- @@ -316,6 +314,139 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R +and Spark. + + Data type mapping between R and Spark (R type → Spark type): byte → byte; integer → integer; float → float; double → double; numeric → double; character → string; string → string; binary → binary; raw → binary; logical → boolean; timestamp → timestamp; date → date; array → array; list → array; map → map; env → map; struct --- End diff -- @felixcheung, I think according to the following mapping we expect 'date': https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L91 And it seems that there is a 'Date' in base R. Do I understand correctly?
[GitHub] spark issue #14090: [SPARK-16112][SparkR] Programming guide for gapply/gappl...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/14090 Added data type description
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70202736 --- Diff: docs/sparkr.md --- @@ -306,6 +306,64 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must match the R function's output. --- End diff -- Thanks, I was looking at the types.R file and noticed that we have NAs for array, map and struct: https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L42 But I guess in our case we can have array, map and struct mapped to array, map and struct respectively?
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70202321 --- Diff: docs/sparkr.md --- @@ -306,6 +306,64 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must match the R function's output. --- End diff -- Thanks @shivaram. Does the following mapping look fine to have in the table?
```
R           Spark
byte        byte
integer     integer
float       float
double      double
numeric     double
character   string
string      string
binary      binary
raw         binary
logical     boolean
timestamp   timestamp
date        date
array       array
map         map
struct      struct
```
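For comparison, the Scala side performs essentially this mapping when it turns R type strings into Catalyst types. Below is a condensed sketch of the kind of mapping `getSQLDataType` in `SQLUtils.scala` performs for the primitive entries; it is a simplification, and complex types such as array, map and struct, which take parameterized forms like `array<integer>`, are omitted here.

```scala
import org.apache.spark.sql.types._

// Map an R-side type string to the corresponding Spark SQL DataType.
def rTypeToSparkType(dataType: String): DataType = dataType match {
  case "byte"                 => ByteType
  case "integer"              => IntegerType
  case "float"                => FloatType
  case "double" | "numeric"   => DoubleType
  case "character" | "string" => StringType
  case "binary" | "raw"       => BinaryType
  case "logical"              => BooleanType
  case "timestamp"            => TimestampType
  case "date"                 => DateType
  case other => throw new IllegalArgumentException(s"Unsupported type: $other")
}
```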
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70198331 --- Diff: docs/sparkr.md --- @@ -306,6 +306,64 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must match the R function's output. --- End diff -- or we could probably also refer to this? https://github.com/apache/spark/blob/master/R/pkg/R/types.R#L21
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70194370 --- Diff: docs/sparkr.md --- @@ -306,6 +306,64 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must match the R function's output. --- End diff -- I see. I think we can describe the following type mapping in the programming guide. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala#L91 Those are the types used in the StructType's fields.
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/14090#discussion_r70168781 --- Diff: docs/sparkr.md --- @@ -306,6 +306,64 @@ head(ldf, 3) {% endhighlight %} + Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect` + +# gapply +Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: grouping key and R `data.frame` corresponding to +that key. The groups are chosen from `SparkDataFrame`s column(s). +The output of function should be a `data.frame`. Schema specifies the row format of the resulting +`SparkDataFrame`. It must match the R function's output. --- End diff -- Thanks @felixcheung, does this sound better? "It must reflect the R function's output schema on the basis of Spark data types. The column names of each output field in the schema are set by the user." I could also bring up some examples.
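As a concrete illustration of "the schema must reflect the R function's output", here is a small hypothetical sketch (dataset and column names invented): each schema field mirrors one column of the data.frame the function returns, while the field names themselves are free choices.

```
library(SparkR)

df <- createDataFrame(
  list(list(1L, 10), list(1L, 20), list(2L, 30)),
  c("grp", "val"))

# The function returns a two-column data.frame: the integer key and a double.
# The schema therefore declares exactly two fields with matching types; the
# names "grp" and "avg" are set by the user.
schema <- structType(structField("grp", "integer"),
                     structField("avg", "double"))

result <- gapply(df, c("grp"),
                 function(key, x) data.frame(key, mean(x$val)),
                 schema)
head(collect(result))
```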
[GitHub] spark pull request #14090: [SPARK-16112][SparkR] Programming guide for gappl...
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/14090 [SPARK-16112][SparkR] Programming guide for gapply/gapplyCollect ## What changes were proposed in this pull request? Updates the programming guide for spark.gapply/spark.gapplyCollect. Similar to other examples, I used the faithful dataset to demonstrate gapply's functionality. Please let me know if you prefer another example. ## How was this patch tested? Existing test cases in R. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NarineK/spark gapplyProgGuide Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/14090.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #14090 commit 29d8a5c6c22202cdf7d6cc44f1d6cbeca5946918 Author: Narine Kokhlikyan Date: 2016-06-20T22:12:11Z Fixed duplicated documentation problem + separated documentation for dapply and dapplyCollect commit 698c4331d2a8bfe7f4b372ebc8123b6c27a57e68 Author: Narine Kokhlikyan Date: 2016-06-23T18:51:48Z merge with master commit 85a4493a03b3601a93c25ebc1eafb2868efec8d8 Author: Narine Kokhlikyan Date: 2016-07-07T13:18:49Z Adding programming guide for gapply/gapplyCollect commit 7781d1c111f38e3608d5ebd468e6d344d52efa5c Author: Narine Kokhlikyan Date: 2016-07-07T13:27:35Z removing output format
[GitHub] spark issue #13760: [SPARK-16012][SparkR] implement gapplyCollect which will...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13760 Do you have any questions on this, @shivaram, @sun-rui?
[GitHub] spark issue #13760: [SPARK-16012][SparkR] implement gapplyCollect which will...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13760 @felixcheung, I've addressed the comments, or left a reply on the ones I did not address.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68571809 --- Diff: R/pkg/R/DataFrame.R --- @@ -1370,14 +1370,22 @@ setMethod("dapplyCollect", #' columns with data types integer and string and the mean which is a double. #' schema <- structType(structField("a", "integer"), structField("c", "string"), #' structField("avg", "double")) -#' df1 <- gapply( +#' result <- gapply( --- End diff -- done
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68571761 --- Diff: R/pkg/R/group.R --- @@ -198,62 +198,61 @@ createMethods() #' #' Applies a R function to each group in the input GroupedData #' -#' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame +#' @param x A GroupedData #' @rdname gapply #' @name gapply #' @export -#' @examples -#' \dontrun{ -#' Computes the arithmetic mean of the second column by grouping -#' on the first and third columns. Output the grouping values and the average. -#' -#' df <- createDataFrame ( -#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -#' c("a", "b", "c", "d")) -#' -#' Here our output contains three columns, the key which is a combination of two -#' columns with data types integer and string and the mean which is a double. -#' schema <- structType(structField("a", "integer"), structField("c", "string"), -#' structField("avg", "double")) -#' df1 <- gapply( -#' df, -#' list("a", "c"), -#' function(key, x) { -#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) -#' -#' Result -#' -- -#' a c avg -#' 3 3 3.0 -#' 1 1 1.5 -#' } +#' @seealso \link{gapplyCollect} --- End diff -- moved
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68571781 --- Diff: R/pkg/R/DataFrame.R --- @@ -1419,6 +1427,80 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. +#' +#' @param x A SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect --- End diff -- done
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68565249 --- Diff: R/pkg/R/group.R --- @@ -198,62 +198,61 @@ createMethods() #' #' Applies a R function to each group in the input GroupedData #' -#' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame +#' @param x A GroupedData #' @rdname gapply #' @name gapply #' @export -#' @examples -#' \dontrun{ -#' Computes the arithmetic mean of the second column by grouping -#' on the first and third columns. Output the grouping values and the average. -#' -#' df <- createDataFrame ( -#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -#' c("a", "b", "c", "d")) -#' -#' Here our output contains three columns, the key which is a combination of two -#' columns with data types integer and string and the mean which is a double. -#' schema <- structType(structField("a", "integer"), structField("c", "string"), -#' structField("avg", "double")) -#' df1 <- gapply( -#' df, -#' list("a", "c"), -#' function(key, x) { -#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) -#' -#' Result -#' -- -#' a c avg -#' 3 3 3.0 -#' 1 1 1.5 -#' } +#' @seealso \link{gapplyCollect} #' @note gapply(GroupedData) since 2.0.0 setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { -try(if (is.null(schema)) stop("schema cannot be NULL")) -packageNamesArr <- serialize(.sparkREnv[[".packages"]], - connection = NULL) -broadcastArr <- lapply(ls(.broadcastNames), - function(name) { get(name, .broadcastNames) }) -sdf <- callJStatic( - "org.apache.spark.sql.api.r.SQLUtils", - "gapply", - x@sgd, - serialize(cleanClosure(func), connection = NULL), - packageNamesArr, - broadcastArr, - schema$jobj) -dataFrame(sdf) +if (is.null(schema)) stop("schema cannot be NULL") +gapplyInternal(x, func, schema) }) + +#' gapplyCollect +#' +#' Applies a R function to each group in the input GroupedData and collects the result +#' back to R as a data.frame. +#' +#' @param x A GroupedData +#' @param func A function to be applied to each group partition specified by GroupedData. +#' The function `func` takes as argument a key - grouping columns and +#' a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @return a SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @seealso \link{gapply} +#' @note gapplyCollect(GroupedData) since 2.0.0 +setMethod("gapplyCollect", + signature(x = "GroupedData"), + function(x, func) { +gdf <- gapplyInternal(x, func, NULL) +content <- callJMethod(gdf@sdf, "collect") +# content is a list of items of struct type. Each item has a single field +# which is a serialized data.frame corresponds to one group of the +# SparkDataFrame. +ldfs <- lapply(content, function(x) { unserialize(x[[1]]) }) +ldf <- do.call(rbind, ldfs) +row.names(ldf) <- NULL
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68564302 --- Diff: R/pkg/R/group.R --- @@ -198,62 +198,61 @@ createMethods() #' #' Applies a R function to each group in the input GroupedData #' -#' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame +#' @param x A GroupedData #' @rdname gapply #' @name gapply #' @export -#' @examples -#' \dontrun{ -#' Computes the arithmetic mean of the second column by grouping -#' on the first and third columns. Output the grouping values and the average. -#' -#' df <- createDataFrame ( -#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -#' c("a", "b", "c", "d")) -#' -#' Here our output contains three columns, the key which is a combination of two -#' columns with data types integer and string and the mean which is a double. -#' schema <- structType(structField("a", "integer"), structField("c", "string"), -#' structField("avg", "double")) -#' df1 <- gapply( -#' df, -#' list("a", "c"), -#' function(key, x) { -#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) -#' -#' Result -#' -- -#' a c avg -#' 3 3 3.0 -#' 1 1 1.5 -#' } +#' @seealso \link{gapplyCollect} #' @note gapply(GroupedData) since 2.0.0 setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { -try(if (is.null(schema)) stop("schema cannot be NULL")) -packageNamesArr <- serialize(.sparkREnv[[".packages"]], - connection = NULL) -broadcastArr <- lapply(ls(.broadcastNames), - function(name) { get(name, .broadcastNames) }) -sdf <- callJStatic( - "org.apache.spark.sql.api.r.SQLUtils", - "gapply", - x@sgd, - serialize(cleanClosure(func), connection = NULL), - packageNamesArr, - broadcastArr, - schema$jobj) -dataFrame(sdf) +if (is.null(schema)) stop("schema cannot be NULL") +gapplyInternal(x, func, schema) }) + +#' gapplyCollect +#' +#' Applies a R function to each group in the input GroupedData and collects the result --- End diff -- Well, the descriptions are slightly different. Currently it shows both: "Applies a R function to each group in the input GroupedData and collects the result back to R as a data.frame." and "Groups the SparkDataFrame using the specified columns, applies the R function to each group and collects the result back to R as data.frame." I can remove the one in group.R and leave only the DataFrame one.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68563365 --- Diff: R/pkg/R/group.R --- @@ -198,62 +198,61 @@ createMethods() #' #' Applies a R function to each group in the input GroupedData #' -#' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame +#' @param x A GroupedData #' @rdname gapply #' @name gapply #' @export -#' @examples -#' \dontrun{ -#' Computes the arithmetic mean of the second column by grouping -#' on the first and third columns. Output the grouping values and the average. -#' -#' df <- createDataFrame ( -#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), -#' c("a", "b", "c", "d")) -#' -#' Here our output contains three columns, the key which is a combination of two -#' columns with data types integer and string and the mean which is a double. -#' schema <- structType(structField("a", "integer"), structField("c", "string"), -#' structField("avg", "double")) -#' df1 <- gapply( -#' df, -#' list("a", "c"), -#' function(key, x) { -#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) -#' }, -#' schema) -#' collect(df1) -#' -#' Result -#' -- -#' a c avg -#' 3 3 3.0 -#' 1 1 1.5 -#' } +#' @seealso \link{gapplyCollect} #' @note gapply(GroupedData) since 2.0.0 setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { -try(if (is.null(schema)) stop("schema cannot be NULL")) -packageNamesArr <- serialize(.sparkREnv[[".packages"]], - connection = NULL) -broadcastArr <- lapply(ls(.broadcastNames), - function(name) { get(name, .broadcastNames) }) -sdf <- callJStatic( - "org.apache.spark.sql.api.r.SQLUtils", - "gapply", - x@sgd, - serialize(cleanClosure(func), connection = NULL), - packageNamesArr, - broadcastArr, - schema$jobj) -dataFrame(sdf) +if (is.null(schema)) stop("schema cannot be NULL") +gapplyInternal(x, func, schema) }) + +#' gapplyCollect +#' +#' Applies a R function to each group in the input GroupedData and collects the result +#' back to R as a data.frame. +#' +#' @param x A GroupedData +#' @param func A function to be applied to each group partition specified by GroupedData. +#' The function `func` takes as argument a key - grouping columns and +#' a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @return a SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @seealso \link{gapply} +#' @note gapplyCollect(GroupedData) since 2.0.0 +setMethod("gapplyCollect", + signature(x = "GroupedData"), + function(x, func) { +gdf <- gapplyInternal(x, func, NULL) +content <- callJMethod(gdf@sdf, "collect") --- End diff -- `collect(gdf)` doesn't really work. The collect is called on the dataframe: `gdf@sdf`.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68542491 --- Diff: R/pkg/R/DataFrame.R --- @@ -1370,14 +1370,22 @@ setMethod("dapplyCollect", #' columns with data types integer and string and the mean which is a double. #' schema <- structType(structField("a", "integer"), structField("c", "string"), #' structField("avg", "double")) -#' df1 <- gapply( +#' result <- gapply( --- End diff -- Thanks, @felixcheung, I think I kept it consistent with dapply/dapplyCollect. Those do not have @return. I can add it to gapply.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68298040 --- Diff: R/pkg/R/group.R --- @@ -243,17 +236,73 @@ setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { try(if (is.null(schema)) stop("schema cannot be NULL")) --- End diff -- With or without try, both work fine. Without try the error looks like: `Error in .local(x, ...) : schema cannot be NULL`; with try: `Error in try(if (is.null(schema)) stop("schema cannot be NULL")) : schema cannot be NULL`. Is there a convention in SparkR for showing an error message?
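The difference between the two variants deserves a note: `try()` does not just change the message, it catches the error, so execution continues past the check. A standalone R sketch (function names hypothetical, not SparkR's actual methods):

```
# Plain stop() aborts the caller; the message names the calling frame.
f <- function(schema) {
  if (is.null(schema)) stop("schema cannot be NULL")
  "ran"
}
# f(NULL)  # => Error in f(NULL) : schema cannot be NULL

# try(stop(...)) prints the error but swallows it, so the rest of the
# function body still runs - which is why dropping try() is the safer fix.
g <- function(schema) {
  try(if (is.null(schema)) stop("schema cannot be NULL"))
  "ran anyway"
}
# g(NULL)  # prints the error, then returns "ran anyway"
```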
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68291007 --- Diff: R/pkg/R/group.R --- @@ -199,17 +199,10 @@ createMethods() #' Applies a R function to each group in the input GroupedData #' #' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame #' @rdname gapply #' @name gapply #' @export +#' @seealso \link{gapplyCollect} #' @examples #' \dontrun{ --- End diff -- @felixcheung, I took the same examples that gapply has. I did it similarly to dapply and dapplyCollect; dapply and dapplyCollect have the same examples too. I think @sun-rui preferred having those the same.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] implement gapplyCollect whi...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68227494 --- Diff: R/pkg/R/group.R --- @@ -199,17 +199,10 @@ createMethods() #' Applies a R function to each group in the input GroupedData #' #' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame #' @rdname gapply #' @name gapply #' @export +#' @seealso \link{gapplyCollect} #' @examples #' \dontrun{ --- End diff -- @sun-rui, @felixcheung, @shivaram, do you think it is better to move all examples to DataFrame.R and not spread them across both DataFrame.R and group.R?
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] gapplyCollect - applies a R...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68116142 --- Diff: R/pkg/R/group.R --- @@ -199,17 +199,10 @@ createMethods() #' Applies a R function to each group in the input GroupedData #' #' @param x a GroupedData -#' @param func A function to be applied to each group partition specified by GroupedData. -#' The function `func` takes as argument a key - grouping columns and -#' a data frame - a local R data.frame. -#' The output of `func` is a local R data.frame. -#' @param schema The schema of the resulting SparkDataFrame after the function is applied. -#' The schema must match to output of `func`. It has to be defined for each -#' output column with preferred output column name and corresponding data type. -#' @return a SparkDataFrame #' @rdname gapply #' @name gapply #' @export +#' @seealso \link{gapplyCollect} #' @examples #' \dontrun{ --- End diff -- @shivaram, yes, the example for calculating the average is almost the same - in group.R I use group_by and in DataFrame.R I don't, but we could also combine them all in, say, DataFrame.R and do something like this:
```
df <- createDataFrame (
  list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
  c("a", "b", "c", "d"))

# Here our output contains three columns, the key which is a combination of two
# columns with data types integer and string and the mean which is a double.
schema <- structType(structField("a", "integer"), structField("c", "string"),
                     structField("avg", "double"))
df1 <- gapply(
  df,
  c("a", "c"),
  function(key, x) {
    y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  },
  schema)

# or we can also group the data and afterwards call gapply on GroupedData:
gdf <- group_by(df, "a", "c")
df1 <- gapply(
  gdf,
  function(key, x) {
    y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
  },
  schema)
collect(df1)
```
Is this better?
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] gapplyCollect - applies a R...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r68114211 --- Diff: R/pkg/R/group.R --- @@ -242,18 +235,73 @@ createMethods() setMethod("gapply", signature(x = "GroupedData"), function(x, func, schema) { -try(if (is.null(schema)) stop("schema cannot be NULL")) --- End diff -- Yeah, we need it. I tried to do it like dapply, but dapply forces it by signature and gapply does not. Will bring it back, thanks.
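The signature point can be made concrete with a small, self-contained S4 sketch (generic and classes are hypothetical, not SparkR's actual definitions): when an argument is part of the dispatch signature, a call that omits it fails before the method body ever runs, which is the enforcement dapply gets for free and gapply has to replicate with an explicit check.

```
library(methods)

setGeneric("applyDemo", function(x, func, schema) standardGeneric("applyDemo"))

# schema is part of the dispatch signature, so omitting it fails dispatch:
setMethod("applyDemo",
          signature(x = "numeric", func = "function", schema = "character"),
          function(x, func, schema) func(x))

applyDemo(c(1, 2, 3), sum, "double")  # runs, returns 6
# applyDemo(c(1, 2, 3), sum)  # error: unable to find an inherited method

# A method dispatching only on x must validate at runtime instead:
#   if (is.null(schema)) stop("schema cannot be NULL")
```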
[GitHub] spark pull request #13660: [SPARK-15672][R][DOC] R programming guide update
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13660#discussion_r67944525 --- Diff: docs/sparkr.md --- @@ -262,6 +262,83 @@ head(df) {% endhighlight %} +### Applying User-defined Function +In SparkR, we support several kinds for User-defined Functions: + + Run a given function on a large dataset using `dapply` or `dapplyCollect` + +# dapply +Apply a function to each partition of `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` --- End diff -- @sun-rui, will do gapply in another PR.
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] gapplyCollect - applies a R...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/13760#discussion_r67936513 --- Diff: R/pkg/R/DataFrame.R --- @@ -1347,6 +1347,65 @@ setMethod("gapply", gapply(grouped, func, schema) }) +#' gapplyCollect +#' +#' Groups the SparkDataFrame using the specified columns, applies the R function to each +#' group and collects the result back to R as data.frame. +#' +#' @param x A SparkDataFrame +#' @rdname gapplyCollect +#' @name gapplyCollect +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' result <- gapplyCollect( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' colnames(y) <- c("key_a", "key_c", "mean_b") +#' y +#' }) +#' +#' Result +#' -- +#' key_a key_c mean_b +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (iris) +#' result <- gapplyCollect( +#' df, +#' list(df$"Species"), +#' function(key, x) { +#' m <- suppressWarnings(lm(Sepal_Length ~ +#' Sepal_Width + Petal_Length + Petal_Width, x)) --- End diff -- Hi @sun-rui, there is an indent (2 spaces) similar to: https://github.com/NarineK/spark/blob/21de08fea1a7b10ee40270eeda9e5a249231f0cf/R/pkg/R/DataFrame.R#L1369 What do you mean by indent?
[GitHub] spark issue #13790: [SPARK-16082][SparkR]remove duplicated docs in dapply
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13790 @shivaram, I've noticed that I didn't associate the pull request with the JIRA. I've just done it.
[GitHub] spark issue #13790: remove duplicated docs in dapply
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13790 cc: @sun-rui, @shivaram, @felixcheung
[GitHub] spark pull request #13790: remove duplicated docs in dapply
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/13790 remove duplicated docs in dapply ## What changes were proposed in this pull request? Removed unnecessary duplicated documentation in dapply and dapplyCollect. In this pull request I created separate R docs for dapply and dapplyCollect - kept dapply's documentation separate from dapplyCollect's and referred from one to the other via a link. ## How was this patch tested? Existing test cases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NarineK/spark dapply-docs-fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13790.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13790 commit 1331a09d6be5aa92f0e40bb2e65b2954db8cd953 Author: Narine Kokhlikyan Date: 2016-06-20T22:29:36Z remove duplicated docs in dapply
[GitHub] spark issue #13660: [SPARK-15672][R][DOC] R programming guide update
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13660 Hi @vectorijk, @felixcheung, As I was looking at the documentation generated in R I've noticed that there is some duplicated information. I'm not sure if this is the right place to ask about it; I thought you might have seen it. In the help I see the following:
```
Arguments

x       A SparkDataFrame
func    A function to be applied to each partition of the SparkDataFrame.
        func should have only one parameter, to which a data.frame corresponds
        to each partition will be passed. The output of func should be a
        data.frame.
schema  The schema of the resulting SparkDataFrame after the function is
        applied. It must match the output of func.
x       A SparkDataFrame
func    A function to be applied to each partition of the SparkDataFrame.
        func should have only one parameter, to which a data.frame corresponds
        to each partition will be passed. The output of func should be a
        data.frame.

See Also

Other SparkDataFrame functions: SparkDataFrame-class, [[, agg, arrange, as.data.frame, attach, cache, collect, colnames, coltypes, columns, count, createOrReplaceTempView, describe, dim, distinct, dropDuplicates, dropna, drop, dtypes, except, explain, filter, first, gapplyCollect, gapply, group_by, head, histogram, insertInto, intersect, isLocal, join, limit, merge, mutate, ncol, persist, printSchema, rename, repartition, sample, saveAsTable, selectExpr, select, showDF, show, str, take, unionAll, unpersist, withColumn, with, write.df, write.jdbc, write.json, write.parquet, write.text

Other SparkDataFrame functions: SparkDataFrame-class, [[, agg, arrange, as.data.frame, attach, cache, collect, colnames, coltypes, columns, count, createOrReplaceTempView, describe, dim, distinct, dropDuplicates, dropna, drop, dtypes, except, explain, filter, first, gapplyCollect, gapply, group_by, head, histogram, insertInto, intersect, isLocal, join, limit, merge, mutate, ncol, persist, printSchema, rename, repartition, sample, saveAsTable, selectExpr, select, showDF, show, str, take, unionAll, unpersist, withColumn, with, write.df, write.jdbc, write.json, write.parquet, write.text
```
Is this on purpose?
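The duplication she describes is the usual symptom of two roxygen blocks sharing one `@rdname`: every `@param` and `@family` tag from both blocks gets merged into the same Rd file. A minimal, hypothetical sketch, with plain functions standing in for the real S4 methods:

```
#' Apply a function to each partition of a SparkDataFrame
#'
#' @rdname dapply
#' @param x A SparkDataFrame
#' @param func A function to be applied to each partition.
#' @family SparkDataFrame functions
dapply <- function(x, func, schema) NULL

# Because this block reuses @rdname dapply, its @param and @family tags are
# merged into the same .Rd file, so "x", "func" and the See Also list each
# appear twice in the rendered help page.
#' @rdname dapply
#' @param x A SparkDataFrame
#' @param func A function to be applied to each partition.
#' @family SparkDataFrame functions
dapplyCollect <- function(x, func) NULL
```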
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Thanks for the quick response. I'll create one.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 @vectorijk, should I do the pull request for the same jira - https://issues.apache.org/jira/browse/SPARK-15672, or should I create a new jira for the programming guide?
[GitHub] spark pull request #13760: [SPARK-16012][SparkR] GapplyCollect - applies a R...
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/13760 [SPARK-16012][SparkR] GapplyCollect - applies a R function to each group similar to gapply and collects the result back to R data.frame ## What changes were proposed in this pull request? gapplyCollect() does gapply() on a SparkDataFrame and collects the result back to R. Compared to gapply() + collect(), gapplyCollect() offers performance optimization as well as programming convenience, as no schema needs to be provided. This is similar to dapplyCollect(). ## How was this patch tested? Added test cases for gapplyCollect similar to dapplyCollect. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NarineK/spark gapplyCollect Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13760.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13760 commit ea31820c9501d1f8cba96bc7f8e0fab04e9af0a2 Author: Narine Kokhlikyan Date: 2016-06-17T10:51:15Z initial version of gapplyCollect commit f8e54dc265ad0eb66a26508bc5221606a9652e22 Author: Narine Kokhlikyan Date: 2016-06-17T11:00:05Z merged with master commit 591c4804764cdce67d22ce52ec38c74f246e738b Author: Narine Kokhlikyan Date: 2016-06-17T11:03:04Z revert .gitignore commit 37b633afdff46374d983d204f13c05c769c7f40e Author: Narine Kokhlikyan Date: 2016-06-18T08:59:05Z added test cases + improved the code commit de5dbb0be0a3fcc42096a10470c543eaf7aa6d5c Author: Narine Kokhlikyan Date: 2016-06-18T09:05:57Z fixed test case
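To make the convenience claim concrete, here is a hypothetical usage sketch (dataset invented) contrasting the two call patterns the PR describes:

```
library(SparkR)

df <- createDataFrame(
  list(list(1L, 0.5), list(1L, 1.5), list(2L, 4.0)),
  c("grp", "val"))

# gapply(): a schema describing the output rows must be supplied, and the
# result stays distributed until collect() is called.
schema <- structType(structField("grp", "integer"),
                     structField("avg", "double"))
sdf <- gapply(df, c("grp"),
              function(key, x) data.frame(key, mean(x$val)),
              schema)
ldf1 <- collect(sdf)

# gapplyCollect(): no schema argument; each group's data.frame comes back
# serialized and is bound into one local R data.frame.
ldf2 <- gapplyCollect(df, c("grp"),
                      function(key, x) {
                        y <- data.frame(key, mean(x$val))
                        colnames(y) <- c("grp", "avg")
                        y
                      })
```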
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Hi @vectorijk, thanks for asking; I think in a separate PR. Do you think including it in #13660 would be better?
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r67265581 --- Diff: R/pkg/R/DataFrame.R --- @@ -1266,6 +1266,83 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Group the SparkDataFrame using the specified columns and apply the R function to each +#' group. +#' +#' @param x A SparkDataFrame +#' @param cols Grouping columns +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. The function `func` takes as argument +#' a key - grouping columns and a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. --- End diff -- I could have in the documentation something like: "The schema has to correspond to the output SparkDataFrame. It has to be defined for each output column with the preferred output column name and corresponding data type." How does this sound?
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Thanks, @shivaram and @sun-rui. Yes, I can work on the programming guide for gapply.
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r67264756 --- Diff: R/pkg/R/DataFrame.R --- @@ -1266,6 +1266,83 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Group the SparkDataFrame using the specified columns and apply the R function to each +#' group. +#' +#' @param x A SparkDataFrame +#' @param cols Grouping columns +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. The function `func` takes as argument +#' a key - grouping columns and a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. +#' It must match the output of func. +#' @family SparkDataFrame functions +#' @rdname gapply +#' @name gapply +#' @export +#' @examples +#' +#' \dontrun{ +#' Computes the arithmetic mean of the second column by grouping +#' on the first and third columns. Output the grouping values and the average. +#' +#' df <- createDataFrame ( +#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)), +#' c("a", "b", "c", "d")) +#' +#' schema <- structType(structField("a", "integer"), structField("c", "string"), +#' structField("avg", "double")) +#' df1 <- gapply( +#' df, +#' list("a", "c"), +#' function(key, x) { +#' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE) +#' }, +#' schema) +#' collect(df1) +#' +#' Result +#' -- +#' a c avg +#' 3 3 3.0 +#' 1 1 1.5 +#' +#' Fits linear models on iris dataset by grouping on the 'Species' column and +#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length' +#' and 'Petal_Width' as training features. +#' +#' df <- createDataFrame (iris) +#' schema <- structType(structField("(Intercept)", "double"), --- End diff -- The names do not have to match, we can give any name we want. Instead of "(Intercept)" I could have "(MyIntercept)". The data type is important.
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r67264555 --- Diff: R/pkg/R/DataFrame.R --- @@ -1266,6 +1266,83 @@ setMethod("dapplyCollect", ldf }) +#' gapply +#' +#' Group the SparkDataFrame using the specified columns and apply the R function to each +#' group. +#' +#' @param x A SparkDataFrame +#' @param cols Grouping columns +#' @param func A function to be applied to each group partition specified by grouping +#' column of the SparkDataFrame. The function `func` takes as argument +#' a key - grouping columns and a data frame - a local R data.frame. +#' The output of `func` is a local R data.frame. +#' @param schema The schema of the resulting SparkDataFrame after the function is applied. --- End diff -- The output schema is purely based on the output dataframe; if the key is included in the output, then we need to include the key in the schema. Basically, the schema has to match what we want to output. If we want to output only the average in the above example, we could have: schema <- structType(structField("avg", "double")). What really matters is the data type - it has to be double in the above example; it cannot be string or character. The name doesn't matter either: I could have "hello" instead of "avg".
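A tiny sketch of the names-versus-types point, under the same invented-example assumptions as above: renaming a schema field only changes the output column label, while the declared type has to agree with what the function actually returns.

```
library(SparkR)

# Both schemas are valid for a function whose output column is numeric;
# only the name of the resulting column differs.
schemaAvg   <- structType(structField("avg", "double"))
schemaHello <- structType(structField("hello", "double"))

# structType(structField("avg", "string")) would NOT match a numeric output
# column: the declared Spark type must correspond to the data; the name is free.
```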
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r67197168 --- Diff: R/pkg/inst/worker/worker.R --- @@ -79,75 +127,72 @@ if (numBroadcastVars > 0) { # Timing broadcast broadcastElap <- elapsedSecs() +# Initial input timing +inputElap <- broadcastElap # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) -isDataFrame <- as.logical(SparkR:::readInt(inputCon)) +# 0 - RDD mode, 1 - dapply mode, 2 - gapply mode +mode <- SparkR:::readInt(inputCon) -# If isDataFrame, then read column names -if (isDataFrame) { +if (mode > 0) { colNames <- SparkR:::readObject(inputCon) } isEmpty <- SparkR:::readInt(inputCon) +computeInputElapsDiff <- 0 +outputComputeElapsDiff <- 0 if (isEmpty != 0) { - if (numPartitions == -1) { if (deserializer == "byte") { # Now read as many characters as described in funcLen data <- SparkR:::readDeserialize(inputCon) } else if (deserializer == "string") { data <- as.list(readLines(inputCon)) -} else if (deserializer == "row") { +} else if (deserializer == "row" && mode == 2) { + dataWithKeys <- SparkR:::readMultipleObjectsWithKeys(inputCon) + keys <- dataWithKeys$keys + data <- dataWithKeys$data +} else if (deserializer == "row"){ data <- SparkR:::readMultipleObjects(inputCon) } -# Timing reading input data for execution -inputElap <- elapsedSecs() -if (isDataFrame) { - if (deserializer == "row") { -# Transform the list of rows into a data.frame -# Note that the optional argument stringsAsFactors for rbind is -# available since R 3.2.4. So we set the global option here. -oldOpt <- getOption("stringsAsFactors") -options(stringsAsFactors = FALSE) -data <- do.call(rbind.data.frame, data) -options(stringsAsFactors = oldOpt) - -names(data) <- colNames - } else { -# Check to see if data is a valid data.frame -stopifnot(deserializer == "byte") -stopifnot(class(data) == "data.frame") - } - output <- computeFunc(data) - if (serializer == "row") { -# Transform the result data.frame back to a list of rows -output <- split(output, seq(nrow(output))) - } else { -# Serialize the ouput to a byte array -stopifnot(serializer == "byte") +inputElap <- elapsedSecs() +if (mode > 0) { + if (mode == 1) { +# Timing reading input data for execution +output <- compute(mode, partition, serializer, deserializer, NULL, +colNames, computeFunc, outputCon, data) + } else { +# gapply mode +for (i in 1:length(data)) { + # Timing reading input data for execution + inputElap <- elapsedSecs() + output <- compute(mode, partition, serializer, deserializer, keys[[i]], + colNames, computeFunc, outputCon, data[[i]]) + computeElap <- elapsedSecs() + outputResult(serializer, output, outputCon) + outputElap <- elapsedSecs() + computeInputElapsDiff <- computeInputElapsDiff + (computeElap - inputElap) + outputComputeElapsDiff <- outputComputeElapsDiff + (outputElap - computeElap) +} } } else { - output <- computeFunc(partition, data) + # Timing reading input data for execution --- End diff -- Removed the comment - moved to line 163: https://github.com/NarineK/spark/blob/4d1cc6b0fd3dd2839a6a879f43118c5828916733/R/pkg/inst/worker/worker.R#L163
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Addressed your comments, @sun-rui; please let me know if you have any further comments.
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66745283 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala --- @@ -325,6 +330,71 @@ case class MapGroupsExec( } /** + * Groups the input rows together and calls the R function with each group and an iterator + * containing all elements in the group. + * The result of this function is flattened before being output. + */ +case class FlatMapGroupsInRExec( +func: Array[Byte], +packageNames: Array[Byte], +broadcastVars: Array[Broadcast[Object]], +inputSchema: StructType, +outputSchema: StructType, +keyDeserializer: Expression, +valueDeserializer: Expression, +groupingAttributes: Seq[Attribute], +dataAttributes: Seq[Attribute], +outputObjAttr: Attribute, +child: SparkPlan) extends UnaryExecNode with ObjectProducerExec { + + override def output: Seq[Attribute] = outputObjAttr :: Nil + override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val isSerializedRData = + if (outputSchema == SERIALIZED_R_DATA_SCHEMA) true else false +val serializerForR = if (!isSerializedRData) { + SerializationFormats.ROW +} else { + SerializationFormats.BYTE +} + +child.execute().mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val getKey = ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes) + val getValue = ObjectOperator.deserializeRowToObject(valueDeserializer, dataAttributes) + val outputObject = ObjectOperator.wrapObjectToRow(outputObjAttr.dataType) + val runner = new RRunner[Array[Byte]]( +func, SerializationFormats.ROW, serializerForR, packageNames, broadcastVars, +isDataFrame = true, colNames = inputSchema.fieldNames, mode = 2) + + val groupedRBytes = grouped.flatMap { case (key, rowIter) => --- End diff -- Hi @sun-rui, I did it similarly to: https://github.com/NarineK/spark/blob/d51441f704e2abad7f7a3cc829664cd201b0fcd2/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala#L322 I think we can also use map here. If there is no nested content, flatMap will perform like map. In general, for DataFrame rows, can someone have a row of rows?
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66732763

--- Diff: R/pkg/R/group.R ---
@@ -142,3 +142,58 @@ createMethods <- function() {
 }

 createMethods()
+
+#' gapply
+#'
+#' Applies a R function to each group in the input GroupedData
+#'
+#' @param x a GroupedData
+#' @param func A function to be applied to each group partition specified by GroupedData.
+#'             The function `func` takes as argument a key - grouping columns and
+#'             a data frame - a local R data.frame.
+#'             The output of `func` is a local R data.frame.
+#' @param schema The schema of the resulting SparkDataFrame after the function is applied.
+#'               It must match the output of func.
+#' @return a SparkDataFrame
+#' @rdname gapply
+#' @name gapply
+#' @examples
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#'   list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' schema <- structType(structField("a", "integer"), structField("c", "string"),
+#'   structField("avg", "double"))
+#' df1 <- gapply(
+#'   df,
+#'   list("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'   },
+#'   schema)
+#' collect(df1)
+#'
+#' Result
+#' --
+#' a c avg
+#' 3 3 3.0
+#' 1 1 1.5
+#' }
+setMethod("gapply",
+          signature(x = "GroupedData"),
+          function(x, func, schema) {
+            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+                                         connection = NULL)
+            broadcastArr <- lapply(ls(.broadcastNames),
+                                   function(name) { get(name, .broadcastNames) })
+            sdf <- callJMethod(x@sgd, "flatMapGroupsInR",
+                               serialize(cleanClosure(func), connection = NULL),
+                               packageNamesArr,
+                               broadcastArr,
+                               if (is.null(schema)) { schema } else { schema$jobj })
--- End diff --

Thanks, I set an assertion. We cannot do it exactly like dapply by forcing the schema through the signature, because gapply for GroupedData is slightly different from DataFrame's gapply.
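For readers more familiar with the Scala side, the (key, rows) contract that gapply exposes to the R function mirrors Spark's typed flatMapGroups API. Here is a small sketch of the same per-group mean computation; the object name and local-mode session are illustrative, not part of the PR.

```
import org.apache.spark.sql.SparkSession

object GapplyAnalogy {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("gapply-analogy").getOrCreate()
    import spark.implicits._

    // Mean of the second column per group, with the grouping value carried
    // into the output -- the same shape as the R example above.
    val ds = Seq((1, 1.0), (1, 2.0), (3, 3.0)).toDS()

    val result = ds
      .groupByKey(_._1)                 // plays the role of gapply's `key`
      .flatMapGroups { (key, rows) =>   // `rows` plays the role of the local data.frame
        val values = rows.map(_._2).toSeq
        Iterator((key, values.sum / values.size))
      }
      .toDF("a", "avg")

    result.show()
    spark.stop()
  }
}
```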
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66717543

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
     pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. For each unique group,
+   * the function will be passed the group key and an iterator that contains all of the elements in
+   * the group. The function can return an iterator containing elements of an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group is too large to fit into
+   * memory. However, users must take care to avoid materializing the whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+      f: Array[Byte],
+      packageNames: Array[Byte],
+      broadcastVars: Array[Object],
+      outputSchema: StructType): DataFrame = {
+    val broadcastVarObj = broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+    val groupingNamedExpressions = groupingExprs.map(alias)
+    val groupingCols = groupingNamedExpressions.map(Column(_))
+    val groupingDataFrame = df.select(groupingCols : _*)
+    val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+    val realOutputSchema = if (outputSchema == null) SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

I see, the first option with assert is easy to do. Didn't we want to add gapplyCollect too? :) https://docs.google.com/presentation/d/1oj17N5JaE8JDjT2as_DUI6LKutLcEHNZB29HsRGL_dM/edit#slide=id.p12
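For context on the `groupingExprs.map(alias)` step in the diff above: turning an arbitrary Catalyst Expression into something that carries a name (and hence a resolvable Attribute) is done by wrapping it in an Alias. A rough sketch of that idea follows; it uses Spark-internal Catalyst classes, which are private APIs, and the helper name is hypothetical.

```
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

// Hypothetical helper: reuse the existing name if the expression already
// has one, otherwise manufacture a name by aliasing the expression.
def toNamed(expr: Expression): NamedExpression = expr match {
  case ne: NamedExpression => ne
  case other               => Alias(other, other.toString)()
}

// groupingExprs.map(toNamed).map(_.toAttribute) would then yield the named
// grouping attributes used for the child distribution and sort order.
```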
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Thanks @liancheng and @rxin! With respect to your point, @rxin, about "private[sql] signature in public APIs": dapply added that signature to `Dataset.scala` and gapply adds it to `RelationalGroupedDataset.scala`. We can think of pulling those out into a helper method, but maybe we can do it in a separate JIRA? cc: @shivaram, @sun-rui
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66712035

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -381,6 +385,50 @@ class RelationalGroupedDataset protected[sql](
   def pivot(pivotColumn: String, values: java.util.List[Any]): RelationalGroupedDataset = {
     pivot(pivotColumn, values.asScala)
   }
+
+  /**
+   * Applies the given serialized R function `func` to each group of data. For each unique group,
+   * the function will be passed the group key and an iterator that contains all of the elements in
+   * the group. The function can return an iterator containing elements of an arbitrary type which
+   * will be returned as a new [[DataFrame]].
+   *
+   * This function does not support partial aggregation, and as a result requires shuffling all
+   * the data in the [[Dataset]]. If an application intends to perform an aggregation over each
+   * key, it is best to use the reduce function or an
+   * [[org.apache.spark.sql.expressions#Aggregator Aggregator]].
+   *
+   * Internally, the implementation will spill to disk if any given group is too large to fit into
+   * memory. However, users must take care to avoid materializing the whole iterator for a group
+   * (for example, by calling `toList`) unless they are sure that this is possible given the memory
+   * constraints of their cluster.
+   *
+   * @since 2.0.0
+   */
+  private[sql] def flatMapGroupsInR(
+      f: Array[Byte],
+      packageNames: Array[Byte],
+      broadcastVars: Array[Object],
+      outputSchema: StructType): DataFrame = {
+    val broadcastVarObj = broadcastVars.map(_.asInstanceOf[Broadcast[Object]])
+    val groupingNamedExpressions = groupingExprs.map(alias)
+    val groupingCols = groupingNamedExpressions.map(Column(_))
+    val groupingDataFrame = df.select(groupingCols : _*)
+    val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
+    val realOutputSchema = if (outputSchema == null) SERIALIZED_R_DATA_SCHEMA else outputSchema
--- End diff --

@liancheng, thank you for the review comments. Those are good suggestions; however:

Case 1: using Option[StructType] - I gave it a try, but since this method is called from the R side we would need to somehow instantiate the "scala.Option" class, and that doesn't seem straightforward to do from R. From the R side we basically call the following method: `org.apache.spark.sql.Dataset flatMapGroupsInR (byte[] f, byte[] packageNames, java.lang.Object[] broadcastVars, scala.Option outputSchema)`

Case 2: similar to dapply, gapply forces the schema through its signature, so a default value doesn't really work here. But I can make the changes if preferred.
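A common way around the scala.Option problem described here is to keep the JVM-facing signature nullable and convert to Option once at the boundary. A small sketch of that pattern, with an illustrative method name that is not the PR's actual code:

```
import org.apache.spark.sql.types.StructType

// R callers pass null instead of constructing scala.Option; the boundary
// normalizes it, since Option(null) == None and Option(x) == Some(x).
def normalizeSchema(outputSchema: StructType): Option[StructType] =
  Option(outputSchema)
```

The rest of the Scala code can then pattern-match on the Option instead of checking for null in several places.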
[GitHub] spark issue #13610: [SPARKR][SQL][SPARK-15884] Overriding stringArgs in MapP...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13610 Thanks! Changed the title!
[GitHub] spark issue #13610: overwriting stringArgs in MapPartitionsInR
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/13610 @sun-rui, @liancheng, @shivaram
[GitHub] spark pull request #13610: overwriting stringArgs in MapPartitionsInR
GitHub user NarineK opened a pull request: https://github.com/apache/spark/pull/13610 overwriting stringArgs in MapPartitionsInR

## What changes were proposed in this pull request?

As discussed in https://github.com/apache/spark/pull/12836, we need to override the stringArgs method in MapPartitionsInR in order to avoid the overly large strings that the default stringArgs generates from the input arguments. In this case we exclude some of the input arguments: the serialized R objects.

## How was this patch tested?

Existing test cases.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NarineK/spark dapply_MapPartitionsInR_stringArgs

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/13610.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #13610

commit 939dbd5a17e63171ef2c18d5b23874daa75dbfcc
Author: Narine Kokhlikyan
Date: 2016-06-09T08:16:37Z

overwritting stringArgs in MapPartitionsInR
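To show the technique on something self-contained: the idea is to override how a node renders its arguments so that large binary payloads never reach the string. A toy sketch follows; ToyPlanNode is hypothetical, while the real change overrides TreeNode.stringArgs on the actual plan node.

```
// Hypothetical stand-in for a plan node carrying serialized R payloads.
case class ToyPlanNode(
    func: Array[Byte],          // serialized R closure -- potentially huge
    packageNames: Array[Byte],  // serialized package list -- potentially huge
    inputSchema: String,
    outputSchema: String) {

  // Mirror of the stringArgs override: render only the small, readable args.
  protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema)

  override def toString: String = s"ToyPlanNode(${stringArgs.mkString(", ")})"
}

// ToyPlanNode(Array.fill(1 << 20)(0: Byte), Array.emptyByteArray, "in", "out").toString
// => "ToyPlanNode(in, out)" -- no megabyte-long byte dump in the plan string.
```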
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66672823

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
   child: LogicalPlan) extends UnaryNode with ObjectProducer{

   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema,
+    outputObjAttr, child)
--- End diff --

I reverted it because I noticed that I had a separate branch for that fix: https://github.com/apache/spark/commit/939dbd5a17e63171ef2c18d5b23874daa75dbfcc But if you think it is better to have it here, I can bring it back. Thanks!
[GitHub] spark pull request #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on ...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/12836#discussion_r66670797

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -286,6 +290,9 @@ case class FlatMapGroupsInR(
   child: LogicalPlan) extends UnaryNode with ObjectProducer{

   override lazy val schema = outputSchema
+
+  override protected def stringArgs: Iterator[Any] = Iterator(inputSchema, outputSchema,
+    outputObjAttr, child)
--- End diff --

Thanks, @shivaram, I've added groupingAttributes and dataAttributes in my last commit: https://github.com/apache/spark/pull/12836/commits/0ca74fddc91a94d8b5c69c2e510afeee8531c0e2
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Hi @sun-rui, hi @shivaram, I've overridden stringArgs and pushed my changes to the following branch; I haven't created a JIRA yet. https://github.com/apache/spark/commit/939dbd5a17e63171ef2c18d5b23874daa75dbfcc This is how the output looks after my modification. Do you think this is good enough?

== Parsed Logical Plan ==
'SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, a), IntegerType) AS a#13504, if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, b), DoubleType) AS b#13505, if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 2, c), StringType), true) AS c#13506]
+- 'MapPartitionsInR [StructField(a,IntegerType,true), StructField(b,DoubleType,true), StructField(c,StringType,true)], [StructField(a,IntegerType,true), StructField(b,DoubleType,true), StructField(c,StringType,true)], obj#13500: org.apache.spark.sql.Row
   +- 'DeserializeToObject unresolveddeserializer(createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, DoubleType), getcolumnbyordinal(2, StringType).toString, StructField(a,IntegerType,true), StructField(b,DoubleType,true), StructField(c,StringType,true))), obj#13496: org.apache.spark.sql.Row
      +- LogicalRDD [a#13485, b#13486, c#13487]
...
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Sure, let me override stringArgs and give it a try.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Thank you for the quick responses @sun-rui and @shivaram. Here is how the `dataframe.queryExecution.toString` printout starts:

== Parsed Logical Plan ==
'SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, a, IntegerType) AS a#13651, if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, avg, DoubleType) AS avg#13652]
+- 'MapPartitionsInR [88, 10, 0, 0, 0, 2, 0, 3, 2, 3, 0, 2, 3, 0, 0, 0, 6, 3, 0, 0, 4, 2, 0, 0, 0, 1, 0, 4, 0, 9, 0, 0, 0, 6, 115, 114, 99, 114, 101, 102, 0, 0, 3, 13, 0, 0, 0, 8, 0, 0, 8, 84, 0, 0, 0, 5, 0, 0, 8, 86, 0, 0, 0, 5, 0, 0, 0, 5, 0, 0, 0, 5, 0, 0, 8, 84, 0, 0, 8, 86, 0, 0, 4, 2, 0, 0, 0, 1, 0, 4, 0, 9, 0, 0, 0, 7, 115, 114, 99, 102, 105, 108, 101, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, -14, 0, 0, 0, -2, 0, 0, 0, 19, 0, 0, 0, 29, 0, 0, 0, -2, 0, 0, 0, -2, 0, 0, 0, -2, 0, 0, 4 ...

It is very possible that the large array is the serialized R function.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Do you know what exactly caused this?
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Hi @shivaram, hi @sun-rui, Surprisingly, the `dataframe.queryExecution.toString` for both dapply and gapply is prepended by a huge array, which I'm not able to understand. It seems that recent commits cause this. I've added the following code snippet in mapPartitionsInR:

```
print(df.queryExecution)
print("this was dapply")
```

And this is what I see :(

[ ... 0, 9, 0, 0, 0, 0, 0, 4, 0, 9, 0, 0, 0, 1, 44, 0, 4, 0, 9, 0, 0, 0, 0, 0, 4, 0, 9, 0, 0, 0, 6, 115, 99, 104, 101, 109, 97, 0, 4, 0, 9, 0, 0, 0, 1, 41, ... (several thousand more serialized bytes elided) ... 0, 0, 0, 1, 97, 0, 0, 0, -2, 0, 0, 0, 2, 0, 0, 0
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 I can print out the query plan on the Scala side and see what it looks like for that example.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Not sure why it fails. It fails for my new test case on the iris dataset. The resulting dataframe has dimensions 35x2.
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Locally, run-tests.sh runs successfully, but it fails on Jenkins...
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 @shivaram, I didn't change the code, but merged with master, because prior to this the build was failing since some PySpark tests didn't pass. After today's merge, when I run the gapply test cases from RStudio everything passes, but if I run them using ./run-tests.sh from the command line, they fail on arrange... I'm changing the test cases so that I call order after collecting the dataframe...
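A sketch of the ordering workaround described here, written in Scala for consistency with the other examples (the R version would collect() and then call order() on the local data.frame): sort the collected rows locally instead of relying on whatever order the cluster happens to return.

```
import org.apache.spark.sql.SparkSession

object OrderSafeCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("order-safe-check").getOrCreate()
    import spark.implicits._

    // Rows may come back in any partition order, so a row-by-row comparison
    // after collect() can be flaky; imposing a local sort makes it stable.
    val df = Seq((3, 3.0), (1, 1.5)).toDF("a", "avg")
    val rows = df.collect().sortBy(_.getInt(0))

    assert(rows.map(_.getInt(0)).toSeq == Seq(1, 3))
    spark.stop()
  }
}
```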
[GitHub] spark issue #12836: [SPARK-12922][SparkR][WIP] Implement gapply() on DataFra...
Github user NarineK commented on the issue: https://github.com/apache/spark/pull/12836 Merged build finished. Test FAILed.