[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21103 Added a few minor comments, but nothing serious that needs to be solved right now. LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205806876 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,233 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { --- End diff -- Maybe I'm missing something, but why do we need to apply these checks if no ```null``` flag merging will be performed? If ```left.dataType``` and ```right.dataType``` are different, they will be cast according to the ```ImplicitTypeCasts``` coercion rule. If they differ only in their ```null``` flags, ```left.dataType``` could be returned directly, since no array elements from ```right``` will ever be present in the result.
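A minimal sketch of the semantics described in the usage text. It uses plain Scala collections rather than Spark's `ArrayData`, and `ArrayExceptSketch` is a hypothetical name, not Spark code. It shows why the result type could follow `left.dataType`: every emitted element is taken from `left`.

```scala
import scala.collection.mutable

// Hypothetical helper modelling array_except on plain Scala collections (not Spark code).
object ArrayExceptSketch {
  // Returns the elements of `left` that do not occur in `right`, without
  // duplicates, in the order of their first appearance in `left`.
  def arrayExcept[T](left: Seq[T], right: Seq[T]): List[T] = {
    val seen = mutable.HashSet[T]()
    seen ++= right                  // elements of `right` are never emitted
    val result = mutable.ArrayBuffer[T]()
    left.foreach { elem =>
      if (!seen.contains(elem)) {
        seen += elem                // later duplicates of `elem` are skipped
        result += elem
      }
    }
    result.toList                   // every output element comes from `left`
  }
}
```

For example, `ArrayExceptSketch.arrayExcept(Seq(1, 2, 3), Seq(1, 3, 5))` yields `List(2)`, mirroring the `array(2)` example in the usage text.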
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205848094 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,317 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck --- End diff -- Why do we need this check (see the question [here](https://github.com/apache/spark/pull/21103#discussion_r205806876))?
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205847180 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,317 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { --- End diff -- nit: indentation
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r206228544 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression) } /** - * Will become common base class for [[ArrayUnion]], ArrayIntersect, and ArrayExcept. + * Will become common base class for [[ArrayUnion]], ArrayIntersect, and [[ArrayExcept]]. */ abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { --- End diff -- I'm not sure to what extent Presto is considered a reference. Just FYI, these operations in Presto can accept arrays of different types.
```
presto:default> SELECT array_except(ARRAY[5, 1, 7], ARRAY[7.0, 1.0, 3.0]);
 _col0
-------
 [5.0]
(1 row)
```
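A rough sketch of what accepting arrays of different types implies. The `Int` side is first widened to a common element type (`Double`), and only then is the except performed. This is plain Scala with a hypothetical object name. It models the Presto output above and is not how Spark or Presto implement coercion.

```scala
// Hypothetical sketch: array_except over an Int array and a Double array,
// modelled by widening the Int side to Double before comparing (not Spark/Presto code).
object WidenedExceptSketch {
  def arrayExceptWidened(left: Seq[Int], right: Seq[Double]): List[Double] = {
    val excluded = right.toSet
    left.map(_.toDouble)            // widen Int -> Double to get a common element type
      .distinct                     // drop duplicates, keeping first occurrences
      .filterNot(excluded.contains) // remove anything present in `right`
      .toList
  }
}
```

With the inputs from the Presto session, `arrayExceptWidened(Seq(5, 1, 7), Seq(7.0, 1.0, 3.0))` gives `List(5.0)`.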
[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21236#discussion_r207148777 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala --- @@ -98,6 +98,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { if (expected.isNaN) result.isNaN else expected == result case (result: Float, expected: Float) => if (expected.isNaN) result.isNaN else expected == result + case (result: UnsafeRow, expected: GenericInternalRow) => --- End diff -- Hi @srowen, the ```(InternalRow, InternalRow)``` case was introduced later in [21838](https://github.com/apache/spark/pull/21838) and covers the logic of the ```UnsafeRow``` case. So we can just remove this unreachable piece of code.
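A small illustration of why a more specific case placed after a more general one is dead code. `RowLike`, `UnsafeRowLike` and `GenericRowLike` are hypothetical stand-ins, not Spark's `InternalRow` hierarchy. Scala matches cases top to bottom, so the general pair case always wins.

```scala
// Hypothetical stand-ins for Spark's row classes; these are not Spark's real types.
sealed trait RowLike
final case class UnsafeRowLike(values: Seq[Any]) extends RowLike
final case class GenericRowLike(values: Seq[Any]) extends RowLike

object MatchOrderSketch {
  def compare(pair: (Any, Any)): String = pair match {
    // The general case comes first, so it also catches (UnsafeRowLike, GenericRowLike)...
    case (_: RowLike, _: RowLike) => "general row case"
    // ...which makes this more specific case unreachable (scalac may warn about it).
    case (_: UnsafeRowLike, _: GenericRowLike) => "specific case"
    case _ => "fallback"
  }
}
```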
[GitHub] spark issue #21982: [SPARK-23909][SQL] Add aggregate function.
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21982 Isn't this PR related to the Jira ticket [SPARK-23911](https://issues.apache.org/jira/browse/SPARK-23911)?
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207908454 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -205,29 +230,85 @@ case class ArrayTransform( (elementVar, indexVar) } - override def eval(input: InternalRow): Any = { -val arr = this.input.eval(input).asInstanceOf[ArrayData] -if (arr == null) { - null -} else { - val f = functionForEval - val result = new GenericArrayData(new Array[Any](arr.numElements)) - var i = 0 - while (i < arr.numElements) { -elementVar.value.set(arr.get(i, elementVar.dataType)) -if (indexVar.isDefined) { - indexVar.get.value.set(i) -} -result.update(i, f.eval(input)) -i += 1 + override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = { +val arr = inputValue.asInstanceOf[ArrayData] +val f = functionForEval +val result = new GenericArrayData(new Array[Any](arr.numElements)) +var i = 0 +while (i < arr.numElements) { + elementVar.value.set(arr.get(i, elementVar.dataType)) + if (indexVar.isDefined) { +indexVar.get.value.set(i) } - result + result.update(i, f.eval(inputRow)) + i += 1 } +result } override def prettyName: String = "transform" } +/** + * Filters entries in a map using the provided function. 
+ */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Filters entries in a map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v); + [1 -> 0, 3 -> -1] + """, +since = "2.4.0") +case class MapFilter( +input: Expression, +function: Expression) + extends MapBasedUnaryHigherOrderFunction with CodegenFallback { + + @transient val (keyType, valueType, valueContainsNull) = input.dataType match { +case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull) +case _ => + val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType + (kType, vType, vContainsNull) + } + + @transient lazy val (keyVar, valueVar) = { +val args = function.asInstanceOf[LambdaFunction].arguments +(args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable]) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = { +function match { + case LambdaFunction(_, _, _) => --- End diff -- Is this pattern matching necessary? If so, shouldn't ```ArrayFilter``` use it as well?
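The usage example for `map_filter` can be checked with a tiny model. It assumes a map is represented as an ordered `Seq` of key/value pairs, loosely mirroring the parallel key and value arrays behind Spark's `MapData`. `MapFilterSketch` is hypothetical and is not the `MapFilter` expression itself.

```scala
// Hypothetical sketch of map_filter semantics; a map is modelled as an ordered
// Seq of (key, value) pairs rather than Spark's MapData.
object MapFilterSketch {
  def mapFilter[K, V](entries: Seq[(K, V)])(pred: (K, V) => Boolean): Seq[(K, V)] =
    entries.filter { case (k, v) => pred(k, v) } // keep the entries the lambda accepts
}
```

`MapFilterSketch.mapFilter(Seq(1 -> 0, 2 -> 2, 3 -> -1))((k, v) => k > v)` keeps `1 -> 0` and `3 -> -1`, matching `[1 -> 0, 3 -> -1]` in the usage text.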
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/22017 [SPARK-23938][SQL] Add map_zip_with function ## What changes were proposed in this pull request? This PR adds a new SQL function called ```map_zip_with```. It merges the two given maps into a single map by applying a function to each pair of values with the same key. ## How was this patch tested? Added new tests to: - DataFrameFunctionsSuite.scala - HigherOrderFunctionsSuite.scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-23938 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22017.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22017 commit ef56011f03d8bae4634e5d3108e4d6502482383c Author: Marek Novotny Date: 2018-08-06T23:42:45Z [SPARK-23938][SQL] Add map_zip_with function
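A minimal sketch of the merge semantics described in the PR and its usage text. It makes three assumptions:

* maps are modelled as ordered `Seq`s of pairs;
* `None` stands in for the NULL passed for a key missing from one map;
* for a duplicated key, the first entry wins.

`MapZipWithSketch` is a hypothetical name. The lookup is a linear search, so this is only a model of the behaviour, not the PR's implementation.

```scala
// Hypothetical sketch of map_zip_with semantics (not the MapZipWith expression).
// None plays the role of the NULL passed when a key is missing from one map.
object MapZipWithSketch {
  def mapZipWith[K, V1, V2, R](left: Seq[(K, V1)], right: Seq[(K, V2)])(
      f: (K, Option[V1], Option[V2]) => R): Seq[(K, R)] = {
    // Union of the keys, in order of first appearance (left map first).
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      // Linear search that returns the first entry for a duplicated key.
      val v1 = left.collectFirst { case (`k`, v) => v }
      val v2 = right.collectFirst { case (`k`, v) => v }
      k -> f(k, v1, v2)
    }
  }
}
```

Concatenating the values as in the usage example, `mapZipWith(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "x", 2 -> "y"))((_, v1, v2) => v1.getOrElse("") + v2.getOrElse(""))` produces `Seq(1 -> "ax", 2 -> "by")`.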
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22017 cc @ueshin @mgaido91 @hvanhovell
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208160707 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); --- End diff -- nit: missing space -> ```k, v```
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208164141 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) --- End diff -- Is there any reason for changing the ```valueContainsNull``` flag if the function transforms only the keys? WDYT about:
```
val MapType(_, valueType, valueContainsNull) = input.dataType.asInstanceOf[MapType]
MapType(function.dataType, valueType, valueContainsNull)
```
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208169140 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -2071,6 +2071,158 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type")) } + test("transform keys function - test various primitive data types combinations") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[Int, String](1 -> "a", 2 -> "b", 3 -> "c") +).toDF("x") + +val dfExample3 = Seq( + Map[String, Int]("a" -> 1, "b" -> 2, "c" -> 3) +).toDF("y") + +val dfExample4 = Seq( + Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0) +).toDF("z") + +val dfExample5 = Seq( + Map[Int, Boolean](25 -> true, 26 -> false) +).toDF("a") + +val dfExample6 = Seq( + Map[Int, String](25 -> "ab", 26 -> "cd") +).toDF("b") + +val dfExample7 = Seq( + Map[Array[Int], Boolean](Array(1, 2) -> false) +).toDF("c") + + +def testMapOfPrimitiveTypesCombination(): Unit = { + checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + v)"), +Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7 + + checkAnswer(dfExample2.selectExpr("transform_keys(x, (k, v) -> k + 1)"), +Seq(Row(Map(2 -> "a", 3 -> "b", 4 -> "c" + + checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> v * v)"), +Seq(Row(Map(1 -> 1, 4 -> 2, 9 -> 3 + + checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> length(k) + v)"), +Seq(Row(Map(2 -> 1, 3 -> 2, 4 -> 3 + + checkAnswer( +dfExample3.selectExpr("transform_keys(y, (k, v) -> concat(k, cast(v as String)))"), +Seq(Row(Map("a1" -> 1, "b2" -> 2, "c3" -> 3 + + checkAnswer(dfExample4.selectExpr("transform_keys(z, " + +"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 'three'))[k])"), +Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7 + + 
checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> CAST(v * 2 AS BIGINT) + k)"), +Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7 + + checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> k + v)"), +Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7 + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> k % 2 = 0 OR v)"), +Seq(Row(Map(true -> true, true -> false + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample6.selectExpr( +"transform_keys(b, (k, v) -> concat(conv(k, 10, 16) , substr(v, 1, 1)))"), +Seq(Row(Map("19a" -> "ab", "1Ac" -> "cd" + + checkAnswer(dfExample7.selectExpr("transform_keys(c, (k, v) -> array_contains(k, 3) AND v)"), +Seq(Row(Map(false -> false +} +// Test with local relation, the Project will be evaluated without codegen +testMapOfPrimitiveTypesCombination() +dfExample1.cache() +dfExample2.cache() +dfExample3.cache() +dfExample4.cache() +dfExample5.cache() +dfExample6.cache() +// Test with cached relation, the Project will be evaluated with codegen +testMapOfPrimitiveTypesCombination() --- End diff -- Do we have to do that if the expression implements ```CodegenFallback```?
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208169969 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala --- @@ -181,4 +187,46 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper (acc, array) => coalesce(aggregate(array, acc, (acc, elem) => acc + elem), acc)), 15) } + + test("TransformKeys") { +val ai0 = Literal.create( + Map(1 -> 1, 2 -> 2, 3 -> 3), --- End diff -- It may be irrelevant, but WDYT about adding test cases with ```null``` values?
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208167785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): + TransformKeys = { +val (keyElementType, valueElementType, containsNull) = input.dataType match { + case MapType(keyType, valueType, containsNullValue) => +(keyType, valueType, containsNullValue) + case _ => +val MapType(keyType, valueType, containsNullValue) = MapType.defaultConcreteType +(keyType, valueType, containsNullValue) +} +copy(function = f(function, (keyElementType, false) :: (valueElementType, containsNull) :: Nil)) + } + + @transient lazy val (keyVar, valueVar) = { +val LambdaFunction( +_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function +(keyVar, valueVar) + } + + override def eval(input: InternalRow): Any = { +val arr = 
this.input.eval(input).asInstanceOf[MapData] +if (arr == null) { + null +} else { + val f = functionForEval + val resultKeys = new GenericArrayData(new Array[Any](arr.numElements)) + var i = 0 + while (i < arr.numElements) { +keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) +valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) +resultKeys.update(i, f.eval(input)) --- End diff -- Maybe I'm missing something, but couldn't ```f.eval(input)``` evaluate to ```null```? Keys are not allowed to be ```null```. Other functions usually have a ```null``` check and throw a ```RuntimeException``` in such cases.
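A minimal sketch of the null-key check being suggested, again on an ordered `Seq` of pairs. `TransformKeysSketch` is hypothetical, and the exception message is illustrative rather than Spark's actual text. Values pass through untouched, and a lambda result of `null` for a key fails fast.

```scala
// Hypothetical sketch of transform_keys with a null-key check (not Spark code);
// the exception message is illustrative only.
object TransformKeysSketch {
  def transformKeys[K, V, K2](entries: Seq[(K, V)])(f: (K, V) => K2): Seq[(K2, V)] =
    entries.map { case (k, v) =>
      val newKey = f(k, v)
      // Map keys must not be null, so fail fast instead of building an invalid map.
      if (newKey == null) throw new RuntimeException("Cannot use null as map key")
      newKey -> v                   // values are passed through unchanged
    }
}
```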
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208161643 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. --- End diff -- Maybe a more descriptive comment?
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208190999 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): + TransformKeys = { +val (keyElementType, valueElementType, containsNull) = input.dataType match { + case MapType(keyType, valueType, containsNullValue) => +(keyType, valueType, containsNullValue) + case _ => +val MapType(keyType, valueType, containsNullValue) = MapType.defaultConcreteType +(keyType, valueType, containsNullValue) +} +copy(function = f(function, (keyElementType, false) :: (valueElementType, containsNull) :: Nil)) + } + + @transient lazy val (keyVar, valueVar) = { +val LambdaFunction( +_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function +(keyVar, valueVar) + } + + override def eval(input: InternalRow): Any = { +val arr = 
this.input.eval(input).asInstanceOf[MapData] +if (arr == null) { + null +} else { + val f = functionForEval + val resultKeys = new GenericArrayData(new Array[Any](arr.numElements)) + var i = 0 + while (i < arr.numElements) { +keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) +valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) +resultKeys.update(i, f.eval(input)) --- End diff -- I'm not a fan of duplicated keys either, but other functions that transform maps have the same problem. See the discussions [here](https://github.com/apache/spark/pull/21282#discussion_r187234431) and [here](https://github.com/apache/spark/pull/21258#discussion_r186410527). Example:
```
scala> spark.range(1).selectExpr("map(0,1,0,2)").show()
+----------------+
| map(0, 1, 0, 2)|
+----------------+
|[0 -> 1, 0 -> 2]|
+----------------+
```
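To make the duplicated-key problem concrete, here is a small hypothetical helper, `DuplicateKeysSketch`, which is not part of Spark. It reports which keys occur more than once after a transformation. Applied to the `k % 2 = 0 OR v` case from the DataFrame test, both entries end up with the key `true`.

```scala
// Hypothetical helper (not Spark code): returns the keys that occur more than once,
// e.g. after a key-transforming lambda has been applied.
object DuplicateKeysSketch {
  def duplicatedKeys[K](keys: Seq[K]): Set[K] =
    keys.groupBy(identity).collect { case (k, occurrences) if occurrences.size > 1 => k }.toSet
}
```

Transforming `Seq(25 -> true, 26 -> false)` with `(k % 2 == 0) || v` yields keys `true, true`, so `duplicatedKeys` reports `Set(true)`.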
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208204796 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + private def getMapType(expr: Expression) = expr.dataType match { +case m: MapType => m +case _ => MapType.defaultConcreteType + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = { +val mapData1 = value1.asInstanceOf[MapData] +val mapData2 = value2.asInstanceOf[MapData] +val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray()) +val values = new GenericArrayData(new Array[Any](keys.numElements())) +keys.foreach(keyType, (idx: Int, key: Any) => { + val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering) --- End diff -- Thanks for mentioning this! I'm not happy with the current complexity either. I assumed that the implementation of maps would change in the future into something with O(1) element access. By then, the complexity would be O(N) for types supporting equals as well, and we would save a portion of duplicated code. If you think that maps will remain like this for a long time, I really like your suggestion with indexes. @ueshin What's your view on that?
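A rough sketch of the index-based idea discussed above. The thread does not spell out the reviewer's exact proposal, so this is one plausible reading:

* one hash-based pass over each key array records where every distinct key first occurs;
* values can then be fetched by index in roughly O(N + M);
* this avoids a `getValueEval`-style search of both maps for every key.

`KeyIndexSketch` is hypothetical and assumes the key type has consistent `equals`/`hashCode`.

```scala
import scala.collection.mutable

// Hypothetical sketch (not Spark code): for each distinct key, in first-appearance order,
// record the index of its first occurrence in the left and right key arrays.
// Assumes keys have consistent equals/hashCode.
object KeyIndexSketch {
  def indexKeys[K](leftKeys: IndexedSeq[K], rightKeys: IndexedSeq[K]): List[(K, Option[Int], Option[Int])] = {
    // LinkedHashMap keeps the keys in the order they were first seen.
    val positions = mutable.LinkedHashMap[K, (Option[Int], Option[Int])]()
    for (i <- leftKeys.indices) {
      val k = leftKeys(i)
      if (!positions.contains(k)) positions(k) = (Some(i), None) // first entry wins
    }
    for (i <- rightKeys.indices) {
      val k = rightKeys(i)
      positions.get(k) match {
        case None               => positions(k) = (None, Some(i)) // key only in the right map
        case Some((l, None))    => positions(k) = (l, Some(i))    // first right entry for this key
        case Some((_, Some(_))) => ()                             // duplicated right key: ignore
      }
    }
    positions.toList.map { case (k, (l, r)) => (k, l, r) }
  }
}
```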
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208211250 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + private def getMapType(expr: Expression) = expr.dataType match { +case m: MapType => m +case _ => MapType.defaultConcreteType + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = { +val mapData1 = value1.asInstanceOf[MapData] +val mapData2 = value2.asInstanceOf[MapData] +val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray()) +val values = new GenericArrayData(new Array[Any](keys.numElements())) +keys.foreach(keyType, (idx: Int, key: Any) => { + val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering) --- End diff -- OK, I will change it. Thanks a lot!
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208280351 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable --- End diff -- The ```nullable``` flag relates to the case where the whole map is ```null```.
The case that you are referring to is handled by the ```valueContainsNull``` flag of ```MapType``` (see line [423](https://github.com/apache/spark/pull/22017/files/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25#diff-ef52827ed9b41efc1fbd056a06ef7c6aR423)).
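To make the distinction concrete, here is a minimal plain-Scala sketch. It is not Spark code: the `Option` encoding and the name `mapZipWith` are assumptions made for illustration. A `None` input stands for a `null` map, which is what the expression-level `nullable` flag describes. A `None` inside the result stands for a `null` value, which is what `valueContainsNull` of the result `MapType` describes.

```scala
object MapZipWithSketch {
  // A None input models a null map (expression-level `nullable`).
  // Option values model possibly-null entries (`valueContainsNull`).
  def mapZipWith[K, V1, V2, R](
      left: Option[Seq[(K, V1)]],
      right: Option[Seq[(K, V2)]])(
      f: (K, Option[V1], Option[V2]) => Option[R]): Option[Seq[(K, Option[R])]] =
    for {
      l <- left  // a null left map makes the whole result null
      r <- right // likewise for the right map
    } yield {
      // Keys of the left map first, then keys only present in the right map.
      val keys = (l.map(_._1) ++ r.map(_._1)).distinct
      keys.map { k =>
        // A key missing from one side is passed to `f` as None (null);
        // collectFirst keeps the first entry of a duplicated key.
        val v1 = l.collectFirst { case (`k`, v) => v }
        val v2 = r.collectFirst { case (`k`, v) => v }
        k -> f(k, v1, v2)
      }
    }
}
```

If the function only concatenates when both values are present, merging `Seq(1 -> "a", 2 -> "b")` with `Seq(1 -> "x", 3 -> "z")` yields `None` for keys 2 and 3. The result map therefore contains null values even though neither input map is null.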
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208399620 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) 
= ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) + } + + @transient lazy val (arr1Var, arr2Var) = { +val LambdaFunction(_, + (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: Nil, _) = function +(arr1Var, arr2Var) + } + + override def eval(input: InternalRow): Any = { +val leftArr = left.eval(input).asInstanceOf[ArrayData] +val rightArr = right.eval(input).asInstanceOf[ArrayData] + +if (leftArr == null || rightArr == null) { --- End diff -- If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated.
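Here is a minimal sketch of the suggested short-circuit, written in plain Scala rather than against Spark's `Expression` API. `Option` stands in for nullable `ArrayData`, by-name parameters stand in for lazily calling `eval` on each child, and `evalBoth` is a hypothetical name. It has the same shape as the `MapZipWith.eval` diff quoted in the `map_zip_with` review above, where `right.eval(input)` is only called once `value1` is known to be non-null.

```scala
object ShortCircuitSketch {
  // `left` and `right` are by-name, so each child is evaluated only if and when it is used.
  def evalBoth[A, B, R](left: => Option[A], right: => Option[B])(f: (A, B) => R): Option[R] =
    left match {
      case None => None // null left input: `right` is never evaluated
      case Some(l) => right.map(r => f(l, r))
    }
}
```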
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208403145 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) 
= ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) --- End diff -- If you want to support input arrays of different sizes (the JIRA ticket says: _"Both arrays must be the same length."_), what about the scenario where one array is empty and the other has elements? Shouldn't we use ```true``` instead of ```leftContainsNull``` and ```rightContainsNull```?
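If inputs of different lengths are supported, the padded positions pass `null` to the lambda regardless of the inputs' `containsNull` flags. The following plain-Scala sketch uses hypothetical names: `Option` models a nullable element, and `None` models a padded or null slot. It shows that an empty right array still produces a missing right argument for every left element.

```scala
object ZipWithPaddingSketch {
  // Pads the shorter input with None (standing in for null), so `f` can receive a
  // missing element even when neither input array itself contains nulls.
  def zipWithPadded[A, B, R](left: IndexedSeq[A], right: IndexedSeq[B])(
      f: (Option[A], Option[B]) => R): IndexedSeq[R] = {
    val length = math.max(left.length, right.length)
    (0 until length).map(i => f(left.lift(i), right.lift(i)))
  }
}
```

Because `left.lift(i)` and `right.lift(i)` can both return `None`, the lambda's arguments must be treated as nullable. This corresponds to binding `true` rather than the inputs' own `containsNull` flags.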
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208398313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { --- End diff -- You can utilize ```HigherOrderFunction.arrayArgumentType```. 
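For illustration only, here is a self-contained model of that refactoring. The two duplicated `match` blocks in `bind` collapse into one helper that returns an `(elementType, containsNull)` pair. The `DataType` classes below are simplified stand-ins, not Spark's. The helper's shape is an assumption modeled on the code in the diff, including the fallback to a default array type (modeled here as `ArrayType(NullType, containsNull = true)`).

```scala
object ArgumentTypeSketch {
  // Simplified stand-ins for Spark's DataType hierarchy (not the real classes).
  sealed trait DataType
  case object NullType extends DataType
  case object IntegerType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType

  // Stand-in for the default concrete array type used as a fallback.
  val defaultArrayType: ArrayType = ArrayType(NullType, containsNull = true)

  // One helper instead of two copies of the same `match` in `bind`.
  def arrayArgumentType(dt: DataType): (DataType, Boolean) = dt match {
    case ArrayType(elementType, containsNull) => (elementType, containsNull)
    case _ => (defaultArrayType.elementType, defaultArrayType.containsNull)
  }

  // Example call site: both lambda arguments are resolved the same way.
  def lambdaArguments(left: DataType, right: DataType): Seq[(DataType, Boolean)] =
    Seq(arrayArgumentType(left), arrayArgumentType(right))
}
```

In the PR itself, the call sites would then presumably read `val (leftElementType, leftContainsNull) = HigherOrderFunction.arrayArgumentType(left.dataType)`. This mirrors how `MapZipWith` uses `HigherOrderFunction.mapKeyValueArgumentType`.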
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208519941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- If we changed it to ```(Option[Int], Option[Int])```, wouldn't we need two similar ```i``` loops instead of one? My motivation for also using the ```ArrayBuffer``` is to preserve the order of keys. 
A random order would break map comparison.
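Here is a simplified sketch of that design in plain Scala, not the PR's code. `IndexedSeq` replaces `ArrayData`, `mutable.HashMap` replaces `OpenHashMap`, and `keysWithIndexes` is a hypothetical name. The hash map only answers whether a key has been seen, while the `ArrayBuffer` keeps keys in first-seen order. Because the per-key indexes live in an `Array[Option[Int]]`, one loop body indexed by `z` serves both inputs.

```scala
import scala.collection.mutable

object KeyIndexesSketch {
  // For every distinct key, in first-seen order, records the index of its first
  // occurrence in each input; None means the key is absent from that input.
  def keysWithIndexes[K](keys1: IndexedSeq[K], keys2: IndexedSeq[K]): Seq[(K, Array[Option[Int]])] = {
    val ordered = mutable.ArrayBuffer.empty[(K, Array[Option[Int]])] // preserves key order
    val lookup = mutable.HashMap.empty[K, Array[Option[Int]]] // membership test only
    val inputs = IndexedSeq(keys1, keys2)
    var z = 0
    while (z < inputs.length) { // the same loop body handles both inputs
      val array = inputs(z)
      var i = 0
      while (i < array.length) {
        val key = array(i)
        val indexes = lookup.getOrElseUpdate(key, {
          val fresh = Array[Option[Int]](None, None)
          ordered += ((key, fresh))
          fresh
        })
        if (indexes(z).isEmpty) indexes(z) = Some(i) // keep only the first occurrence
        i += 1
      }
      z += 1
    }
    ordered.toList
  }
}
```

For key arrays `Vector(1, 2, 1)` and `Vector(2, 3)`, this yields keys `1, 2, 3` in that order, with index arrays `[Some(0), None]`, `[Some(1), Some(0)]` and `[None, Some(1)]`.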
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208527423 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] +val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]] +val keys = Array(keys1, keys2) +var z = 0 +while(z < 2) { + var i = 0 + val array = keys(z) + while (i < array.numElements()) { +val
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208550479 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- > I really don't think so, it would be the same as now I think
Let's assume that ```indexes``` is a tuple for now. ```indexes(z).isEmpty``` could be replaced with ```indexes
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208559241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- Yeah, but my point is: how do we create a new tuple from an old one without using ```_1``` and ```_2```?
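The difficulty shows up in a small plain-Scala sketch (all names are hypothetical). With an immutable `(Option[Int], Option[Int])`, writing slot `z` means branching on `z` and naming `_1` or `_2` explicitly. An `Array[Option[Int]]` takes the position directly.

```scala
object TupleVersusArraySketch {
  type Indexes = (Option[Int], Option[Int])

  // Immutable tuple: "set slot z" has to branch on z and name _1 / _2 explicitly.
  def updatedTuple(indexes: Indexes, z: Int, i: Int): Indexes = z match {
    case 0 => indexes.copy(_1 = Some(i))
    case 1 => indexes.copy(_2 = Some(i))
    case _ => throw new IllegalArgumentException(s"unexpected input position $z")
  }

  // Mutable array: the same update is a single positional assignment.
  def updateArray(indexes: Array[Option[Int]], z: Int, i: Int): Unit =
    indexes(z) = Some(i)
}
```

The tuple version also allocates a new pair on every update, whereas the array is updated in place.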
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208605882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- > you don't need to check neither whether the key is there nor the size of the output array, you just need to add them. What about duplicated keys? They can be created with other map functions. 
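To illustrate why a plain append is not enough: a map's key array may already contain a repeated key, and the function's usage text above explicitly allows for "duplicated keys" in an input map. Simply appending both key arrays can therefore emit the same key several times. Here is a plain-Scala sketch with hypothetical names:

```scala
object DuplicateKeysSketch {
  // Naive union: appends the key arrays of both maps as they are.
  def naiveKeyUnion[K](keys1: IndexedSeq[K], keys2: IndexedSeq[K]): IndexedSeq[K] =
    keys1 ++ keys2

  // Checked union: each key is emitted once, at its first position.
  def checkedKeyUnion[K](keys1: IndexedSeq[K], keys2: IndexedSeq[K]): IndexedSeq[K] =
    (keys1 ++ keys2).distinct
}
```

With the checked variant, the output size depends on the number of distinct keys. It cannot be derived from the two input sizes alone, which is why the size must be checked while keys are being collected.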
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208647186 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- ```indexes(z).isEmpty``` ensures that we always insert the first occurrence of the key, which follows the behavior of ```GetMapValue```. If we didn't perform such a check, the last occurrence would end up in the result.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
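A minimal sketch of the first-occurrence rule described in the comment above, using plain Scala collections instead of Spark's `ArrayData` (the object and method names are hypothetical). For each distinct key, it records the index of the key's first occurrence in each input, so later duplicates are ignored. Per the comment, this is the behavior `GetMapValue` follows.

```scala
import scala.collection.mutable

object FirstOccurrenceSketch {
  // For every distinct key across both key arrays, record the index of the key's
  // FIRST occurrence in each array (None if the key is absent from that array).
  def keysWithIndexes(keys1: IndexedSeq[Any], keys2: IndexedSeq[Any]): Seq[(Any, Array[Option[Int]])] = {
    // LinkedHashMap keeps keys in order of first appearance.
    val positions = mutable.LinkedHashMap.empty[Any, Array[Option[Int]]]
    for ((keys, z) <- Seq(keys1, keys2).zipWithIndex; (key, i) <- keys.zipWithIndex) {
      val indexes = positions.getOrElseUpdate(key, Array[Option[Int]](None, None))
      // Fill the slot only while it is still empty, so the first occurrence wins.
      if (indexes(z).isEmpty) indexes(z) = Some(i)
    }
    positions.toSeq
  }
}
```

Without the `isEmpty` guard, the duplicate `1` at index 2 of the first array would overwrite index 0. That is the "last occurrence wins" outcome the comment wants to avoid.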
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208675350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- This is a valid argument; it's a rare edge case. One last question before I change it: WDYT about the performance of a mutable array vs. ```oldTuple.copy(_2 = newValue)```?
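To make the trade-off in this question concrete, here is a small plain-Scala sketch of the two update styles. All names are hypothetical. The first style mutates the `Array[Option[Int]]` held in a buffer entry in place. The second stores an immutable pair of options and replaces the entry via `copy`. The in-place style allocates nothing per update. The `copy` style allocates a new `Tuple2` on each update but keeps entries immutable. Which is actually faster would need a benchmark; the sketch only shows the shape of each approach.

```scala
import scala.collection.mutable

object UpdateStylesSketch {
  // Style 1: the per-key array is mutated in place; the tuple itself is never replaced.
  def updateInPlace(buf: mutable.ArrayBuffer[(Any, Array[Option[Int]])],
                    pos: Int, z: Int, index: Int): Unit = {
    val indexes = buf(pos)._2
    if (indexes(z).isEmpty) indexes(z) = Some(index)
  }

  // Style 2: entries hold an immutable pair of options; an update replaces the
  // whole entry with a copy, allocating a new Tuple2 each time.
  def updateWithCopy(buf: mutable.ArrayBuffer[(Any, (Option[Int], Option[Int]))],
                     pos: Int, z: Int, index: Int): Unit = {
    val old = buf(pos)
    val (i1, i2) = old._2
    if (z == 0 && i1.isEmpty) buf(pos) = old.copy(_2 = (Some(index), i2))
    else if (z == 1 && i2.isEmpty) buf(pos) = old.copy(_2 = (i1, Some(index)))
  }
}
```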
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208751629 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + @transient val (keyType, valueType, valueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(input.dataType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): --- End diff -- nit: formatting --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208747953 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); --- End diff -- nit: ```(k, v)```; also, maybe I would use ```v + 1``` instead of ```k + 1```.
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208746631 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values --- End diff -- typos: ```Transforms values```, ```with```. Maybe you can think of a better comment?
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208749197 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) --- End diff -- ```map.valueContainsNull``` -> ```function.nullable```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208750446 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) --- End diff -- This is already specified by ```MapBasedSimpleHigherOrderFunction```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
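A plain-Scala model of what `transform_values` is meant to do. The object name is hypothetical, and a `ListMap` stands in for a Spark map. Keys are kept as-is, and each value is replaced by `f(key, value)`. The result's value type comes from `f` rather than from the input map. That is the point behind the earlier suggestion to derive the result's value nullability from `function.nullable` instead of `map.valueContainsNull`.

```scala
import scala.collection.immutable.ListMap

object TransformValuesSketch {
  // Keys are unchanged; every value is replaced by f(key, value).
  // The result's value type W is determined by f, not by the input map.
  def transformValues[K, V, W](m: ListMap[K, V])(f: (K, V) => W): ListMap[K, W] =
    m.map { case (k, v) => k -> f(k, v) }
}
```

With `(k, v) -> v + 1`, as suggested in the nit, each value is incremented and the keys stay untouched. A function returning `Option` would model a nullable result even for a non-nullable input map.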
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208868838 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala --- @@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa val lit = InternalRow(expected, expected) val expectedRow = UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit) - if (unsafeRow != expectedRow) { + val field = StructField("field", expression.dataType) + val dataType = StructType(field :: field :: Nil) + if (!checkResult(unsafeRow, expectedRow, dataType)) { --- End diff -- ```UnsafeRow```s are compared based on the equality of their backing arrays. This approach doesn't work when the order of entries in the unsafe representation of maps should be ignored.
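A minimal sketch of why a positional, byte-level style comparison is too strict for maps. The helper names are hypothetical, and Spark's actual `checkResult` logic is not reproduced here. Two maps holding the same entries in a different order differ as sequences, but they are equal once their `(key, value)` entries are compared as multisets.

```scala
object MapEqualitySketch {
  // Positional comparison: roughly what comparing backing bytes amounts to.
  def positionalEquals(k1: Seq[Any], v1: Seq[Any], k2: Seq[Any], v2: Seq[Any]): Boolean =
    k1 == k2 && v1 == v2

  // Order-insensitive comparison: same number of entries and the same multiset
  // of (key, value) pairs (Seq.diff is a multiset difference).
  def unorderedEquals(k1: Seq[Any], v1: Seq[Any], k2: Seq[Any], v2: Seq[Any]): Boolean = {
    val e1 = k1.zip(v1)
    val e2 = k2.zip(v2)
    e1.length == e2.length && e1.diff(e2).isEmpty
  }
}
```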
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208871779 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,184 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = --- End diff -- You are right, thanks! WDYT about introducing a coercion rule that handles different key types? It might be handy for cases like (```IntegerType```, ```LongType```).
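A toy model of what such a coercion rule might do for map key types. This is not Spark's `TypeCoercion`: the key-type names and the precedence list are assumptions for illustration, although Spark does keep a similar fixed numeric precedence order. When the two key types are different integral types, both maps would be cast to the wider one. When no such type exists, the type check would still fail.

```scala
object KeyWideningSketch {
  sealed trait KeyType
  case object ByteKey extends KeyType
  case object ShortKey extends KeyType
  case object IntKey extends KeyType
  case object LongKey extends KeyType
  case object StringKey extends KeyType

  // Hypothetical precedence: a later type can represent every value of an earlier one.
  private val integralPrecedence: Seq[KeyType] = Seq(ByteKey, ShortKey, IntKey, LongKey)

  // Common key type both maps could be cast to, or None if the rule gives up.
  def commonKeyType(t1: KeyType, t2: KeyType): Option[KeyType] =
    if (t1 == t2) Some(t1)
    else if (integralPrecedence.contains(t1) && integralPrecedence.contains(t2))
      Some(integralPrecedence(math.max(integralPrecedence.indexOf(t1), integralPrecedence.indexOf(t2))))
    else None
}
```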
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208872928 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- @mgaido91 Are you comfortable with reverting back to the previous version? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21121 Sure, closing ...
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke closed the pull request at: https://github.com/apache/spark/pull/21121
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208941728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => 
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + +s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- Like this idea, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209188342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- Even though there is a coercion rule for unifying key types, the key types may still differ in nullability flags if they are complex. In theory, we could use ```==``` and ```findTightestCommonType``` in the coercion rule, since there is no codegen to be optimized for ```null``` checks.
But unfortunately, ```bind``` gets called once before the coercion rules are executed, so ```findTightestCommonType``` is important for setting up a correct input type for the lambda function. Maybe we could play with the order of analysis rules, but I'm not sure about all the consequences. @ueshin, could you shed some light on the ordering of analysis rules?
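A toy model of the situation described in this comment. It uses plain Scala rather than Spark's classes and is loosely modeled on how a tightest-common-type search treats complex types; all names are hypothetical. Two complex key types that differ only in a nullability flag are not `==`, yet they can still be unified by OR-ing the flags. The unified type is the kind of type the lambda would need to be bound with before the coercion rules run.

```scala
object NullabilityMergeSketch {
  sealed trait DType
  case object IntT extends DType
  final case class ArrayT(element: DType, containsNull: Boolean) extends DType

  // Returns a common type when both types have the same shape, OR-ing the
  // containsNull flags so the result can hold values of either input.
  def tightestCommon(t1: DType, t2: DType): Option[DType] = (t1, t2) match {
    case (a, b) if a == b => Some(a)
    case (ArrayT(e1, n1), ArrayT(e2, n2)) =>
      tightestCommon(e1, e2).map(e => ArrayT(e, n1 || n2))
    case _ => None
  }
}
```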
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209311502 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -231,6 +231,15 @@ object TypeCoercion { }) } + /** + * Similar to [[findTightestCommonType]] but with string promotion. + */ + def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = { --- End diff -- If we have maps whose keys are decimals of different precision, ```Cast``` will fail in the analysis phase, since casting a key could make it nullable (potential loss of precision), which isn't allowed for map keys. IMHO, the type mismatch exception from this function will be more accurate. WDYT?
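A simplified plain-Scala model of the decimal case. `Dec`, `widerDecimal`, and `castMayProduceNull` are hypothetical names. The widening rule is modeled loosely on how Spark widens decimals and caps precision at 38 digits, not copied from it. When the widened type has to be clamped, casting one of the original key types to it can overflow and yield null. Because map keys can't be null, such a cast would be rejected.

```scala
object DecimalKeySketch {
  final case class Dec(precision: Int, scale: Int) {
    def integerDigits: Int = precision - scale
  }
  val MaxPrecision = 38

  // Wider decimal: keep the larger scale and the larger number of integer digits,
  // then clamp to the maximum precision (which is where digits can be lost).
  def widerDecimal(a: Dec, b: Dec): Dec = {
    val scale = math.max(a.scale, b.scale)
    val range = math.max(a.integerDigits, b.integerDigits)
    Dec(math.min(range + scale, MaxPrecision), math.min(scale, MaxPrecision))
  }

  // Simplified check: the cast may overflow (yielding null) unless the target keeps
  // at least as many integer digits and fractional digits as the source.
  def castMayProduceNull(from: Dec, to: Dec): Boolean =
    to.integerDigits < from.integerDigits || to.scale < from.scale
}
```

For `decimal(10,2)` and `decimal(5,4)`, the wider type `decimal(12,4)` fits both inputs. For `decimal(38,0)` and `decimal(38,10)`, the clamped `decimal(38,10)` leaves only 28 integer digits, so casting the first key type to it may overflow.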
[GitHub] spark pull request #21282: [SPARK-23934][SQL] Adding map_from_entries functi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21282#discussion_r192573167 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -118,6 +120,229 @@ case class MapValues(child: Expression) override def prettyName: String = "map_values" } +/** + * Returns a map created from the given array of entries. + */ +@ExpressionDescription( + usage = "_FUNC_(arrayOfEntries) - Returns a map created from the given array of entries.", + examples = """ +Examples: + > SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b'))); + {1:"a",2:"b"} + """, + since = "2.4.0") +case class MapFromEntries(child: Expression) extends UnaryExpression +{ + private lazy val resolvedDataType: Option[MapType] = child.dataType match { +case ArrayType( + StructType(Array( +StructField(_, keyType, false, _), +StructField(_, valueType, valueNullable, _))), + false) => Some(MapType(keyType, valueType, valueNullable)) +case _ => None + } + + override def dataType: MapType = resolvedDataType.get + + override def checkInputDataTypes(): TypeCheckResult = resolvedDataType match { +case Some(_) => TypeCheckResult.TypeCheckSuccess +case None => TypeCheckResult.TypeCheckFailure(s"'${child.sql}' is of " + + s"${child.dataType.simpleString} type. $prettyName accepts only null-free arrays " + + "of pair structs. 
Values of the first struct field can't contain nulls and produce " + + "duplicates.") + } + + override protected def nullSafeEval(input: Any): Any = { +val arrayData = input.asInstanceOf[ArrayData] +val length = arrayData.numElements() +val keyArray = new Array[AnyRef](length) +val keySet = new OpenHashSet[AnyRef]() +val valueArray = new Array[AnyRef](length) +var i = 0; +while (i < length) { + val entry = arrayData.getStruct(i, 2) + val key = entry.get(0, dataType.keyType) + if (key == null) { +throw new RuntimeException("The first field from a struct (key) can't be null.") + } + if (keySet.contains(key)) { --- End diff -- OK, no problem. I've removed the checks for duplicate keys.
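A plain-Scala sketch of the behavior after this change. The object name is hypothetical, and this is not Spark's `ArrayData`-based code. Entries are split into parallel key and value arrays. A null key is still rejected with the same message as in the diff. Duplicate keys are no longer detected, so they pass through unchanged.

```scala
object MapFromEntriesSketch {
  // Splits (key, value) entries into parallel key and value arrays.
  // A null key is rejected; duplicate keys are NOT checked, so they are kept as-is.
  def mapFromEntries(entries: Seq[(Any, Any)]): (Array[Any], Array[Any]) = {
    val keys = new Array[Any](entries.length)
    val values = new Array[Any](entries.length)
    for (((k, v), i) <- entries.zipWithIndex) {
      if (k == null) throw new RuntimeException("The first field from a struct (key) can't be null.")
      keys(i) = k
      values(i) = v
    }
    (keys, values)
  }
}
```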
[GitHub] spark pull request #21282: [SPARK-23934][SQL] Adding map_from_entries functi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21282#discussion_r192875181 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -308,6 +309,234 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp override def prettyName: String = "map_entries" } +/** + * Returns a map created from the given array of entries. + */ +@ExpressionDescription( + usage = "_FUNC_(arrayOfEntries) - Returns a map created from the given array of entries.", + examples = """ +Examples: + > SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b'))); + {1:"a",2:"b"} + """, + since = "2.4.0") +case class MapFromEntries(child: Expression) extends UnaryExpression { + + @transient + private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = child.dataType match { +case ArrayType( + StructType(Array( +StructField(_, keyType, keyNullable, _), +StructField(_, valueType, valueNullable, _))), + containsNull) => Some((MapType(keyType, valueType, valueNullable), keyNullable, containsNull)) +case _ => None + } + + private def nullEntries: Boolean = dataTypeDetails.get._3 + + override def dataType: MapType = dataTypeDetails.get._1 + + override def checkInputDataTypes(): TypeCheckResult = dataTypeDetails match { +case Some(_) => TypeCheckResult.TypeCheckSuccess +case None => TypeCheckResult.TypeCheckFailure(s"'${child.sql}' is of " + + s"${child.dataType.simpleString} type. 
$prettyName accepts only arrays of pair structs.") + } + + override protected def nullSafeEval(input: Any): Any = { +val arrayData = input.asInstanceOf[ArrayData] +val length = arrayData.numElements() +val numEntries = if (nullEntries) (0 until length).count(!arrayData.isNullAt(_)) else length +val keyArray = new Array[AnyRef](numEntries) +val valueArray = new Array[AnyRef](numEntries) +var i = 0 +var j = 0 +while (i < length) { + if (!arrayData.isNullAt(i)) { --- End diff -- Hi @ueshin, wouldn't it be better to return `null` in this case, following the null handling of other functions like `flatten`? ``` flatten(array(array(1,2), null, array(3,4))) => null ``` WDYT?
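A sketch of the null handling proposed here, with `Option` standing in for SQL `NULL`. The names are hypothetical, and this models the suggestion rather than the code in the PR. If any entry is null, the whole result is null, just as `flatten` returns null when one of its inner arrays is null.

```scala
object NullEntrySketch {
  // Proposed semantics: a null entry (None) makes the whole result null (None).
  def mapFromEntries[K, V](entries: Seq[Option[(K, V)]]): Option[Seq[(K, V)]] =
    if (entries.exists(_.isEmpty)) None else Some(entries.flatten)

  // The `flatten` behavior the comment refers to: a null inner array nulls the result.
  def flatten[T](arrays: Seq[Option[Seq[T]]]): Option[Seq[T]] =
    if (arrays.exists(_.isEmpty)) None else Some(arrays.flatten.flatten)
}
```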
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/21620 [SPARK-24636][SQL] Type coercion of arrays for array_join function ## What changes were proposed in this pull request? Presto's implementation accepts arbitrary arrays of primitive types as input: ``` presto> SELECT array_join(ARRAY [1, 2, 3], ', '); _col0 - 1, 2, 3 (1 row) ``` This PR proposes to implement a type coercion rule for the ```array_join``` function that converts arrays of primitive as well as non-primitive types to arrays of strings. ## How was this patch tested? New test cases added to: - sql-tests/inputs/typeCoercion/native/arrayJoin.sql - DataFrameFunctionsSuite.scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-24636 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21620.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21620 commit 98017714e06ea7a730b1515d5bcfd583d7808a5a Author: Marek Novotny Date: 2018-06-23T10:08:42Z [SPARK-24636][SQL] Type coercion of arrays for array_join function
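Once the coercion rule has produced an array of strings, `array_join` only needs to concatenate. The plain-Scala model below is a hypothetical sketch of that step. It is not Spark's implementation: `None` models a NULL element, `v.toString` stands in for `Cast(elem, StringType)`, and nulls are dropped unless a replacement string is supplied, matching the documented behaviour of `array_join(array, delimiter[, nullReplacement])`.

```scala
// Hypothetical model of array_join after elements are coerced to strings.
object ArrayJoinSketch {
  def arrayJoin(arr: Seq[Option[Any]], delimiter: String,
                nullReplacement: Option[String] = None): String =
    arr.flatMap {
      case Some(v) => Some(v.toString) // stands in for Cast(elem, StringType)
      case None    => nullReplacement  // skip nulls unless a replacement is given
    }.mkString(delimiter)
}
```

For example, `ArrayJoinSketch.arrayJoin(Seq(Some(1), Some(2), Some(3)), ", ")` yields `"1, 2, 3"`, which matches the Presto output shown in the description.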
[GitHub] spark issue #21620: [SPARK-24636][SQL] Type coercion of arrays for array_joi...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21620 cc @ueshin @mgaido91
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21620#discussion_r197716236 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -536,6 +536,11 @@ object TypeCoercion { case None => c } + case ArrayJoin(arr, d, nr) if !ArrayType(StringType).acceptsType(arr.dataType) && +ArrayType.acceptsType(arr.dataType) => +val containsNull = arr.dataType.asInstanceOf[ArrayType].containsNull +ArrayJoin(Cast(arr, ArrayType(StringType, containsNull)), d, nr) --- End diff -- Hi @mgaido91, to be honest, I considered this option before submitting this PR, but I'm glad you mentioned this approach. At least we can discuss the pros and cons of the different solutions. Using ```ImplicitTypeCasts.implicitCast``` would enable conversion only from primitive types. I think it would be nice to support non-primitive types as well. WDYT? Re: casting to ```StringType```: according to the ```Cast.canCast``` method, it should be possible to cast any type to ```StringType```: **line 42:** ``` case (_, StringType) => true``` Or am I missing something? I hope the test cases in *.../typeCoercion/native/arrayJoin.sql* cover conversions to ```StringType``` from all Spark types.
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21620#discussion_r197730826 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -536,6 +536,11 @@ object TypeCoercion { case None => c } + case ArrayJoin(arr, d, nr) if !ArrayType(StringType).acceptsType(arr.dataType) && +ArrayType.acceptsType(arr.dataType) => +val containsNull = arr.dataType.asInstanceOf[ArrayType].containsNull +ArrayJoin(Cast(arr, ArrayType(StringType, containsNull)), d, nr) --- End diff -- Ok, no problem. Let's support just arrays of primitive types for now. Thanks!
[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21620#discussion_r197768113 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -536,6 +536,14 @@ object TypeCoercion { case None => c } + case aj @ ArrayJoin(arr, d, nr) if !ArrayType(StringType).acceptsType(arr.dataType) && +ArrayType.acceptsType(arr.dataType) => +val containsNull = arr.dataType.asInstanceOf[ArrayType].containsNull +ImplicitTypeCasts.implicitCast(arr, ArrayType(StringType, containsNull)) match { + case Some(finalDataType) => ArrayJoin(finalDataType, d, nr) --- End diff -- Spot on, thanks!
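The agreed scope ("just arrays of primitive types for now") can be illustrated with a toy type model. Everything below is hypothetical: the case classes are simplified stand-ins for Spark's `org.apache.spark.sql.types`, and `implicitCastToString` only approximates the idea that implicit casts succeed for atomic types. The rule leaves `array<string>` untouched, rewrites arrays of atomic types to `array<string>` while keeping `containsNull`, and declines to rewrite anything else.

```scala
// Hypothetical toy model; not Spark's TypeCoercion or ImplicitTypeCasts API.
object ArrayJoinCoercionSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object IntegerType extends DataType
  case object DoubleType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
  final case class StructType(fieldTypes: Seq[DataType]) extends DataType

  // Simplified stand-in: only atomic types may be implicitly cast to string.
  private def implicitCastToString(dt: DataType): Option[DataType] = dt match {
    case StringType | IntegerType | DoubleType => Some(StringType)
    case _                                     => None
  }

  // Returns the rewritten input type for array_join, or None if no rewrite applies.
  def coerceForArrayJoin(dt: DataType): Option[DataType] = dt match {
    case ArrayType(StringType, _)    => None // already acceptable
    case ArrayType(et, containsNull) => implicitCastToString(et).map(ArrayType(_, containsNull))
    case _                           => None
  }
}
```

When no rewrite applies, the input is left unchanged, so an `array<struct<...>>` argument would still fail the usual input type check instead of being cast silently.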
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/21687 [SPARK-24165][SQL] Fixing the output data type of CaseWhen expression ## What changes were proposed in this pull request? This PR proposes a fix for the output data type of the ```CaseWhen``` expression. The current implementation ignores the nullability of nested types from different execution branches and returns the type of the first branch. This could lead to an unwanted ```NullPointerException``` from other expressions depending on a ```CaseWhen``` expression. Example: ``` val rows = new util.ArrayList[Row]() rows.add(Row(true, ("a", 1))) rows.add(Row(false, (null, 2))) val schema = StructType(Seq( StructField("cond", BooleanType, false), StructField("s", StructType(Seq( StructField("val1", StringType, true), StructField("val2", IntegerType, false) )), false) )) val df = spark.createDataFrame(rows, schema) df .select(when('cond, struct(lit("x").as("val1"), lit(10).as("val2"))).otherwise('s) as "res") .select('res.getField("val1")) .show() ``` Exception: ``` Exception in thread "main" java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44) at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44) ... ``` Output schema: ``` root |-- res.val1: string (nullable = false) ``` ## How was this patch tested?
New test cases added to - DataFrameSuite.scala - conditionalExpressions.scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-24165 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21687.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21687 commit 71040635723a4dc3bc55b4415261d5a7abf4ed50 Author: Marek Novotny Date: 2018-07-01T13:36:24Z [SPARK-24165][SQL] Fixing the output data type of CaseWhen expression when resolving nullability of nested types
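The root cause can be reproduced without Spark using a hypothetical, simplified type model. The case classes below are illustrative stand-ins for Spark's types, not the real API. Taking the output type from the first branch declares `val1` as non-nullable, even though the `otherwise` branch can produce a null `val1`. That mismatch is the source of the `NullPointerException` in generated code.

```scala
// Hypothetical toy model of the two branch types from the example above.
object CaseWhenTypeSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object IntegerType extends DataType
  final case class StructField(name: String, dataType: DataType, nullable: Boolean)
  final case class StructType(fields: Seq[StructField]) extends DataType

  // when('cond, struct(lit("x") as val1, lit(10) as val2))
  val thenType = StructType(Seq(
    StructField("val1", StringType, nullable = false),
    StructField("val2", IntegerType, nullable = false)))
  // otherwise('s), where s.val1 is declared nullable
  val elseType = StructType(Seq(
    StructField("val1", StringType, nullable = true),
    StructField("val2", IntegerType, nullable = false)))

  // Pre-fix behaviour: the output type is simply the first branch's type.
  def oldCaseWhenType(branchTypes: Seq[DataType]): DataType = branchTypes.head

  // True if the struct type declares field `name` as nullable.
  def fieldNullable(dt: DataType, name: String): Boolean = dt match {
    case StructType(fs) => fs.exists(f => f.name == name && f.nullable)
    case _              => false
  }
}
```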
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r199425774 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -129,7 +129,7 @@ case class CaseWhen( case Seq(dt1, dt2) => dt1.sameType(dt2) } - override def dataType: DataType = branches.head._2.dataType + override def dataType: DataType = valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get) --- End diff -- Thanks for your suggestion, but is it the best solution? 1. It seems that this solution would also fail on the example in the description. The root data types in both branches are non-nullable, so ```branches.head._2.dataType``` gets called again. ``` |-- named_struct(val1, x AS `val1`, val2, 10 AS `val2`): struct (nullable = false) ||-- val1: string (nullable = false) ||-- val2: integer (nullable = false) ``` and ``` |-- s: struct (nullable = false) ||-- val1: string (nullable = true) ||-- val2: integer (nullable = false) ``` 2. Let's assume that there is no if statement and we just call ```.asNullable```. This will make all the struct fields nullable and change ```containsNull``` to ```true``` for all ```MapTypes``` and ```ArrayTypes``` within the data type structure. Is that what we want? Personally, I think we need something that goes recursively through non-primitive types and merges the ```nullable``` and ```containsNull``` flags. That's exactly what ```TypeCoercion.findTightestCommonType``` does when passed ```sameType```-equal data types.
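The two options in point 2 differ clearly on the struct types shown above. The sketch below is a hypothetical toy model, not Spark's `DataType.asNullable` or `findTightestCommonType`. `asNullable` forces every nested flag to `true`, while a recursive merge sets a flag only when at least one input has it set. The merge assumes both inputs are already `sameType`-equal, so it does not handle mismatched shapes.

```scala
// Hypothetical toy model contrasting asNullable with a recursive flag merge.
object NullabilityMergeSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object IntegerType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
  final case class MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) extends DataType
  final case class StructField(name: String, dataType: DataType, nullable: Boolean)
  final case class StructType(fields: Seq[StructField]) extends DataType

  // Forces every nested nullability flag to true.
  def asNullable(dt: DataType): DataType = dt match {
    case ArrayType(et, _) => ArrayType(asNullable(et), containsNull = true)
    case MapType(k, v, _) => MapType(asNullable(k), asNullable(v), valueContainsNull = true)
    case StructType(fs)   => StructType(fs.map(f => f.copy(dataType = asNullable(f.dataType), nullable = true)))
    case other            => other
  }

  // ORs containsNull / valueContainsNull / nullable level by level.
  // Assumes a and b have the same shape (sameType-equal).
  def merge(a: DataType, b: DataType): DataType = (a, b) match {
    case (ArrayType(e1, c1), ArrayType(e2, c2)) => ArrayType(merge(e1, e2), c1 || c2)
    case (MapType(k1, v1, n1), MapType(k2, v2, n2)) => MapType(merge(k1, k2), merge(v1, v2), n1 || n2)
    case (StructType(f1), StructType(f2)) =>
      StructType(f1.zip(f2).map { case (x, y) =>
        StructField(x.name, merge(x.dataType, y.dataType), x.nullable || y.nullable)
      })
    case (t, _) => t
  }
}
```

On the two branch types above, `merge` marks only `val1` as nullable. `asNullable` also makes `val2` nullable, which is the over-approximation raised in point 2.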
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r199426921 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -129,7 +129,7 @@ case class CaseWhen( case Seq(dt1, dt2) => dt1.sameType(dt2) } - override def dataType: DataType = branches.head._2.dataType + override def dataType: DataType = valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get) --- End diff -- Yeah, the types are ```sameType``` equal (ignoring nullability). I'm just leveraging ```TypeCoercion.findTightestCommonType``` to merge the ```nullable``` and ```containsNull``` flags. I can introduce a new method for this purpose, but I'm afraid the implementation would be very similar to ```TypeCoercion.findTightestCommonType```.
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r199427016 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala --- @@ -113,6 +113,35 @@ class ConditionalExpressionSuite extends SparkFunSuite with ExpressionEvalHelper assert(CaseWhen(Seq((c2, c4_notNull), (c3, c5))).nullable === true) } + test("case when - nullability of nested types") { --- End diff -- Ok, no problem. Will do.
[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing the output data type of CaseWh...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21687 @viirya Yeah, it looks like the same problem, but it's worked around by a different implementation of the ```IfCoercion``` rule. This rule uses the ```!=``` operator for comparison, so if two types differ in nullability flags, a cast is applied to align the types. The ```CaseWhenCoercion``` rule uses ```sameType``` instead. We could change this rule to match the ```IfCoercion``` rule, but is it necessary? An extra ```Cast``` expression won't perform any effective casting (int to int, string to string, ...). We just need to supply correct metadata (the nullability flags) to parent expressions so they can perform null-safe codegen. Personally, I would resolve this nullability problem in the expressions, not in the coercion rules. WDYT?
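The difference between the two rules comes down to which equality each one uses. In this hypothetical toy model (`sameType` is simplified to structural equality that ignores nullability), an `!=`-based check fires for `array<int>` vs `array<int, containsNull = true>`, while a `sameType`-based check does not. So `IfCoercion` inserts a no-op `Cast` that happens to fix the declared type, and `CaseWhenCoercion` leaves the mismatch in place.

```scala
// Hypothetical toy model of the trigger conditions of the two coercion rules.
object CoercionTriggerSketch {
  sealed trait DataType
  case object IntegerType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType

  // Simplified sameType: structural equality that ignores nullability flags.
  def sameType(a: DataType, b: DataType): Boolean = (a, b) match {
    case (ArrayType(e1, _), ArrayType(e2, _)) => sameType(e1, e2)
    case _                                    => a == b
  }

  // IfCoercion-style trigger: any difference, including nullability, adds a Cast.
  def ifRuleFires(l: DataType, r: DataType): Boolean = l != r

  // CaseWhenCoercion-style trigger: nullability-only differences are ignored.
  def caseWhenRuleFires(l: DataType, r: DataType): Boolean = !sameType(l, r)
}
```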
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r199451068 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala --- @@ -129,7 +129,7 @@ case class CaseWhen( case Seq(dt1, dt2) => dt1.sameType(dt2) } - override def dataType: DataType = branches.head._2.dataType + override def dataType: DataType = valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get) --- End diff -- Ok, will do that :-)
[GitHub] spark issue #21704: [SPARK-24734][SQL] Fix containsNull of Concat for array ...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21704 @ueshin Thanks for bringing up this topic! This problem with differing ```nullable```/```containsNull``` flags seems to be more generic. In [21687](https://github.com/apache/spark/pull/21687), we've touched on a similar problem with the ```CaseWhen``` and ```If``` expressions. So I think it would be nice if we could work together on a generic and consistent solution for all expressions. WDYT?
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200054252 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) + case dt => dt +}.getOrElse(StringType) + } --- End diff -- So far, we've also identified ```CaseWhen``` and ```If```, discussed [here](https://github.com/apache/spark/pull/21687). I've just noticed that ```Coalesce``` also looks suspicious. What is the key purpose of ```SimplifyCasts```? To remove an extra expression node, or to avoid casts between two identical types? If it's the latter, what about changing the ```SimplifyCasts``` rule to replace ```Cast``` with a new dummy cast expression that holds only a target data type and doesn't perform any casting?
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200113984 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) --- End diff -- Yes, it should work (see a [test](https://github.com/ueshin/apache-spark/blob/30d5aed41085cfb82e3da43099adf972b953ece8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala#L951) for it). Did we miss anything?
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200116003 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) + case dt => dt +}.getOrElse(StringType) + } --- End diff -- Oh, I see. In that case, it would be nice to introduce a method that resolves the output DataType and recursively merges the ```nullable```/```containsNull``` flags of non-primitive types for such expressions. In most cases, we could encapsulate the function in a new trait. WDYT?
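A recursive merge is needed because the `Concat` change in the diff only merges the top-level `containsNull` flag. The element type still comes from the first child. Below is a hypothetical toy-model port of that `dataType` logic (not the real Spark classes). For `array<array<int>>` inputs, the nested `containsNull` of the second child is lost.

```scala
// Hypothetical toy-model port of the Concat.dataType logic from the diff.
object ConcatTypeSketch {
  sealed trait DataType
  case object StringType extends DataType
  case object IntegerType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType

  // Only the top-level containsNull is merged; the element type is taken
  // from the first child unchanged.
  def concatType(children: Seq[DataType]): DataType =
    children.headOption.map {
      case ArrayType(et, _) =>
        ArrayType(et, children.exists(_.asInstanceOf[ArrayType].containsNull))
      case dt => dt
    }.getOrElse(StringType)
}
```

For children `array<array<int, false>, false>` and `array<array<int, true>, true>`, the result is `array<array<int, false>, true>`. The inner flag is still `false`, although the second child can contain null inner elements.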
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200116898 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) --- End diff -- Yeah, plus ```valueContainsNull``` for ```MapType``` and ```nullable``` for ```StructField```.
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200134825 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) --- End diff -- @ueshin For ```Concat```, ```Coalesce```, etc., that seems to be the case, since a coercion rule is executed if there is any nullability difference at any level of nesting. But that's not the case for the ```CaseWhenCoercion``` rule, since it uses the ```sameType``` method for comparison. I'm wondering: if the goal is to avoid generating extra ```Cast``` expressions, shouldn't the other coercion rules use the ```sameType``` method as well? Let's assume that the result of ```concat``` is subsequently used by ```flatten```. Wouldn't that lead to the generation of extra null-safe checks, as mentioned [here](https://github.com/apache/spark/pull/21704#discussion_r200110924)?
[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21704#discussion_r200265965 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends Expression { } } - override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + override def dataType: DataType = { +val dataTypes = children.map(_.dataType) +dataTypes.headOption.map { + case ArrayType(et, _) => +ArrayType(et, dataTypes.exists(_.asInstanceOf[ArrayType].containsNull)) --- End diff -- Please ignore the part of my previous comment about the ```flatten``` function. The output data type of ```concat```, etc. will be the same regardless of what resolves the ```null``` flags.
[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing conditional expressions to han...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21687 I've implemented a trait that takes care of merging ```null``` flags and added more tests. Also cc @cloud-fan
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200848714 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait NonPrimitiveTypeMergingExpression extends Expression +{ + /** + * A collection of data types used for resolution the output type of the expression. By default, + * data types of all child expressions. The collection must not be empty. + */ + @transient + lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType) + + /** + * A method determining whether the input types are equal ignoring nullable, containsNull and + * valueContainsNull flags and thus convenient for resolution of the final data type. 
+ */ + def areInputTypesForMergingEqual: Boolean = { +inputTypesForMerging.lengthCompare(1) <= 0 || inputTypesForMerging.sliding(2, 1).forall { + case Seq(dt1, dt2) => dt1.sameType(dt2) +} + } + + private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = (dt1, dt2) match { +case (t1, t2) if t1 == t2 => t1 +case (ArrayType(et1, cn1), ArrayType(et2, cn2)) => + ArrayType(mergeTwoDataTypes(et1, et2), cn1 || cn2) +case (MapType(kt1, vt1, vcn1), MapType(kt2, vt2, vcn2)) => + MapType(mergeTwoDataTypes(kt1, kt2), mergeTwoDataTypes(vt1, vt2), vcn1 || vcn2) +case (StructType(fields1), StructType(fields2)) => + val newFields = fields1.zip(fields2).map { --- End diff -- I can add it, but is it necessary if [we already check](https://github.com/apache/spark/pull/21687/files/3b2c083aa5614984792ca9130d4935aa82b7b510#diff-b3ebf3b40b9d4b6e98bb29ac8bb5aadaR742) that the input types are ```sameType``` equal before calling the method?
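The reason the `sameType` pre-check matters is that Scala's `zip` silently truncates to the shorter collection. The sketch below uses a hypothetical `Field` model and a simplified shape check (equal length and equal names), not Spark's real `sameType`. Merging unchecked field lists drops the extra field without any error. With the guard, mismatched inputs are rejected before the zip-based merge runs. `allSameShape` mirrors the `lengthCompare`/`sliding(2, 1)` pattern from `areInputTypesForMergingEqual`.

```scala
// Hypothetical sketch: why a shape guard must precede a zip-based field merge.
object ZipGuardSketch {
  final case class Field(name: String, nullable: Boolean)

  // Simplified sameType-style guard: same field count and same names.
  def sameShape(a: Seq[Field], b: Seq[Field]): Boolean =
    a.length == b.length && a.zip(b).forall { case (x, y) => x.name == y.name }

  // Pairwise check over all inputs, as in areInputTypesForMergingEqual.
  def allSameShape(types: Seq[Seq[Field]]): Boolean =
    types.lengthCompare(1) <= 0 || types.sliding(2, 1).forall {
      case Seq(x, y) => sameShape(x, y)
    }

  // Field-wise merge; zip drops trailing fields if lengths differ.
  def mergeFields(a: Seq[Field], b: Seq[Field]): Seq[Field] =
    a.zip(b).map { case (x, y) => Field(x.name, x.nullable || y.nullable) }

  def safeMerge(a: Seq[Field], b: Seq[Field]): Option[Seq[Field]] =
    if (sameShape(a, b)) Some(mergeFields(a, b)) else None
}
```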
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200849422 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait NonPrimitiveTypeMergingExpression extends Expression +{ + /** + * A collection of data types used for resolution the output type of the expression. By default, + * data types of all child expressions. The collection must not be empty. + */ + @transient + lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType) + + /** + * A method determining whether the input types are equal ignoring nullable, containsNull and + * valueContainsNull flags and thus convenient for resolution of the final data type. 
+ */ + def areInputTypesForMergingEqual: Boolean = { +inputTypesForMerging.lengthCompare(1) <= 0 || inputTypesForMerging.sliding(2, 1).forall { + case Seq(dt1, dt2) => dt1.sameType(dt2) +} + } + + private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = (dt1, dt2) match { +case (t1, t2) if t1 == t2 => t1 +case (ArrayType(et1, cn1), ArrayType(et2, cn2)) => + ArrayType(mergeTwoDataTypes(et1, et2), cn1 || cn2) +case (MapType(kt1, vt1, vcn1), MapType(kt2, vt2, vcn2)) => + MapType(mergeTwoDataTypes(kt1, kt2), mergeTwoDataTypes(vt1, vt2), vcn1 || vcn2) +case (StructType(fields1), StructType(fields2)) => + val newFields = fields1.zip(fields2).map { +case (f1, f2) if f1 == f2 => f1 +case (StructField(name, fdt1, nl1, _), StructField(_, fdt2, nl2, _)) => + StructField(name, mergeTwoDataTypes(fdt1, fdt2), nl1 || nl2) --- End diff -- The comment on the ```metadata``` field says: > The metadata should be preserved during transformation if the content of the column is not modified, e.g, in selection. So I would say no, since the expressions inheriting from this trait combine data from multiple columns into one. If we decide to extend the definition and merge metadata from multiple columns, it would also make sense to change the ```findTightestCommonType``` method for the cases when coercion rules are executed.
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200850107 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait NonPrimitiveTypeMergingExpression extends Expression --- End diff -- ```Complex``` also sounds better to me :-) But I would like to stay consistent with the terminology (see a [comment](https://github.com/apache/spark/pull/20938/files/eeab72786aa0c28b123cc21a87bd8619a63831bd#r181345710) from a different PR). What about using a neutral name like ```NullFlagsMergingExpression``` or similar?
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200850235 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -688,10 +688,10 @@ object TypeCoercion { plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case e if !e.childrenResolved => e // Find tightest common type for If, if the true value and false value have different types. - case i @ If(pred, left, right) if left.dataType != right.dataType => -findWiderTypeForTwo(left.dataType, right.dataType).map { widestType => - val newLeft = if (left.dataType == widestType) left else Cast(left, widestType) - val newRight = if (right.dataType == widestType) right else Cast(right, widestType) + case i @ If(pred, left, right) if !i.areInputTypesForMergingEqual => +findWiderTypeForTwo(left.dataType, right.dataType).map { commonType => + val newLeft = if (left.dataType.sameType(commonType)) left else Cast(left, commonType) + val newRight = if (right.dataType.sameType(commonType)) right else Cast(right, commonType) --- End diff -- ```widestType``` sounded a bit misleading to me, but I can revert it if you find it OK.
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200851845 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait NonPrimitiveTypeMergingExpression extends Expression +{ + /** + * A collection of data types used for resolution the output type of the expression. By default, + * data types of all child expressions. The collection must not be empty. + */ + @transient + lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType) + + /** + * A method determining whether the input types are equal ignoring nullable, containsNull and + * valueContainsNull flags and thus convenient for resolution of the final data type. + */ + def areInputTypesForMergingEqual: Boolean = { +inputTypesForMerging.lengthCompare(1) <= 0 || inputTypesForMerging.sliding(2, 1).forall { + case Seq(dt1, dt2) => dt1.sameType(dt2) +} + } + + private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = (dt1, dt2) match { +case (t1, t2) if t1 == t2 => t1 +case (ArrayType(et1, cn1), ArrayType(et2, cn2)) => --- End diff -- That sounds like a good idea! Some parts, like ```fields1.length == fields2.length``` and ```resolver(field1.name, field2.name)```, seem superfluous in this context, but it could work. @ueshin WDYT about making ```findTypeForComplex``` public?
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r200852015 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait NonPrimitiveTypeMergingExpression extends Expression --- End diff -- My previous comment seems to be irrelevant since ```complex``` has already been used (```findTypeForComplex```).
[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing conditional expressions to han...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21687 retest this please
[GitHub] spark pull request #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/21747 [SPARK-24165][SQL][branch-2.3] Fixing conditional expressions to handle nullability of nested types ## What changes were proposed in this pull request? This PR proposes a fix for the output data type of ```If``` and ```CaseWhen``` expressions. Until now, the implementation of these expressions ignored the nullability of nested types from different execution branches and returned the type of the first branch. This could lead to an unexpected ```NullPointerException``` in other expressions that depend on an ```If```/```CaseWhen``` expression. Example: ``` val rows = new util.ArrayList[Row]() rows.add(Row(true, ("a", 1))) rows.add(Row(false, (null, 2))) val schema = StructType(Seq( StructField("cond", BooleanType, false), StructField("s", StructType(Seq( StructField("val1", StringType, true), StructField("val2", IntegerType, false) )), false) )) val df = spark.createDataFrame(rows, schema) df .select(when('cond, struct(lit("x").as("val1"), lit(10).as("val2"))).otherwise('s) as "res") .select('res.getField("val1")) .show() ``` Exception: ``` Exception in thread "main" java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44) at org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44) ... ``` Output schema: ``` root |-- res.val1: string (nullable = false) ``` ## How was this patch tested? 
New test cases added to - DataFrameSuite.scala - conditionalExpressions.scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-24165-branch-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21747.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21747 commit a2fe63e1d48b0291feaa9fcd008654da051d1f1b Author: Marek Novotny Date: 2018-07-11T04:21:03Z [SPARK-24165][SQL][branch-2.3] Fixing conditional expressions to handle nullability of nested types
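The nullability problem in the example can be reproduced with a small model. This is a sketch under assumptions: `DType`, `StringT`, `IntT`, `Field`, `StructT` and `ConditionalOutputType` are hypothetical, not Spark's `StructType`, and `mergedType` simply folds all branch types with a field-wise OR of `nullable`. The `when` branch builds `val1` from a literal (non-nullable), while the `otherwise` branch reads column `s`, whose `val1` is nullable. Taking only the first branch's type reports `val1` as non-nullable, matching the incorrect `nullable = false` in the output schema.

```scala
// Hypothetical, simplified model of struct types (not Spark's StructType).
sealed trait DType
case object StringT extends DType
case object IntT extends DType
final case class Field(name: String, dataType: DType, nullable: Boolean)
final case class StructT(fields: Seq[Field]) extends DType

object ConditionalOutputType {
  // Merges two structs with the same field names; a field is nullable
  // in the result if it is nullable in either input.
  def merge(a: DType, b: DType): DType = (a, b) match {
    case (StructT(fs1), StructT(fs2)) if fs1.map(_.name) == fs2.map(_.name) =>
      StructT(fs1.zip(fs2).map { case (f1, f2) =>
        Field(f1.name, merge(f1.dataType, f2.dataType), f1.nullable || f2.nullable)
      })
    case (t1, t2) if t1 == t2 => t1
    case _ => throw new IllegalArgumentException(s"Incompatible branch types: $a vs $b")
  }

  // Behaviour described in the PR before the fix: the first branch wins.
  def firstBranchType(branches: Seq[DType]): DType = branches.head

  // Behaviour after the fix: every branch, including otherwise/ELSE, contributes.
  def mergedType(branches: Seq[DType]): DType = branches.reduce(merge)
}
```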
[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21747 cc @cloud-fan @ueshin @viirya @maropu
[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21747 retest this please
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22017 retest this please
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209514957 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- IMHO, if ```checkInputDataTypes``` were executed before ```bind```, ```findTightestCommonType``` could play the same role. But yeah, ```findCommonTypeDifferentOnlyInNullFlags``` would be semantically more accurate. Thanks!
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209515431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -231,6 +231,15 @@ object TypeCoercion { }) } + /** + * Similar to [[findTightestCommonType]] but with string promotion. + */ + def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = { --- End diff -- Thanks for both your PRs! I will submit changes once they get in.
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209533017 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- Oh, I see. We also need to check the output data type of lambda functions for expressions like ```ArrayFilter```.
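The semantics in the `usage` string of this diff can be sketched with plain Scala collections. Assumptions, all hypothetical: maps are modelled as `Seq[(K, V)]` so that duplicated keys can be represented, `Option` stands in for SQL NULL, and `MapZipWithSketch.mapZipWith` is an illustrative name rather than a Spark API. It keeps the first entry of each duplicated key and passes `None` for a key missing from one side; like SQL `concat`, the example combiner in the usage returns `None` when either value is missing.

```scala
object MapZipWithSketch {
  // Keeps only the first entry for every duplicated key.
  private def firstEntries[K, V](entries: Seq[(K, V)]): Map[K, V] =
    entries.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
      if (acc.contains(k)) acc else acc + (k -> v)
    }

  // Option stands in for SQL NULL: a key missing from one side yields None there.
  def mapZipWith[K, V1, V2, R](left: Seq[(K, V1)], right: Seq[(K, V2)])(
      f: (K, Option[V1], Option[V2]) => R): Seq[(K, R)] = {
    val l = firstEntries(left)
    val r = firstEntries(right)
    // Keys in order of first appearance: left keys first, then right-only keys.
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map(k => k -> f(k, l.get(k), r.get(k)))
  }
}

// Hypothetical combiner mimicking concat(v1, v2): NULL if either side is NULL.
def concatOrNull(k: Int, v1: Option[String], v2: Option[String]): Option[String] =
  for (a <- v1; b <- v2) yield a + b
```

Usage mirroring the SQL example: `MapZipWithSketch.mapZipWith(Seq(1 -> "a", 2 -> "b"), Seq(1 -> "x", 2 -> "y"))(concatOrNull)` gives `Seq(1 -> Some("ax"), 2 -> Some("by"))`.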
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209643649 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -422,45 +425,49 @@ case class ArrayExists( """, since = "2.4.0") case class ArrayAggregate( -input: Expression, +argument: Expression, zero: Expression, merge: Expression, finish: Expression) extends HigherOrderFunction with CodegenFallback { - def this(input: Expression, zero: Expression, merge: Expression) = { -this(input, zero, merge, LambdaFunction.identity) + def this(argument: Expression, zero: Expression, merge: Expression) = { +this(argument, zero, merge, LambdaFunction.identity) } - override def inputs: Seq[Expression] = input :: zero :: Nil + override def arguments: Seq[Expression] = argument :: zero :: Nil + + override def argumentTypes: Seq[AbstractDataType] = ArrayType :: AnyDataType :: Nil override def functions: Seq[Expression] = merge :: finish :: Nil - override def nullable: Boolean = input.nullable || finish.nullable + override def functionTypes: Seq[AbstractDataType] = zero.dataType :: AnyDataType :: Nil + + override def nullable: Boolean = argument.nullable || finish.nullable override def dataType: DataType = finish.dataType override def checkInputDataTypes(): TypeCheckResult = { -if (!ArrayType.acceptsType(input.dataType)) { - TypeCheckResult.TypeCheckFailure( -s"argument 1 requires ${ArrayType.simpleString} type, " + - s"however, '${input.sql}' is of ${input.dataType.catalogString} type.") -} else if (!DataType.equalsStructurally( -zero.dataType, merge.dataType, ignoreNullability = true)) { - TypeCheckResult.TypeCheckFailure( -s"argument 3 requires ${zero.dataType.simpleString} type, " + - s"however, '${merge.sql}' is of ${merge.dataType.catalogString} type.") -} else { - TypeCheckResult.TypeCheckSuccess +checkArgumentDataTypes() match { --- End diff -- Just a quick question: isn't calling ```checkArgumentDataTypes``` redundant here if ```checkArgumentDataTypes``` is already called on its own before ```checkInputDataTypes```?
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209876913 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -496,3 +496,194 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + def functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType + + @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType + + @transient lazy val keyType = +TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get --- End diff -- If ```leftKeyType``` is ```ArrayType(IntegerType, false)``` and ```rightKeyType``` is ```ArrayType(IntegerType, true)```, for instance, the coercion rule is not executed, since ```leftKeyType.sameType(rightKeyType) == true```. 
An array with nulls seems to be a valid key: ``` scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show() +---+ |map(array(1, 2, CAST(NULL AS INT)), 12)| +---+ |[[1, 2,] -> 12]| +---+ ```
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r209920407 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { --- End diff -- This comment is not valid anymore. 
The method has been removed by [#22075](https://github.com/apache/spark/pull/22075).
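The element-wise behaviour described in the `ArraysZipWith` usage string (padding the shorter array with nulls before applying the function) can be sketched in plain Scala. `ZipWithSketch.zipWith` is a hypothetical helper, `Option` is assumed to stand in for SQL NULL, and the padding relies on the standard `zipAll` collection method.

```scala
object ZipWithSketch {
  // Wraps elements in Option (a null element becomes None), pads the shorter
  // side with None via zipAll, then applies f element-wise.
  def zipWith[A, B, R](left: Seq[A], right: Seq[B])(f: (Option[A], Option[B]) => R): Seq[R] =
    left.map(Option(_)).zipAll(right.map(Option(_)), None, None).map { case (a, b) => f(a, b) }
}
```

For instance, `ZipWithSketch.zipWith(Seq(1, 2, 3), Seq("a"))((x, y) => (y, x))` pairs the two trailing integers with `None`.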
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/22110 [SPARK-25122][SQL] Deduplication of supports equals code ## What changes were proposed in this pull request? The method ```*supportEquals```, which determines whether elements of a data type can be used as items in a hash set or as keys in a hash map, is duplicated across multiple collection and higher-order functions. This PR proposes deduplicating the method. ## How was this patch tested? Ran tests in: - DataFrameFunctionsSuite - CollectionExpressionsSuite - HigherOrderExpressionsSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-25122 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22110 commit dd292e8cf3ed1788793e626da3a136e9acb9d81c Author: Marek Novotny Date: 2018-08-15T08:18:05Z [SPARK-25122][SQL] Deduplication of supports equals code
[GitHub] spark issue #22110: [SPARK-25122][SQL] Deduplication of supports equals code
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22110 cc @ueshin @cloud-fan
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210212586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala --- @@ -115,6 +115,8 @@ protected[sql] abstract class AtomicType extends DataType { private[sql] type InternalType private[sql] val tag: TypeTag[InternalType] private[sql] val ordering: Ordering[InternalType] + + private[spark] override def supportsEquals: Boolean = true --- End diff -- Not all of the expressions utilize ```OpenHashSet``` or ```OpenHashMap```. What about putting it in ```TypeUtils```, which contains methods like ```getInterpretedOrdering```?
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210350632 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys --- End diff -- I'm open to any changes :) But if you want to mention the ```equals``` method explicitly, I would also mention ```hashCode```, which is generally needed for use in "hash" collections. Then again, this is not 100% true for Spark's specialized ```OpenHashSets``` and ```OpenHashMaps```, since they calculate hashes by themselves. WDYT?
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210357474 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys --- End diff -- What about changing the name of the method to ```typeCanBeHashed```?
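Why some types cannot safely serve as hash-set items or hash-map keys can be shown at the JVM level. A minimal sketch under assumptions: the `DType` hierarchy and `HashabilitySketch.typeCanBeHashed` are hypothetical simplifications, not the method under review, and the demo uses Scala's `mutable.HashSet` rather than Spark's `OpenHashSet`, which (as noted in the discussion) computes hashes itself. JVM arrays, such as the `Array[Byte]` backing binary values, inherit reference-based `equals`/`hashCode`, so two equal-looking arrays are treated as different items.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark data types.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
case object BinaryT extends DType
final case class ArrT(element: DType) extends DType

object HashabilitySketch {
  // Conservative, hypothetical predicate: only atomic types whose runtime values
  // have value-based equals/hashCode qualify. BinaryT (Array[Byte] at runtime)
  // compares by reference, so it is excluded; nested types are excluded here too.
  def typeCanBeHashed(t: DType): Boolean = t match {
    case IntT | StringT => true
    case _              => false
  }

  // Number of distinct items after inserting two values into a hash set.
  def distinctCount[A](a: A, b: A): Int = mutable.HashSet(a, b).size
}
```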
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210368929 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,62 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Transform Keys for every entry of the map by applying the transform_keys function. + * Returns map with transformed key entries + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = { +MapType(function.dataType, valueType, valueContainsNull) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): TransformKeys = { +copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil)) + } + + @transient lazy val LambdaFunction( + _, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function + + + override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = { +val map = argumentValue.asInstanceOf[MapData] +val f = functionForEval --- End diff -- Can't we use ```functionForEval``` directly?
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210366383 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,62 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Transform Keys for every entry of the map by applying the transform_keys function. + * Returns map with transformed key entries + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = { +MapType(function.dataType, valueType, valueContainsNull) --- End diff -- nit: could this fit on one line?
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210373675 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -2302,6 +2302,97 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { assert(ex5.getMessage.contains("function map_zip_with does not support ordering on type map")) } + test("transform keys function - test various primitive data types combinations") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0) +).toDF("j") + +val dfExample3 = Seq( + Map[Int, Boolean](25 -> true, 26 -> false) +).toDF("x") + +val dfExample4 = Seq( + Map[Array[Int], Boolean](Array(1, 2) -> false) +).toDF("y") + + +def testMapOfPrimitiveTypesCombination(): Unit = { + checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + v)"), +Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, " + +"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 'three'))[k])"), +Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> CAST(v * 2 AS BIGINT) + k)"), +Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> k + v)"), +Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7 + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"), +Seq(Row(Map(true -> true, true -> false + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample4.selectExpr("transform_keys(y, (k, v) -> array_contains(k, 3) AND v)"), +Seq(Row(Map(false -> false +} +// 
Test with local relation, the Project will be evaluated without codegen +testMapOfPrimitiveTypesCombination() +dfExample1.cache() +dfExample2.cache() +dfExample3.cache() +dfExample4.cache() +// Test with cached relation, the Project will be evaluated with codegen +testMapOfPrimitiveTypesCombination() + } + + test("transform keys function - Invalid lambda functions and exceptions") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[String, String]("a" -> "b") +).toDF("j") + +val dfExample3 = Seq( + Map[String, String]("a" -> null) +).toDF("x") + +def testInvalidLambdaFunctions(): Unit = { + val ex1 = intercept[AnalysisException] { +dfExample1.selectExpr("transform_keys(i, k -> k )") + } + assert(ex1.getMessage.contains("The number of lambda function arguments '1' does not match")) + + val ex2 = intercept[AnalysisException] { +dfExample2.selectExpr("transform_keys(j, (k, v, x) -> k + 1)") + } + assert(ex2.getMessage.contains( + "The number of lambda function arguments '3' does not match")) + + val ex3 = intercept[RuntimeException] { +dfExample3.selectExpr("transform_keys(x, (k, v) -> v)").show() + } + assert(ex3.getMessage.contains("Cannot use null as map key!")) +} + +testInvalidLambdaFunctions() +dfExample1.cache() +dfExample2.cache() +testInvalidLambdaFunctions() --- End diff -- @ueshin I would like to ask you a general question regarding higher-order functions. Is it necessary to test the codegen paths if all the newly added functions extend ```CodegenFallback```? Or is there a plan to add codegen for these functions in the future?
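The key transformation exercised by these tests can be sketched without Spark. Assumptions, all hypothetical: `TransformKeysSketch.transformKeys` is an illustrative helper, maps are modelled as `Seq[(K, V)]` so that colliding transformed keys stay visible (as in the `Map(true -> true, true -> false)` expectation in the first test), and a null key raises a `RuntimeException` carrying the message the second test asserts. It does not model how Spark deduplicates or orders keys.

```scala
object TransformKeysSketch {
  // Applies f to every entry and uses its result as the new key, keeping the value.
  // Entries stay in a Seq because transformed keys may collide. A null key is
  // rejected with the same message the transform_keys test expects.
  def transformKeys[K, V, K2](entries: Seq[(K, V)])(f: (K, V) => K2): Seq[(K2, V)] =
    entries.map { case (k, v) =>
      val newKey = f(k, v)
      if (newKey == null) throw new RuntimeException("Cannot use null as map key!")
      newKey -> v
    }
}
```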
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210493260 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys + * of a hash map. + */ + def typeCanBeHashed(dataType: DataType): Boolean = dataType match { --- End diff -- I will change it :) Just one question about ```hashCode```. If ```case classes``` are used, ```equals``` and ```hashCode``` are generated by the compiler. But if we define ```equals``` manually, shouldn't ```a.equals(b) == true``` => ```a.hashCode == b.hashCode``` also hold?
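The contract raised in the question (equal objects must have equal hash codes) can be illustrated in Scala. This is a sketch with hypothetical class names: `Point` overrides both `equals` and `hashCode` from the same fields, while `CasePoint` relies on the compiler-generated versions that case classes receive.

```scala
// A hand-written class: overriding equals alone would break hash-based
// collections, because equal objects must also report equal hash codes.
final class Point(val x: Int, val y: Int) {
  override def equals(other: Any): Boolean = other match {
    case p: Point => x == p.x && y == p.y
    case _        => false
  }
  // Derived from the same fields as equals, so a.equals(b) implies equal hash codes.
  override def hashCode(): Int = 31 * x + y
}

// A case class gets compiler-generated equals and hashCode that satisfy the contract.
final case class CasePoint(x: Int, y: Int)
```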
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r210561102 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,53 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Returns a map that applies the function to each value of the map. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> v + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, + since = "2.4.0") +case class TransformValues( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = MapType(keyType, function.dataType, valueContainsNull) --- End diff -- Shouldn't the ```dataType``` be defined as ```MapType(keyType, function.dataType, function.nullable)```?
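The reviewer's point can be illustrated with plain collections. A minimal sketch under assumptions: `TransformValuesSketch`, `transformValues` and `flagIsSound` are hypothetical names, and the map type is reduced to a single `valueContainsNull` flag. Even when the input has no null values, a lambda that can return null produces a map that does contain nulls, so in this sketch deriving the flag from the input's `valueContainsNull` can be wrong, whereas deriving it from `function.nullable` is safe.

```scala
object TransformValuesSketch {
  // Applies f to every (key, value) entry and keeps the key.
  def transformValues[K, V, W](entries: Seq[(K, V)])(f: (K, V) => W): Seq[(K, W)] =
    entries.map { case (k, v) => k -> f(k, v) }

  // A declared valueContainsNull = false is only sound if no value is actually null.
  def flagIsSound[K, W](declaredValueContainsNull: Boolean, entries: Seq[(K, W)]): Boolean =
    declaredValueContainsNull || entries.forall { case (_, w) => w != null }
}
```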
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210563516 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys + * of a hash map. + */ + def typeCanBeHashed(dataType: DataType): Boolean = dataType match { --- End diff -- Ok, thanks!
[GitHub] spark pull request #22126: [SPARK-23938][SQL][FOLLOW-UP][TEST] Nullabilities...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22126#discussion_r210724650 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala --- @@ -363,9 +363,9 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper left: Expression, right: Expression, f: (Expression, Expression, Expression) => Expression): Expression = { - val MapType(kt, vt1, vcn1) = left.dataType.asInstanceOf[MapType] - val MapType(_, vt2, vcn2) = right.dataType.asInstanceOf[MapType] - MapZipWith(left, right, createLambda(kt, false, vt1, vcn1, vt2, vcn2, f)) + val MapType(kt, vt1, _) = left.dataType.asInstanceOf[MapType] --- End diff -- Optional suggestion: maybe we could remove ```asInstanceOf[MapType]``` here?
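In Scala 2 a pattern definition such as `val MapType(kt, vt1, _) = ...` already performs the type test, so the cast is not needed for it to compile; a non-matching value throws `scala.MatchError` at runtime instead. A small sketch with hypothetical stand-ins (`DType`, `IntT`, `StringT`, `MapT`, `ExtractorSketch`) in place of Catalyst's `DataType` and `MapType`:

```scala
// Hypothetical stand-ins for Catalyst's DataType hierarchy.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
final case class MapT(keyType: DType, valueType: DType, valueContainsNull: Boolean) extends DType

object ExtractorSketch {
  // The pattern definition checks that dt is a MapT and binds its fields;
  // no asInstanceOf is needed. A non-map value throws scala.MatchError.
  def keyAndValueTypes(dt: DType): (DType, DType) = {
    val MapT(kt, vt, _) = dt
    (kt, vt)
  }
}
```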
[GitHub] spark issue #22131: [SPARK-25141][SQL][TEST] Modify tests for higher-order f...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22131 LGTM too
[GitHub] spark pull request #22243: [MINOR] Avoid code duplication for nullable in Hi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22243#discussion_r213022884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -155,6 +155,8 @@ trait HigherOrderFunction extends Expression with ExpectsInputTypes { */ trait SimpleHigherOrderFunction extends HigherOrderFunction { + override def nullable: Boolean = argument.nullable --- End diff -- If we moved the definition of ```nullable``` straight to ```HigherOrderFunction``` as ```arguments.exists(_.nullable)```, we could also avoid the duplication in ```ZipWith``` and ```MapZipWith```. WDYT?
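The suggestion amounts to a shared default in the parent trait. A minimal sketch with hypothetical, heavily simplified types (`Expr`, `Leaf`, `HigherOrderFunctionLike`, `SingleArg`, `TwoArgs`), not Catalyst's classes: the default `nullable` is `arguments.exists(_.nullable)`, so single- and two-argument subclasses need no override. A subclass whose nullability also depends on its lambda, like `ArrayAggregate` with `finish.nullable`, would still override it.

```scala
// Hypothetical, heavily simplified stand-ins for Catalyst expressions.
trait Expr { def nullable: Boolean }
final case class Leaf(nullable: Boolean) extends Expr

trait HigherOrderFunctionLike extends Expr {
  def arguments: Seq[Expr]
  // Shared default: the result may be null if any argument may be null.
  override def nullable: Boolean = arguments.exists(_.nullable)
}

// One argument (e.g. a transform-like function): inherits the default.
final case class SingleArg(argument: Expr) extends HigherOrderFunctionLike {
  override def arguments: Seq[Expr] = Seq(argument)
}

// Two arguments (e.g. a zip_with-like function): no per-class override needed.
final case class TwoArgs(left: Expr, right: Expr) extends HigherOrderFunctionLike {
  override def arguments: Seq[Expr] = Seq(left, right)
}
```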