[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375710113

## File path: R/pkg/R/functions.R ##
@@ -3281,6 +3322,121 @@ setMethod("row_number",

 ## Collection functions##

+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- handledCallJStatic(
+    "org.apache.spark.api.python.PythonUtils",
+    "toSeq",
+    handledCallJStatic(
+      "java.util.Arrays", "asList", lapply(args, as_jexpr)
+    )
+  )
+
+  # Create Scala LambdaFunction
+  sparkR.newJObject(
+    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
+    jexpr,
+    jargs,
+    FALSE
+  )
+}
+
+#' Invokes higher order function expression identified by name,
+#' (relative to o.a.s.sql.catalyst.expressions)
+#'
+#' @param name character
+#' @param cols list of character or Column objects
+#' @param funs list of named list(fun = ..., expected_narg = ...)
+#' @return a \code{Column} representing name applied to cols with funs
+invoke_higher_order_function <- function(name, cols, funs) {
+  as_jexpr <- function(x) {
+    if (class(x) == "character") {
+      x <- column(x)
+    }
+    callJMethod(x@jc, "expr")
+  }
+
+  jexpr <- do.call(sparkR.newJObject, c(
+    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),
+    lapply(cols, as_jexpr),
+    lapply(funs, create_lambda)
+  ))
+
+  column(sparkR.newJObject("org.apache.spark.sql.Column", jexpr))

Review comment:
Oh, sorry... Why do we actually have this one? With the other utilities (`call*`) available it is not very useful (although a lean interface for calling JVM objects can be a huge win).

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375709246

## File path: R/pkg/R/functions.R ##
@@ -3281,6 +3322,121 @@ setMethod("row_number",

 ## Collection functions##

+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- handledCallJStatic(
+    "org.apache.spark.api.python.PythonUtils",
+    "toSeq",
+    handledCallJStatic(
+      "java.util.Arrays", "asList", lapply(args, as_jexpr)
+    )
+  )
+
+  # Create Scala LambdaFunction
+  sparkR.newJObject(
+    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
+    jexpr,
+    jargs,
+    FALSE
+  )
+}
+
+#' Invokes higher order function expression identified by name,
+#' (relative to o.a.s.sql.catalyst.expressions)
+#'
+#' @param name character
+#' @param cols list of character or Column objects
+#' @param funs list of named list(fun = ..., expected_narg = ...)
+#' @return a \code{Column} representing name applied to cols with funs
+invoke_higher_order_function <- function(name, cols, funs) {
+  as_jexpr <- function(x) {
+    if (class(x) == "character") {
+      x <- column(x)
+    }
+    callJMethod(x@jc, "expr")
+  }
+
+  jexpr <- do.call(sparkR.newJObject, c(
+    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),

Review comment:
Style-wise I like neither

```R
paste0("org.apache.spark.sql.catalyst.expressions.", name)
```

nor

```R
paste0("org.apache.spark.sql.catalyst.expressions", ".", name)
```

`paste` with `sep = "."` feels like the most natural way of expressing such an operation if one is looking for a generic pattern.
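For reference, a minimal base-R sketch (not part of the PR) comparing the three spellings discussed in the comment above. It needs no Spark session; the variable names are illustrative assumptions, and `"ArrayFilter"` is used only as an example expression name.

```r
# Base R only: compare the three ways of building a qualified class name.
pkg <- "org.apache.spark.sql.catalyst.expressions"
name <- "ArrayFilter"  # example expression name, chosen for illustration

with_sep <- paste(pkg, name, sep = ".")           # spelling used in the PR
with_paste0_a <- paste0(pkg, ".", name)           # explicit separator argument
with_paste0_b <- paste0("org.apache.spark.sql.catalyst.expressions.", name)

# All three produce the same string, so the choice is purely stylistic.
stopifnot(
  identical(with_sep, with_paste0_a),
  identical(with_sep, with_paste0_b)
)

# paste() with sep extends to more components without repeating the separator.
qualified <- paste("org", "apache", "spark", sep = ".")
```

Since the results are identical, the argument for `paste(..., sep = ".")` is about readability: the separator is stated once and the components stay free of trailing dots.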
zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375705709

## File path: R/pkg/R/functions.R ##
@@ -3281,6 +3322,121 @@ setMethod("row_number",

 ## Collection functions##

+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(

Review comment:
Maybe to a somewhat lesser extent (the variation in argument types is smaller, but so is the amount of logic required), but overall the same reasoning applies as here: https://github.com/apache/spark/pull/27406#discussion_r375704703.
zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374069388

## File path: R/pkg/R/functions.R ##
@@ -3281,6 +3322,126 @@ setMethod("row_number",

 ## Collection functions##

+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- sparkR.newJObject(
+    "org.apache.spark.sql.Column",
+    sparkR.newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric a vector of the expected
+#'        number of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(nparameters %in% expected_nargs)

Review comment:
I guess this is not the place for a broader discussion about the problem, so let me defend this particular choice (by this I mean both the Python and the R implementation):

- To create lambda placeholders we have to analyze the signature; this accounts for the majority of the related logic (it weighs more heavily on Python than on R). This part is not optional, and we mostly piggyback on it (R adds one line of code in `create_lambda` and one additional numeric that is passed from the invoking wrapper).
- Even if this check is removed we still do arity validation. This is primarily to keep things close to Scala and generate placeholders with matching names (`x`, `y`, `z`) instead of something like [`x1`, `x2`, ...]. It is not strictly necessary, but it makes debugging easier, and adds 5 lines of code (it could fit on one line without significant loss of readability, if it wasn't for the line length restriction).
- So we have literally < 10 lines of lightweight code (out of 400 here, 500 in the PySpark variant) to perform argument validation.

This gives us

```
> array_filter("xs", function(x, y, z) TRUE)
Error in (function (fun, expected_nargs) : nparameters %in% expected_nargs is not TRUE
11. stop(simpleError(msg, call = if (p <- sys.parent(1L)) sys.call(p)))
10. stopifnot(nparameters %in% expected_nargs) at functions.R#3361
9. (function (fun, expected_nargs) { as_jexpr <- function(x) callJMethod(x@jc, "expr") parameters <- formals(fun) ...
8. do.call(create_lambda, args) at functions.R#3412
7. FUN(X[[i]], ...)
6. lapply(funs, function(args) do.call(create_lambda, args))
5. lapply(funs, function(args) do.call(create_lambda, args)) at functions.R#3412
4. do.call(sparkR.newJObject, c(paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."), lapply(cols, as_jexpr), lapply(funs, function(args) do.call(create_lambda, args at functions.R#3412
3. invoke_higher_order_function("ArrayFilter", cols = list(x), funs = list(list(fun = f, expected_nargs = c(1, 2 at functions.R#3510
2. array_filter("xs", function(x, y, z) TRUE) at generics.R#786
1. array_filter("xs", function(x, y, z) TRUE)
```

vs.
```
20/02/03 12:38:22 ERROR RBackendHandler: select on 20 failed
java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:164)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:105)
    at
```
zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374051458

## File path: R/pkg/R/functions.R ##
@@ -3281,6 +3322,126 @@ setMethod("row_number",

 ## Collection functions##

+#' Create o.a.s.sql.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- sparkR.newJObject(
+    "org.apache.spark.sql.Column",
+    sparkR.newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.expressions.LambdaFunction corresponding
+#' to transformation described by func.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric a vector of the expected
+#'        number of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(nparameters %in% expected_nargs)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- callJStatic(

Review comment:
Nice one, didn't know about it. Thanks.
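For reference, a minimal sketch (not part of the PR) of the two arity checks quoted in the diff above, isolated from the JVM calls so it runs in plain R without a Spark session. The helper name `lambda_placeholder_names` is hypothetical; it returns the placeholder names as a character vector instead of building `UnresolvedNamedLambdaVariable` objects.

```r
# Hypothetical helper: reproduces only the validation part of create_lambda
# and returns placeholder names rather than JVM lambda variables.
lambda_placeholder_names <- function(fun, expected_nargs) {
  parameters <- formals(fun)
  nparameters <- length(parameters)

  # Per-wrapper check: the arities accepted by the calling higher order function
  stopifnot(nparameters %in% expected_nargs)

  # General check: between one and three arguments, and no varargs
  stopifnot(
    nparameters >= 1 &
    nparameters <= 3 &
    !"..." %in% names(parameters)
  )

  # Placeholders are named x, y, z regardless of the user's argument names
  c("x", "y", "z")[seq_along(parameters)]
}

# A binary function is accepted when the wrapper allows one or two arguments.
accepted <- lambda_placeholder_names(function(a, b) a, expected_nargs = c(1, 2))

# A ternary function is rejected on the R side, before any JVM call is made.
rejected <- tryCatch(
  {
    lambda_placeholder_names(function(x, y, z) TRUE, expected_nargs = c(1, 2))
    FALSE
  },
  error = function(e) TRUE
)
```

Failing in `stopifnot` on the R side is what produces the short R traceback shown earlier in the thread, rather than a JVM reflection error surfaced through `RBackendHandler`.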