[GitHub] [spark] HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r315993792

## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
##

@@ -1917,19 +1921,33 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
         null
       ).toDF("i")

+    // transform(i, x -> x + 1)
+    val resA = Seq(
+      Row(Seq(2, 10, 9, 8)),
+      Row(Seq(6, 9, 10, 8, 3)),
+      Row(Seq.empty),
+      Row(null))
+
+    // transform(i, (x, i) -> x + i)
+    val resB = Seq(
+      Row(Seq(1, 10, 10, 10)),
+      Row(Seq(5, 9, 11, 10, 6)),
+      Row(Seq.empty),
+      Row(null))
+
     def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
-      checkAnswer(df.selectExpr("transform(i, x -> x + 1)"),
-        Seq(
-          Row(Seq(2, 10, 9, 8)),
-          Row(Seq(6, 9, 10, 8, 3)),
-          Row(Seq.empty),
-          Row(null)))
-      checkAnswer(df.selectExpr("transform(i, (x, i) -> x + i)"),
-        Seq(
-          Row(Seq(1, 10, 10, 10)),
-          Row(Seq(5, 9, 11, 10, 6)),
-          Row(Seq.empty),
-          Row(null)))
+      checkAnswer(df.selectExpr("transform(i, x -> x + 1)"), resA)
+      checkAnswer(df.selectExpr("transform(i, (x, i) -> x + i)"), resB)
+
+      checkAnswer(df.select(transform(col("i"), x => x + 1)), resA)
+      checkAnswer(df.select(transform(col("i"), (x, i) => x + i)), resB)
+
+      checkAnswer(df.select(transform(col("i"), new JFunc {
+        def call(x: Column) = x + 1
+      })), resA)
+      checkAnswer(df.select(transform(col("i"), new JFunc2 {
+        def call(x: Column, i: Column) = x + i
+      })), resB)

Review comment:
   > As for these, the arguments are scala.Function1 and org.apache.spark.api.java.function.Function; the Java compiler recognizes both as SAM (Single Abstract Method) types when resolving x -> x, so they are ambiguous.

   Sounds like a valid concern to me. Does such `x -> x` syntax work in Java too?
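For context on the ambiguity under discussion: since Scala 2.12, scala.Function1 is itself a functional interface, so a Java lambda like `x -> x.plus(1)` can target either overload and the call fails to compile as ambiguous. Below is a hedged sketch of how callers can pin down one overload explicitly, assuming `JFunc` is a test-local alias for org.apache.spark.api.java.function.Function and that `transform` has both overloads as in this revision of the PR:

```scala
import org.apache.spark.api.java.function.{Function => JFunc}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, transform}

// Scala side: `x => x + 1` is unambiguously a scala.Function1,
// so the Scala-specific overload is selected.
val viaScalaLambda = transform(col("i"), (x: Column) => x + 1)

// Java-interface side: an explicit anonymous class names the SAM type,
// sidestepping the two-SAM-targets ambiguity that a bare lambda would
// hit in Java source.
val viaJavaInterface = transform(col("i"), new JFunc[Column, Column] {
  override def call(x: Column): Column = x + 1
})
```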
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r315993279

## File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
##

@@ -1917,19 +1921,33 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
         null
       ).toDF("i")

+    // transform(i, x -> x + 1)
+    val resA = Seq(
+      Row(Seq(2, 10, 9, 8)),
+      Row(Seq(6, 9, 10, 8, 3)),
+      Row(Seq.empty),
+      Row(null))
+
+    // transform(i, (x, i) -> x + i)
+    val resB = Seq(
+      Row(Seq(1, 10, 10, 10)),
+      Row(Seq(5, 9, 11, 10, 6)),
+      Row(Seq.empty),
+      Row(null))
+
     def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
-      checkAnswer(df.selectExpr("transform(i, x -> x + 1)"),
-        Seq(
-          Row(Seq(2, 10, 9, 8)),
-          Row(Seq(6, 9, 10, 8, 3)),
-          Row(Seq.empty),
-          Row(null)))
-      checkAnswer(df.selectExpr("transform(i, (x, i) -> x + i)"),
-        Seq(
-          Row(Seq(1, 10, 10, 10)),
-          Row(Seq(5, 9, 11, 10, 6)),
-          Row(Seq.empty),
-          Row(null)))
+      checkAnswer(df.selectExpr("transform(i, x -> x + 1)"), resA)
+      checkAnswer(df.selectExpr("transform(i, (x, i) -> x + i)"), resB)
+
+      checkAnswer(df.select(transform(col("i"), x => x + 1)), resA)
+      checkAnswer(df.select(transform(col("i"), (x, i) => x + i)), resB)
+
+      checkAnswer(df.select(transform(col("i"), new JFunc {
+        def call(x: Column) = x + 1
+      })), resA)
+      checkAnswer(df.select(transform(col("i"), new JFunc2 {
+        def call(x: Column, i: Column) = x + i
+      })), resB)

Review comment:
   @nvander1, let's move the Java-specific tests to the Java test suite. Ideally they should live there, and that is the better place to clarify such doubts.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r311453310

## File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##

@@ -3343,6 +3344,296 @@ object functions {
     ArrayExcept(col1.expr, col2.expr)
   }

+  private def createLambda(f: Column => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val function = f(Column(x)).expr
+    LambdaFunction(function, Seq(x))
+  }
+
+  private def createLambda(f: (Column, Column) => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val function = f(Column(x), Column(y)).expr
+    LambdaFunction(function, Seq(x, y))
+  }
+
+  private def createLambda(f: (Column, Column, Column) => Column) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val z = UnresolvedNamedLambdaVariable(Seq("z"))
+    val function = f(Column(x), Column(y), Column(z)).expr
+    LambdaFunction(function, Seq(x, y, z))
+  }
+
+  private def createLambda(f: JavaFunction[Column, Column]) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val function = f.call(Column(x)).expr
+    LambdaFunction(function, Seq(x))
+  }
+
+  private def createLambda(f: JavaFunction2[Column, Column, Column]) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val function = f.call(Column(x), Column(y)).expr
+    LambdaFunction(function, Seq(x, y))
+  }
+
+  private def createLambda(f: JavaFunction3[Column, Column, Column, Column]) = {
+    val x = UnresolvedNamedLambdaVariable(Seq("x"))
+    val y = UnresolvedNamedLambdaVariable(Seq("y"))
+    val z = UnresolvedNamedLambdaVariable(Seq("z"))
+    val function = f.call(Column(x), Column(y), Column(z)).expr
+    LambdaFunction(function, Seq(x, y, z))
+  }
+
+  /**
+   * (Scala-specific) Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: Column => Column): Column = withExpr {
+    ArrayTransform(column.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: (Column, Column) => Column): Column = withExpr {
+    ArrayTransform(column.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Returns whether a predicate holds for one or more elements in the array.
+   *
+   * @group collection_funcs
+   */
+  def exists(column: Column, f: Column => Column): Column = withExpr {
+    ArrayExists(column.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Returns whether a predicate holds for every element in the array.
+   *
+   * @group collection_funcs
+   */
+  def forall(column: Column, f: Column => Column): Column = withExpr {
+    ArrayForAll(column.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Returns an array of elements for which a predicate holds in a given array.
+   *
+   * @group collection_funcs
+   */
+  def filter(column: Column, f: Column => Column): Column = withExpr {
+    ArrayFilter(column.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Applies a binary operator to an initial state and all elements in the array,
+   * and reduces this to a single state. The final state is converted into the final result
+   * by applying a finish function.
+   *
+   * @group collection_funcs
+   */
+  def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column,
+      finish: Column => Column): Column = withExpr {
+    ArrayAggregate(
+      expr.expr,
+      zero.expr,
+      createLambda(merge),
+      createLambda(finish)
+    )
+  }
+
+  /**
+   * (Scala-specific) Applies a binary operator to an initial state and all elements in the array,
+   * and reduces this to a single state.
+   *
+   * @group collection_funcs
+   */
+  def aggregate(expr: Column, zero: Column, merge: (Column, Column) => Column): Column =
+    aggregate(expr, zero, merge, c => c)
+
+  /**
+   * (Scala-specific) Merges two given arrays, element-wise, into a single array using a function.
+   * If one array is shorter, nulls are appended at the end to match the length of the longer
+   * array, before applying the function.
+   *
+   * @group collection_funcs
+   */
+  def zip_with(left: Column, right: Column, f: (Column, Column) => Column): Column = withExpr {
+    ZipWith(left.expr, right.expr, createLambda(f))
+  }
+
+  /**
+   * (Scala-specific) Applies a function to every key-value pair in a map and returns
+   * a map with the results of those applications as the new keys for the pairs.
+   *
+   * @group collection_funcs
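The quoted overloads read naturally from Scala. A minimal usage sketch, assuming a live `SparkSession` named `spark` and the signatures exactly as defined in this revision of the PR:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(Seq(1, 9, 8, 7), Seq(5, 8, 9, 7, 2)).toDF("i")

// transform: add one to every element
df.select(transform(col("i"), x => x + 1)).show()

// transform with index: the second lambda argument is the element position
df.select(transform(col("i"), (x, idx) => x + idx)).show()

// filter and exists take Column-valued predicates
df.select(filter(col("i"), x => x % 2 === 0)).show()
df.select(exists(col("i"), x => x > 8)).show()

// aggregate: fold the array to a sum, then apply the finish function
df.select(aggregate(col("i"), lit(0), (acc, x) => acc + x, state => state * 2)).show()

// zip_with: combine two arrays element-wise
val pairs = Seq((Seq(1, 2, 3), Seq(10, 20, 30))).toDF("a", "b")
pairs.select(zip_with(col("a"), col("b"), (x, y) => x + y)).show()
```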
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r269970597

## File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##

@@ -3316,6 +3316,141 @@ object functions {
     ArrayExcept(col1.expr, col2.expr)
   }

+  private def expressionFunction(f: Column => Column)
+    : Expression => Expression =
+    x => f(Column(x)).expr
+
+  private def expressionFunction(f: (Column, Column) => Column)
+    : (Expression, Expression) => Expression =
+    (x, y) => f(Column(x), Column(y)).expr
+
+  private def expressionFunction(f: (Column, Column, Column) => Column)
+    : (Expression, Expression, Expression) => Expression =
+    (x, y, z) => f(Column(x), Column(y), Column(z)).expr
+
+  /**
+   * Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: Column => Column): Column = withExpr {
+    HigherOrderUtils.transform(column.expr, expressionFunction(f))
+  }
+
+  /**
+   * Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: (Column, Column) => Column): Column = withExpr {
+    HigherOrderUtils.transform(column.expr, expressionFunction(f))
+  }
+
+  /**
+   * Returns whether a predicate holds for one or more elements in the array.
+   *
+   * @group collection_funcs
+   */
+  def exists(column: Column, f: Column => Column): Column = withExpr {

Review comment:
   Let's add `(Scala-specific)` at least for each doc. BTW, please take a look at the style guide at https://github.com/databricks/scala-style-guide
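A hedged sketch of the doc shape being requested, applied to the overload the comment is attached to. The body reuses this revision's own helpers, so it only compiles inside this version of functions.scala; `HigherOrderUtils.exists` is an assumed analogue of the `HigherOrderUtils.transform` helper shown in the diff, not a confirmed API:

```scala
/**
 * (Scala-specific) Returns whether a predicate holds for one or more elements
 * in the array.
 *
 * @group collection_funcs
 */
def exists(column: Column, f: Column => Column): Column = withExpr {
  // Lower the Column-level predicate to the Expression level, exactly as
  // the transform overloads above do. HigherOrderUtils.exists is assumed
  // here by analogy with HigherOrderUtils.transform.
  HigherOrderUtils.exists(column.expr, expressionFunction(f))
}
```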
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
HyukjinKwon commented on a change in pull request #24232: [SPARK-27297] [SQL] Add higher order functions to scala API
URL: https://github.com/apache/spark/pull/24232#discussion_r269958689

## File path: sql/core/src/main/scala/org/apache/spark/sql/functions.scala
##

@@ -3316,6 +3316,141 @@ object functions {
     ArrayExcept(col1.expr, col2.expr)
   }

+  private def expressionFunction(f: Column => Column)
+    : Expression => Expression =
+    x => f(Column(x)).expr
+
+  private def expressionFunction(f: (Column, Column) => Column)
+    : (Expression, Expression) => Expression =
+    (x, y) => f(Column(x), Column(y)).expr
+
+  private def expressionFunction(f: (Column, Column, Column) => Column)
+    : (Expression, Expression, Expression) => Expression =
+    (x, y, z) => f(Column(x), Column(y), Column(z)).expr
+
+  /**
+   * Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: Column => Column): Column = withExpr {
+    HigherOrderUtils.transform(column.expr, expressionFunction(f))
+  }
+
+  /**
+   * Returns an array of elements after applying a transformation to each element
+   * in the input array.
+   *
+   * @group collection_funcs
+   */
+  def transform(column: Column, f: (Column, Column) => Column): Column = withExpr {
+    HigherOrderUtils.transform(column.expr, expressionFunction(f))
+  }
+
+  /**
+   * Returns whether a predicate holds for one or more elements in the array.
+   *
+   * @group collection_funcs
+   */
+  def exists(column: Column, f: Column => Column): Column = withExpr {

Review comment:
   But how do we support this in Java?
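One answer, visible in the later revision of this PR quoted above (discussion_r311453310), is to add parallel overloads taking the Java functional interfaces, so Java callers can name the SAM type explicitly. A hedged sketch of that delegation pattern:

```scala
import org.apache.spark.api.java.function.{Function => JavaFunction}
import org.apache.spark.sql.Column

// Sketch: a Java-friendly overload that adapts the Java functional
// interface to the Scala one and delegates to the Scala overload
// defined in the diff above. Java code can then call it with
// `new Function<Column, Column>() { ... }`; a bare Java lambda may
// still be ambiguous between the two SAM targets, which is the
// concern raised elsewhere in this thread.
def exists(column: Column, f: JavaFunction[Column, Column]): Column =
  exists(column, (c: Column) => f.call(c))
```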