[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions

2020-02-06 Thread GitBox
zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375710113
 
 

 ##
 File path: R/pkg/R/functions.R
 ##
 @@ -3281,6 +3322,121 @@ setMethod("row_number",
 
 ## Collection functions ##
 
+#' Create o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then the argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- handledCallJStatic(
+    "org.apache.spark.api.python.PythonUtils",
+    "toSeq",
+    handledCallJStatic(
+      "java.util.Arrays", "asList", lapply(args, as_jexpr)
+    )
+  )
+
+  # Create Scala LambdaFunction
+  sparkR.newJObject(
+    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
+    jexpr,
+    jargs,
+    FALSE
+  )
+}
+
+#' Invokes a higher order function expression identified by name
+#' (relative to o.a.s.sql.catalyst.expressions)
+#'
+#' @param name character
+#' @param cols list of character or Column objects
+#' @param funs list of named list(fun = ..., expected_narg = ...)
+#' @return a \code{Column} representing name applied to cols with funs
+invoke_higher_order_function <- function(name, cols, funs) {
+  as_jexpr <- function(x) {
+    if (class(x) == "character") {
+      x <- column(x)
+    }
+    callJMethod(x@jc, "expr")
+  }
+
+  jexpr <- do.call(sparkR.newJObject, c(
+    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),
+    lapply(cols, as_jexpr),
+    lapply(funs, create_lambda)
+  ))
+
+  column(sparkR.newJObject("org.apache.spark.sql.Column", jexpr))
 
 Review comment:
   Oh, sorry...
   
   Why do we actually have this one? With the other utilities (`call*`) it is not very useful (although a lean interface for calling JVM objects can be a huge win).
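   
   As an aside, a minimal sketch of what the exported bridge already covers (this assumes an active SparkR session; `sparkR.newJObject` and `sparkR.callJMethod` are the public `call*`-style wrappers):
   
   ```R
   # Minimal sketch, assuming an active SparkR session (sparkR.session())
   sb <- sparkR.newJObject("java.lang.StringBuilder", "higher order")
   invisible(sparkR.callJMethod(sb, "append", " functions"))
   sparkR.callJMethod(sb, "toString")  # "higher order functions"
   ```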





[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions

2020-02-06 Thread GitBox
zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375709246
 
 

 ##
 File path: R/pkg/R/functions.R
 ##
 @@ -3281,6 +3322,121 @@ setMethod("row_number",
 
 ## Collection functions ##
 
+#' Create o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then the argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- handledCallJStatic(
+    "org.apache.spark.api.python.PythonUtils",
+    "toSeq",
+    handledCallJStatic(
+      "java.util.Arrays", "asList", lapply(args, as_jexpr)
+    )
+  )
+
+  # Create Scala LambdaFunction
+  sparkR.newJObject(
+    "org.apache.spark.sql.catalyst.expressions.LambdaFunction",
+    jexpr,
+    jargs,
+    FALSE
+  )
+}
+
+#' Invokes a higher order function expression identified by name
+#' (relative to o.a.s.sql.catalyst.expressions)
+#'
+#' @param name character
+#' @param cols list of character or Column objects
+#' @param funs list of named list(fun = ..., expected_narg = ...)
+#' @return a \code{Column} representing name applied to cols with funs
+invoke_higher_order_function <- function(name, cols, funs) {
+  as_jexpr <- function(x) {
+    if (class(x) == "character") {
+      x <- column(x)
+    }
+    callJMethod(x@jc, "expr")
+  }
+
+  jexpr <- do.call(sparkR.newJObject, c(
+    paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."),
 
 Review comment:
   Style-wise I like neither
   
   ```R
   paste0("org.apache.spark.sql.catalyst.expressions.", name)
   ```
   
   nor
   
   ```R
   paste0("org.apache.spark.sql.catalyst.expressions", ".", name)
   ```
   
   `paste` with `sep = "."` feels like the most natural way of expressing such an operation if one is looking for a generic pattern.
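   
   For comparison, all three spellings produce the same fully qualified name (`ArrayFilter` here is just one of the expressions used in this PR):
   
   ```R
   # All three yield "org.apache.spark.sql.catalyst.expressions.ArrayFilter"
   paste("org.apache.spark.sql.catalyst.expressions", "ArrayFilter", sep = ".")
   paste0("org.apache.spark.sql.catalyst.expressions.", "ArrayFilter")
   paste0("org.apache.spark.sql.catalyst.expressions", ".", "ArrayFilter")
   ```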





[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions

2020-02-06 Thread GitBox
zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r375705709
 
 

 ##
 File path: R/pkg/R/functions.R
 ##
 @@ -3281,6 +3322,121 @@ setMethod("row_number",
 
 ## Collection functions ##
 
+#' Create o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then the argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- newJObject(
+    "org.apache.spark.sql.Column",
+    newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(
 
 Review comment:
   Maybe to a lesser extent (the variation in argument types is smaller, but so is the amount of logic required), but overall the same as here ‒ https://github.com/apache/spark/pull/27406#discussion_r375704703.
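   
   For context, a standalone illustration of what the guard above accepts and rejects (plain R, no Spark required):
   
   ```R
   # Sketch of the arity/varargs guard from create_lambda
   check_arity <- function(fun) {
     parameters <- formals(fun)
     nparameters <- length(parameters)
     stopifnot(
       nparameters >= 1,
       nparameters <= 3,
       !"..." %in% names(parameters)
     )
     nparameters
   }
   check_arity(function(x) x)         # 1: unary, accepted
   check_arity(function(x, y) x + y)  # 2: binary, accepted
   # check_arity(function(...) 1)     # error: varargs are rejected
   ```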





[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions

2020-02-03 Thread GitBox
zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374069388
 
 

 ##
 File path: R/pkg/R/functions.R
 ##
 @@ -3281,6 +3322,126 @@ setMethod("row_number",
 
 ## Collection functions ##
 
+#' Create o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then the argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- sparkR.newJObject(
+    "org.apache.spark.sql.Column",
+    sparkR.newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric vector of the expected
+#'        numbers of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(nparameters %in% expected_nargs)
 
 Review comment:
   I guess this is not the place for a broader discussion of the problem, so let me defend this particular choice (by which I mean both the Python and the R implementation):
   
   - To create lambda placeholders we have to analyze the signature ‒ this accounts for the majority of the related logic (it weighs more heavily on Python than on R). This part is not optional, and we mostly piggyback on it (R adds one line of code in `create_lambda` and one additional numeric that is passed from the invoking wrapper).
   - Even if this check is removed, we still do arity validation. This is primarily to keep things close to Scala and to generate placeholders with matching names (`x`, `y`, `z`) instead of something like [`x1`, `x2`, ...]. It is not strictly necessary, but it makes debugging easier, and it adds 5 lines of code (it could fit in one without significant loss of readability, if it weren't for the line length restriction).
   - So we have literally < 10 lines of lightweight code (out of 400 here, 500 in the PySpark variant) to perform argument validation (a standalone sketch of this check follows the traces below). This gives us
   
   ```
   > array_filter("xs", function(x, y, z) TRUE)
   Error in (function (fun, expected_nargs)  : 
     nparameters %in% expected_nargs is not TRUE 
   11.
   stop(simpleError(msg, call = if (p <- sys.parent(1L)) sys.call(p))) 
   10.
   stopifnot(nparameters %in% expected_nargs) at functions.R#3361
   9.
   (function (fun, expected_nargs) 
   {
       as_jexpr <- function(x) callJMethod(x@jc, "expr")
       parameters <- formals(fun) ... 
   8.
   do.call(create_lambda, args) at functions.R#3412
   7.
   FUN(X[[i]], ...) 
   6.
   lapply(funs, function(args) do.call(create_lambda, args)) 
   5.
   lapply(funs, function(args) do.call(create_lambda, args)) at functions.R#3412
   4.
   do.call(sparkR.newJObject, c(paste("org.apache.spark.sql.catalyst.expressions", name, sep = "."), lapply(cols, as_jexpr), lapply(funs, function(args) do.call(create_lambda, args)))) at functions.R#3412
   3.
   invoke_higher_order_function("ArrayFilter", cols = list(x), funs = list(list(fun = f, expected_nargs = c(1, 2)))) at functions.R#3510
   2.
   array_filter("xs", function(x, y, z) TRUE) at generics.R#786
   1.
   array_filter("xs", function(x, y, z) TRUE) 
   ```
   
   vs. 
   
   ```
   20/02/03 12:38:22 ERROR RBackendHandler: select on 20 failed
   java.lang.reflect.InvocationTargetException
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
       at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:164)
       at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:105)
       at 
   ```
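   
   For reference, the check defended above boils down to this standalone sketch (plain R, no Spark session; the `expected_nargs = c(1, 2)` value is the one `array_filter` passes in the trace above):
   
   ```R
   # Standalone sketch of the expected_nargs validation in create_lambda
   expected_nargs <- c(1, 2)                   # array_filter accepts unary or binary functions
   fun <- function(x, y, z) TRUE               # ternary: one argument too many
   nparameters <- length(formals(fun))         # 3
   stopifnot(nparameters %in% expected_nargs)  # stops with a readable R-side error
   ```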

[GitHub] [spark] zero323 commented on a change in pull request #27433: [SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions

2020-02-03 Thread GitBox
zero323 commented on a change in pull request #27433: 
[SPARK-30682][SPARKR][SQL] Add SparkR interface for higher order functions
URL: https://github.com/apache/spark/pull/27433#discussion_r374051458
 
 

 ##
 File path: R/pkg/R/functions.R
 ##
 @@ -3281,6 +3322,126 @@ setMethod("row_number",
 
 ## Collection functions ##
 
+#' Create o.a.s.sql.catalyst.expressions.UnresolvedNamedLambdaVariable,
+#' convert it to o.a.s.sql.Column and wrap with R Column.
+#' Used by higher order functions.
+#'
+#' @param ... character of length = 1
+#'        if length(...) > 1 then the argument is interpreted as a nested
+#'        Column, for example \code{unresolved_named_lambda_var("a", "b", "c")}
+#'        yields unresolved \code{a.b.c}
+#' @return Column object wrapping JVM UnresolvedNamedLambdaVariable
+unresolved_named_lambda_var <- function(...) {
+  jc <- sparkR.newJObject(
+    "org.apache.spark.sql.Column",
+    sparkR.newJObject(
+      "org.apache.spark.sql.catalyst.expressions.UnresolvedNamedLambdaVariable",
+      list(...)
+    )
+  )
+  column(jc)
+}
+
+#' Create o.a.s.sql.catalyst.expressions.LambdaFunction corresponding
+#' to the transformation described by fun.
+#' Used by higher order functions.
+#'
+#' @param fun R \code{function} (unary, binary or ternary)
+#'        that transforms \code{Columns} into a \code{Column}
+#' @param expected_nargs numeric vector of the expected
+#'        numbers of arguments. Used for validation
+#' @return JVM \code{LambdaFunction} object
+create_lambda <- function(fun, expected_nargs) {
+  as_jexpr <- function(x) callJMethod(x@jc, "expr")
+
+  # Process function arguments
+  parameters <- formals(fun)
+  nparameters <- length(parameters)
+
+  stopifnot(nparameters %in% expected_nargs)
+
+  stopifnot(
+    nparameters >= 1 &
+    nparameters <= 3 &
+    !"..." %in% names(parameters)
+  )
+
+  args <- lapply(c("x", "y", "z")[seq_along(parameters)], function(p) {
+    unresolved_named_lambda_var(p)
+  })
+
+  # Invoke function and validate return type
+  result <- do.call(fun, args)
+  stopifnot(class(result) == "Column")
+
+  # Convert both Columns to Scala expressions
+  jexpr <- as_jexpr(result)
+
+  jargs <- callJStatic(
 
 Review comment:
   Nice one, didn't know about it. Thanks.

