[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21103
  
Added a few minor comments, but nothing serious that needs to be solved right now.

LGTM



---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205806876
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

Maybe I'm missing something, but why do we need to apply these checks if there won't be any ```null``` flag merging performed?

If ```left.dataType``` and ```right.dataType``` are different, they will be cast according to the ```ImplicitTypeCasts``` coercion rule. If they differ only in ```null``` flags, ```left.dataType``` could be returned directly, since no array elements from ```right``` will be present in the result.
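
In other words, a minimal sketch of the alternative I have in mind (hypothetical, not what the PR currently does):
```
// ArrayExcept never adds elements from `right`, so the result type
// (including the containsNull flag) can simply follow the left-hand side
// once type coercion has aligned both sides.
override def dataType: DataType = left.dataType
```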


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205848094
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
--- End diff --

Why do we need this check (see the question 
[here](https://github.com/apache/spark/pull/21103#discussion_r205806876))?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205847180
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
--- End diff --

nit: indentation


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206228544
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
ArrayExcept.
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
[[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --

I'm not sure to what extent Presto is considered a reference. Just FYI, these operations in Presto can accept arrays of different types.
```
presto:default> SELECT array_except(ARRAY[5, 1, 7], ARRAY[7.0, 1.0, 3.0]);
 _col0 
---
 [5.0] 
(1 row)
```
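
If Spark relies on the ```ImplicitTypeCasts``` coercion rule mentioned above, I'd expect the equivalent query to behave similarly (just a sketch, I haven't verified it against the current PR):
```
spark-sql> SELECT array_except(array(5, 1, 7), array(7.0D, 1.0D, 3.0D));
[5.0]
```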


---




[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function

2018-08-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21236#discussion_r207148777
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -98,6 +98,9 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
 if (expected.isNaN) result.isNaN else expected == result
   case (result: Float, expected: Float) =>
 if (expected.isNaN) result.isNaN else expected == result
+  case (result: UnsafeRow, expected: GenericInternalRow) =>
--- End diff --

Hi @srowen,
The ```(InternalRow, InternalRow)``` case was introduced later in [21838](https://github.com/apache/spark/pull/21838) and covers the logic of the ```UnsafeRow``` case, so we can just remove the unreachable piece of code.


---




[GitHub] spark issue #21982: [SPARK-23909][SQL] Add aggregate function.

2018-08-03 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21982
  
Isn't this PR related to the Jira ticket 
[SPARK-23911](https://issues.apache.org/jira/browse/SPARK-23911)?


---




[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function

2018-08-06 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21986#discussion_r207908454
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -205,29 +230,85 @@ case class ArrayTransform(
 (elementVar, indexVar)
   }
 
-  override def eval(input: InternalRow): Any = {
-val arr = this.input.eval(input).asInstanceOf[ArrayData]
-if (arr == null) {
-  null
-} else {
-  val f = functionForEval
-  val result = new GenericArrayData(new Array[Any](arr.numElements))
-  var i = 0
-  while (i < arr.numElements) {
-elementVar.value.set(arr.get(i, elementVar.dataType))
-if (indexVar.isDefined) {
-  indexVar.get.value.set(i)
-}
-result.update(i, f.eval(input))
-i += 1
+  override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = 
{
+val arr = inputValue.asInstanceOf[ArrayData]
+val f = functionForEval
+val result = new GenericArrayData(new Array[Any](arr.numElements))
+var i = 0
+while (i < arr.numElements) {
+  elementVar.value.set(arr.get(i, elementVar.dataType))
+  if (indexVar.isDefined) {
+indexVar.get.value.set(i)
   }
-  result
+  result.update(i, f.eval(inputRow))
+  i += 1
 }
+result
   }
 
   override def prettyName: String = "transform"
 }
 
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the 
function.",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+   [1 -> 0, 3 -> -1]
+  """,
+since = "2.4.0")
+case class MapFilter(
+input: Expression,
+function: Expression)
+  extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+  @transient val (keyType, valueType, valueContainsNull) = input.dataType 
match {
+case MapType(kType, vType, vContainsNull) => (kType, vType, 
vContainsNull)
+case _ =>
+  val MapType(kType, vType, vContainsNull) = 
MapType.defaultConcreteType
+  (kType, vType, vContainsNull)
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val args = function.asInstanceOf[LambdaFunction].arguments
+(args.head.asInstanceOf[NamedLambdaVariable], 
args.tail.head.asInstanceOf[NamedLambdaVariable])
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapFilter = {
+function match {
+  case LambdaFunction(_, _, _) =>
--- End diff --

Is this pattern matching necessary? If so, shouldn't ```ArrayFilter``` use 
it as well?
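
For reference, a sketch of what I'd expect without the match (hypothetical; it just reuses the ```keyType```/```valueType```/```valueContainsNull``` extracted above):
```
override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = {
  copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil))
}
```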


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-06 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/22017

[SPARK-23938][SQL] Add map_zip_with function

## What changes were proposed in this pull request?

This PR adds a new SQL function called ```map_zip_with```. It merges the two given maps into a single map by applying the given function to the pair of values with the same key.
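
For example (this mirrors the ```@ExpressionDescription``` example added in the patch):
```
> SELECT map_zip_with(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2));
 {1:"ax",2:"by"}
```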

## How was this patch tested?

Added new tests into:
- DataFrameFunctionsSuite.scala
- HigherOrderFunctionsSuite.scala


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-23938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22017


commit ef56011f03d8bae4634e5d3108e4d6502482383c
Author: Marek Novotny 
Date:   2018-08-06T23:42:45Z

[SPARK-23938][SQL] Add map_zip_with function




---




[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-06 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22017
  
cc @ueshin @mgaido91 @hvanhovell 


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208160707
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
--- End diff --

nit: missing space -> ```k, v```


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208164141
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
--- End diff --

Is there any reason for changing the ```valueContainsNull``` flag if the function transforms just the keys? WDYT about:
```
val MapType(_, valueType, valueContainsNull) = input.dataType.asInstanceOf[MapType]
MapType(function.dataType, valueType, valueContainsNull)
```


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208169140
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2071,6 +2071,158 @@ class DataFrameFunctionsSuite extends QueryTest 
with SharedSQLContext {
 assert(ex4.getMessage.contains("data type mismatch: argument 3 
requires int type"))
   }
 
+  test("transform keys function - test various primitive data types 
combinations") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[Int, String](1 -> "a", 2 -> "b", 3 -> "c")
+).toDF("x")
+
+val dfExample3 = Seq(
+  Map[String, Int]("a" -> 1, "b" -> 2, "c" -> 3)
+).toDF("y")
+
+val dfExample4 = Seq(
+  Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0)
+).toDF("z")
+
+val dfExample5 = Seq(
+  Map[Int, Boolean](25 -> true, 26 -> false)
+).toDF("a")
+
+val dfExample6 = Seq(
+  Map[Int, String](25 -> "ab", 26 -> "cd")
+).toDF("b")
+
+val dfExample7 = Seq(
+  Map[Array[Int], Boolean](Array(1, 2) -> false)
+).toDF("c")
+
+
+def testMapOfPrimitiveTypesCombination(): Unit = {
+  checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + 
v)"),
+Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(x, (k, v) -> k + 
1)"),
+Seq(Row(Map(2 -> "a", 3 -> "b", 4 -> "c"
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> v * 
v)"),
+Seq(Row(Map(1 -> 1, 4 -> 2, 9 -> 3
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> 
length(k) + v)"),
+Seq(Row(Map(2 -> 1, 3 -> 2, 4 -> 3
+
+  checkAnswer(
+dfExample3.selectExpr("transform_keys(y, (k, v) -> concat(k, 
cast(v as String)))"),
+Seq(Row(Map("a1" -> 1, "b2" -> 2, "c3" -> 3
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, " +
+"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 
'three'))[k])"),
+Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> 
CAST(v * 2 AS BIGINT) + k)"),
+Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> k + 
v)"),
+Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) ->  k % 
2 = 0 OR v)"),
+Seq(Row(Map(true -> true, true -> false
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample6.selectExpr(
+"transform_keys(b, (k, v) ->  concat(conv(k, 10, 16) , substr(v, 
1, 1)))"),
+Seq(Row(Map("19a" -> "ab", "1Ac" -> "cd"
+
+  checkAnswer(dfExample7.selectExpr("transform_keys(c, (k, v) -> 
array_contains(k, 3) AND v)"),
+Seq(Row(Map(false -> false
+}
+// Test with local relation, the Project will be evaluated without 
codegen
+testMapOfPrimitiveTypesCombination()
+dfExample1.cache()
+dfExample2.cache()
+dfExample3.cache()
+dfExample4.cache()
+dfExample5.cache()
+dfExample6.cache()
+// Test with cached relation, the Project will be evaluated with 
codegen
+testMapOfPrimitiveTypesCombination()
--- End diff --

Do we have to do that if the expression implements ```CodegenFallback```?


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208169969
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
 ---
@@ -181,4 +187,46 @@ class HigherOrderFunctionsSuite extends SparkFunSuite 
with ExpressionEvalHelper
 (acc, array) => coalesce(aggregate(array, acc, (acc, elem) => acc 
+ elem), acc)),
   15)
   }
+
+  test("TransformKeys") {
+val ai0 = Literal.create(
+  Map(1 -> 1, 2 -> 2, 3 -> 3),
--- End diff --

It may be irrelevant, but WDYT about adding test cases with ```null``` values?
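
Something like this, for instance (just a sketch; it assumes a ```transformKeys``` helper analogous to the ```aggregate```/```transform``` helpers used elsewhere in this suite):
```
val ain = Literal.create(
  Map(1 -> 1, 2 -> null, 3 -> 3),
  MapType(IntegerType, IntegerType, valueContainsNull = true))

// keys are shifted by one, values (including the null) stay untouched
checkEvaluation(transformKeys(ain, (k, v) => k + 1), Map(2 -> 1, 3 -> null, 4 -> 3))
```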


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208167785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
+  TransformKeys = {
+val (keyElementType, valueElementType, containsNull) = input.dataType 
match {
+  case MapType(keyType, valueType, containsNullValue) =>
+(keyType, valueType, containsNullValue)
+  case _ =>
+val MapType(keyType, valueType, containsNullValue) = 
MapType.defaultConcreteType
+(keyType, valueType, containsNullValue)
+}
+copy(function = f(function, (keyElementType, false) :: 
(valueElementType, containsNull) :: Nil))
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val LambdaFunction(
+_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+(keyVar, valueVar)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[MapData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val resultKeys = new GenericArrayData(new 
Array[Any](arr.numElements))
+  var i = 0
+  while (i < arr.numElements) {
+keyVar.value.set(arr.keyArray().get(i, keyVar.dataType))
+valueVar.value.set(arr.valueArray().get(i, valueVar.dataType))
+resultKeys.update(i, f.eval(input))
--- End diff --

Maybe I'm missing something, but couldn't ```f.eval(input)``` evaluate to ```null```? Keys are not allowed to be ```null```. Other functions usually have a ```null``` check and throw a ```RuntimeException``` in such cases.
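
A sketch of the kind of guard I mean (the exact message is just an example of what other map-producing functions report):
```
val result = f.eval(input)
if (result == null) {
  throw new RuntimeException("Cannot use null as map key!")
}
resultKeys.update(i, result)
```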


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208161643
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
--- End diff --

maybe a better comment?


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208190999
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
+  TransformKeys = {
+val (keyElementType, valueElementType, containsNull) = input.dataType 
match {
+  case MapType(keyType, valueType, containsNullValue) =>
+(keyType, valueType, containsNullValue)
+  case _ =>
+val MapType(keyType, valueType, containsNullValue) = 
MapType.defaultConcreteType
+(keyType, valueType, containsNullValue)
+}
+copy(function = f(function, (keyElementType, false) :: 
(valueElementType, containsNull) :: Nil))
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val LambdaFunction(
+_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+(keyVar, valueVar)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[MapData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val resultKeys = new GenericArrayData(new 
Array[Any](arr.numElements))
+  var i = 0
+  while (i < arr.numElements) {
+keyVar.value.set(arr.keyArray().get(i, keyVar.dataType))
+valueVar.value.set(arr.valueArray().get(i, valueVar.dataType))
+resultKeys.update(i, f.eval(input))
--- End diff --

I'm not a fan of duplicated keys either, but other functions transforming maps have the same problem. See the discussions [here](https://github.com/apache/spark/pull/21282#discussion_r187234431) and [here](https://github.com/apache/spark/pull/21258#discussion_r186410527).

Example:
```
scala> spark.range(1).selectExpr("map(0,1,0,2)").show()
+----------------+
| map(0, 1, 0, 2)|
+----------------+
|[0 -> 1, 0 -> 2]|
+----------------+
```


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208204796
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  private def getMapType(expr: Expression) = expr.dataType match {
+case m: MapType => m
+case _ => MapType.defaultConcreteType
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: 
Any): Any = {
+val mapData1 = value1.asInstanceOf[MapData]
+val mapData2 = value2.asInstanceOf[MapData]
+val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
+val values = new GenericArrayData(new Array[Any](keys.numElements()))
+keys.foreach(keyType, (idx: Int, key: Any) => {
+  val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, 
leftValueType, ordering)
--- End diff --

Thanks for mentioning this! I'm not happy with the current complexity either. I've assumed that the implementation of maps will change into something with O(1) element access in the future. By then, the complexity would be O(N) for types supporting equals as well, and we would save a portion of duplicated code.

If you think that maps will remain like this for a long time, I really like your suggestion with indexes.

@ueshin What's your view on that?


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208211250
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  private def getMapType(expr: Expression) = expr.dataType match {
+case m: MapType => m
+case _ => MapType.defaultConcreteType
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: 
Any): Any = {
+val mapData1 = value1.asInstanceOf[MapData]
+val mapData2 = value2.asInstanceOf[MapData]
+val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
+val values = new GenericArrayData(new Array[Any](keys.numElements()))
+keys.foreach(keyType, (idx: Int, key: Any) => {
+  val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, 
leftValueType, ordering)
--- End diff --

Ok, I will change it. Thanks a lot!


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208280351
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

The ```nullable``` flag is rather related to the cases when the whole map is ```null```. The case that you are referring to is handled by the ```valueContainsNull``` flag of ```MapType``` (see line [423](https://github.com/apache/spark/pull/22017/files/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25#diff-ef52827ed9b41efc1fbd056a06ef7c6aR423)).
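
To illustrate the distinction (both lines are already in this PR; the comments are mine):
```
// the whole result map is null when either input map is null
override def nullable: Boolean = left.nullable || right.nullable

// individual values may be null, e.g. for keys present in only one of the maps
override def dataType: DataType = MapType(keyType, function.dataType, function.nullable)
```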


---




[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208399620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
+  }
+
+  @transient lazy val (arr1Var, arr2Var) = {
+val LambdaFunction(_,
+  (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: 
Nil, _) = function
+(arr1Var, arr2Var)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val leftArr = left.eval(input).asInstanceOf[ArrayData]
+val rightArr = right.eval(input).asInstanceOf[ArrayData]
+
+if (leftArr == null || rightArr == null) {
--- End diff --

If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated.
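
I.e. something along these lines (sketch only, mirroring the short-circuit pattern used in ```MapZipWith.eval```):
```
override def eval(input: InternalRow): Any = {
  val leftArr = left.eval(input).asInstanceOf[ArrayData]
  if (leftArr == null) {
    null
  } else {
    val rightArr = right.eval(input).asInstanceOf[ArrayData]
    if (rightArr == null) {
      null
    } else {
      // ... zip leftArr and rightArr as before ...
    }
  }
}
```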


---




[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208403145
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
--- End diff --

If you want to support input arrays of different sizes (the Jira ticket says: _"Both arrays must be the same length."_), what about the scenario when one array is empty and the other has elements? Shouldn't we use ```true``` instead of ```leftContainsNull``` and ```rightContainsNull```?
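
I.e. something like (sketch):
```
copy(function = f(function,
  (leftElementType, true) :: (rightElementType, true) :: Nil))
```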


---




[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208398313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
--- End diff --

You can utilize ```HigherOrderFunction.arrayArgumentType```.
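
I.e. something like (sketch, keeping the final ```copy``` unchanged):
```
val (leftElementType, leftContainsNull) = HigherOrderFunction.arrayArgumentType(left.dataType)
val (rightElementType, rightContainsNull) = HigherOrderFunction.arrayArgumentType(right.dataType)
```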


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208519941
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

If we changed it to ```(Option[Int], Option[Int])```, wouldn't we need two similar ```i``` loops instead of one?

My motivation for also using the ```ArrayBuffer``` is to preserve the order of keys. A random order would break map comparison

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208527423
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
+val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
+val keys = Array(keys1, keys2)
+var z = 0
+while(z < 2) {
+  var i = 0
+  val array = keys(z)
+  while (i < array.numElements()) {
+val

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208550479
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

> I really don't think so, it would be the same as now I think

Let's assume that ```indexes``` is a tuple for now. ```indexes(z).isEmpty``` could be replaced with ```indexes

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208559241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

Yeah, but my point is how to create a new tuple from an old one without using 
```_1```, ```_2```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208605882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

> you don't need to check neither whether the key is there nor the size of 
the output array, you just need to add them.

What about duplicated keys? They can be created with other map functions.
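
For illustration, here is my own standalone sketch (not part of the PR, and it assumes the 2.4-era behaviour where ```map_concat``` keeps duplicate keys):

```
import org.apache.spark.sql.SparkSession

// An input map of map_zip_with can already carry the same key twice,
// so the function has to decide which occurrence wins.
object DuplicatedKeysExample extends App {
  val spark = SparkSession.builder().master("local[1]").appName("dup-keys").getOrCreate()

  // Expected (assuming 2.4-era map_concat): a map holding key 1 twice, e.g. [1 -> a, 1 -> b].
  spark.sql("SELECT map_concat(map(1, 'a'), map(1, 'b')) AS m").show(false)

  spark.stop()
}
```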


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208647186
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

```indexes(z).isEmpty``` ensures that we always insert the first occurrence of 
the key, which follows the behavior of ```GetMapValue```. If we didn't perform 
such a check, the last occurrence would end up in the result.
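
To make the intended semantics explicit, a minimal standalone sketch (plain Scala, not the PR code) of the first-occurrence behaviour the check gives us:

```
import scala.collection.mutable

// Keep only the index of the FIRST occurrence of each key, mirroring
// GetMapValue, which returns the first matching value.
def firstOccurrenceIndexes(keys: Seq[Any]): mutable.LinkedHashMap[Any, Int] = {
  val indexes = mutable.LinkedHashMap.empty[Any, Int]
  keys.zipWithIndex.foreach { case (k, i) =>
    if (!indexes.contains(k)) {  // analogous to the indexes(z).isEmpty check
      indexes(k) = i
    }
  }
  indexes
}

// firstOccurrenceIndexes(Seq("a", "b", "a")) == LinkedHashMap("a" -> 0, "b" -> 1)
```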


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208675350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

This is a valid argument; it's a rare edge case. One last question before I 
change it: WDYT about the performance of a mutable array vs. 
```oldTuple.copy(_2 = newValue)```?
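
Just to make sure we are talking about the same thing, a tiny standalone sketch (outside the PR code) of the two update styles I have in mind:

```
// copy() allocates a fresh tuple on every update, while the Array-based
// variant mutates the value slot in place.
object TupleVsArrayUpdate extends App {
  // Tuple style: keep the key, replace the second slot, get a new tuple back.
  val oldTuple: (String, Option[Int]) = ("key", None)
  val newTuple = oldTuple.copy(_2 = Some(42))

  // Array style: the entry stays the same object, only the cell is overwritten.
  val entry: (String, Array[Option[Int]]) = ("key", Array[Option[Int]](None, None))
  entry._2(1) = Some(42)

  println((newTuple, entry._2.toSeq))
}
```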


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208751629
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  @transient val (keyType, valueType, valueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(input.dataType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
--- End diff --

nit: formatting


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208747953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
--- End diff --

nit: ```(k, v)```, and maybe I would use ```v + 1``` instead of ```k + 1```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208746631
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
--- End diff --

typos: Transforms values; with 
Maybe you can think of a better comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208749197
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
--- End diff --

```map.valueContainsNull``` -> ```function.nullable```?
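
A hypothetical example of why I think ```function.nullable``` is the right source here (my own sketch, assuming a build with this PR applied): the input map has no null values, yet the lambda can still produce nulls.

```
import org.apache.spark.sql.SparkSession

object TransformValuesNullability extends App {
  val spark = SparkSession.builder().master("local[1]").appName("tv-null").getOrCreate()

  // The lambda always returns null, so the value type of `m` should be
  // reported as nullable even though the input map contains no null values.
  spark.sql("SELECT transform_values(map(1, 'a', 2, 'b'), (k, v) -> CAST(NULL AS STRING)) AS m")
    .printSchema()

  spark.stop()
}
```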


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208750446
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
--- End diff --

This is already specified by ```MapBasedSimpleHigherOrderFunction```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208868838
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks with PlanTestBa
   val lit = InternalRow(expected, expected)
   val expectedRow =
 UnsafeProjection.create(Array(expression.dataType, 
expression.dataType)).apply(lit)
-  if (unsafeRow != expectedRow) {
+  val field = StructField("field", expression.dataType)
+  val dataType = StructType(field :: field :: Nil)
+  if (!checkResult(unsafeRow, expectedRow, dataType)) {
--- End diff --

```UnsafeRow```s are compared based on the equality of their backing arrays. 
This approach doesn't work well when we want to ignore the entry order in the 
unsafe representation of maps.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208871779
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,184 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
--- End diff --

You are right, thanks!

WDYT about introducing a coercion rule handling different key types? It might 
be handy for cases like (```IntType```, ```LongType```).
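
A hypothetical example of what such a rule would enable (the rule itself is only an idea at this point, not something this PR implements):

```
import org.apache.spark.sql.SparkSession

// Zipping a map with int keys and a map with bigint keys would work by
// widening the key type to bigint.
object MapZipWithKeyCoercion extends App {
  val spark = SparkSession.builder().master("local[1]").appName("mzw-coercion").getOrCreate()

  spark.sql(
    """SELECT map_zip_with(map(1, 'a'),
      |                    map(CAST(1 AS BIGINT), 'x'),
      |                    (k, v1, v2) -> concat(v1, v2)) AS m""".stripMargin)
    .show(false)

  spark.stop()
}
```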


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208872928
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

@mgaido91 Are you comfortable with reverting to the previous version?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21121
  
Sure, closing ...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-08-09 Thread mn-mikke
Github user mn-mikke closed the pull request at:

https://github.com/apache/spark/pull/21121


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208941728
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

I like this idea, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209188342
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

Even though there is a coercion rule for unifying key types, the key types may 
still differ in nullability flags if they are complex. In theory, we could use 
```==``` and ```findTightestCommonType``` in the coercion rule since there is 
no codegen to be optimized for ```null``` checks. But unfortunately, ```bind``` 
gets called once before the coercion rules are executed, so 
```findTightestCommonType``` is important for setting up a correct input type 
for the lambda function.

Maybe we could play with the order of the analysis rules, but I'm not sure 
about all the consequences. @ueshin, could you shed some light on analysis rule 
ordering?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209311502
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -231,6 +231,15 @@ object TypeCoercion {
   })
   }
 
+  /**
+   * Similar to [[findTightestCommonType]] but with string promotion.
+   */
+  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): 
Option[DataType] = {
--- End diff --

If we have maps with decimals of different precision as keys, ```Cast``` will 
fail in the analysis phase since it can't cast a key to a nullable result 
(potential loss of precision). IMHO, the type mismatch exception from this 
function will be more accurate. WDYT?
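
A hedged illustration of the case I mean (my own example; the exact failure mode is an assumption):

```
import org.apache.spark.sql.SparkSession

// Two maps keyed by decimals of different precision/scale. Casting the key
// type could lose precision, so it cannot yield a non-nullable key; the query
// is expected to fail analysis, and the question is only how clear the error is.
object MapZipWithDecimalKeys extends App {
  val spark = SparkSession.builder().master("local[1]").appName("mzw-decimal").getOrCreate()

  spark.sql(
    """SELECT map_zip_with(map(CAST(1.0 AS DECIMAL(5, 1)), 'a'),
      |                    map(CAST(1.00 AS DECIMAL(10, 2)), 'x'),
      |                    (k, v1, v2) -> concat(v1, v2))""".stripMargin)
    .show(false)

  spark.stop()
}
```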


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21282: [SPARK-23934][SQL] Adding map_from_entries functi...

2018-06-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21282#discussion_r192573167
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -118,6 +120,229 @@ case class MapValues(child: Expression)
   override def prettyName: String = "map_values"
 }
 
+/**
+ * Returns a map created from the given array of entries.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(arrayOfEntries) - Returns a map created from the given 
array of entries.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
+   {1:"a",2:"b"}
+  """,
+  since = "2.4.0")
+case class MapFromEntries(child: Expression) extends UnaryExpression
+{
+  private lazy val resolvedDataType: Option[MapType] = child.dataType 
match {
+case ArrayType(
+  StructType(Array(
+StructField(_, keyType, false, _),
+StructField(_, valueType, valueNullable, _))),
+  false) => Some(MapType(keyType, valueType, valueNullable))
+case _ => None
+  }
+
+  override def dataType: MapType = resolvedDataType.get
+
+  override def checkInputDataTypes(): TypeCheckResult = resolvedDataType 
match {
+case Some(_) => TypeCheckResult.TypeCheckSuccess
+case None => TypeCheckResult.TypeCheckFailure(s"'${child.sql}' is of " 
+
+  s"${child.dataType.simpleString} type. $prettyName accepts only 
null-free arrays " +
+  "of pair structs. Values of the first struct field can't contain 
nulls and produce " +
+  "duplicates.")
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+val arrayData = input.asInstanceOf[ArrayData]
+val length = arrayData.numElements()
+val keyArray = new Array[AnyRef](length)
+val keySet = new OpenHashSet[AnyRef]()
+val valueArray = new Array[AnyRef](length)
+var i = 0;
+while (i < length) {
+  val entry = arrayData.getStruct(i, 2)
+  val key = entry.get(0, dataType.keyType)
+  if (key == null) {
+throw new RuntimeException("The first field from a struct (key) 
can't be null.")
+  }
+  if (keySet.contains(key)) {
--- End diff --

Ok, no problem. I've removed the duplicate checks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21282: [SPARK-23934][SQL] Adding map_from_entries functi...

2018-06-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21282#discussion_r192875181
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -308,6 +309,234 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
   override def prettyName: String = "map_entries"
 }
 
+/**
+ * Returns a map created from the given array of entries.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(arrayOfEntries) - Returns a map created from the given 
array of entries.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(struct(1, 'a'), struct(2, 'b')));
+   {1:"a",2:"b"}
+  """,
+  since = "2.4.0")
+case class MapFromEntries(child: Expression) extends UnaryExpression {
+
+  @transient
+  private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = 
child.dataType match {
+case ArrayType(
+  StructType(Array(
+StructField(_, keyType, keyNullable, _),
+StructField(_, valueType, valueNullable, _))),
+  containsNull) => Some((MapType(keyType, valueType, valueNullable), 
keyNullable, containsNull))
+case _ => None
+  }
+
+  private def nullEntries: Boolean = dataTypeDetails.get._3
+
+  override def dataType: MapType = dataTypeDetails.get._1
+
+  override def checkInputDataTypes(): TypeCheckResult = dataTypeDetails 
match {
+case Some(_) => TypeCheckResult.TypeCheckSuccess
+case None => TypeCheckResult.TypeCheckFailure(s"'${child.sql}' is of " 
+
+  s"${child.dataType.simpleString} type. $prettyName accepts only 
arrays of pair structs.")
+  }
+
+  override protected def nullSafeEval(input: Any): Any = {
+val arrayData = input.asInstanceOf[ArrayData]
+val length = arrayData.numElements()
+val numEntries = if (nullEntries) (0 until 
length).count(!arrayData.isNullAt(_)) else length
+val keyArray = new Array[AnyRef](numEntries)
+val valueArray = new Array[AnyRef](numEntries)
+var i = 0
+var j = 0
+while (i < length) {
+  if (!arrayData.isNullAt(i)) {
--- End diff --

Hi @ueshin,
wouldn't it be better to return `null` in this case and follow the null 
handling of other functions like `flatten`? 

```
flatten(array(array(1,2), null, array(3,4))) => null
```
WDYT?
   


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...

2018-06-23 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/21620

[SPARK-24636][SQL] Type coercion of arrays for array_join function

## What changes were proposed in this pull request?
Presto's implementation accepts arbitrary arrays of primitive types as an 
input:

```
presto> SELECT array_join(ARRAY [1, 2, 3], ', ');
_col0
-
1, 2, 3
(1 row)
```

This PR proposes to implement a type coercion rule for the ```array_join``` 
function that converts arrays of primitive as well as non-primitive types to 
arrays of strings.  

## How was this patch tested?

New test cases added to:
- sql-tests/inputs/typeCoercion/native/arrayJoin.sql
- DataFrameFunctionsSuite.scala


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-24636

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21620


commit 98017714e06ea7a730b1515d5bcfd583d7808a5a
Author: Marek Novotny 
Date:   2018-06-23T10:08:42Z

[SPARK-24636][SQL] Type coercion of arrays for array_join function




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21620: [SPARK-24636][SQL] Type coercion of arrays for array_joi...

2018-06-23 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21620
  
cc @ueshin @mgaido91 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...

2018-06-25 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21620#discussion_r197716236
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -536,6 +536,11 @@ object TypeCoercion {
   case None => c
 }
 
+  case ArrayJoin(arr, d, nr) if 
!ArrayType(StringType).acceptsType(arr.dataType) &&
+ArrayType.acceptsType(arr.dataType) =>
+val containsNull = 
arr.dataType.asInstanceOf[ArrayType].containsNull
+ArrayJoin(Cast(arr, ArrayType(StringType, containsNull)), d, nr)
--- End diff --

Hi @mgaido91,
to be honest, I've considered this option before submitting this PR. But 
I'm glad that you mentioned this approach. At least, we can discuss pros and 
cons of different solutions. Usage of ```ImplicitTypeCasts.implicitCast``` 
would enable conversion only from primitive types. I think it would be nice to 
support non-primitive types as well. WDYT?

Re: casting to ```StringType```: according to the ```Cast.canCast``` method, it 
should be possible to cast any type to ```StringType```:
**line 42:** ```   case (_, StringType) => true```
Or am I missing something? I hope the test cases in 
*.../typeCoercion/native/arrayJoin.sql* cover conversions to ```StringType``` 
from all Spark types.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...

2018-06-25 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21620#discussion_r197730826
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -536,6 +536,11 @@ object TypeCoercion {
   case None => c
 }
 
+  case ArrayJoin(arr, d, nr) if 
!ArrayType(StringType).acceptsType(arr.dataType) &&
+ArrayType.acceptsType(arr.dataType) =>
+val containsNull = 
arr.dataType.asInstanceOf[ArrayType].containsNull
+ArrayJoin(Cast(arr, ArrayType(StringType, containsNull)), d, nr)
--- End diff --

Ok, no problem. Let's support just arrays of primitive types for now. 
Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21620: [SPARK-24636][SQL] Type coercion of arrays for ar...

2018-06-25 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21620#discussion_r197768113
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -536,6 +536,14 @@ object TypeCoercion {
   case None => c
 }
 
+  case aj @ ArrayJoin(arr, d, nr) if 
!ArrayType(StringType).acceptsType(arr.dataType) &&
+ArrayType.acceptsType(arr.dataType) =>
+val containsNull = 
arr.dataType.asInstanceOf[ArrayType].containsNull
+ImplicitTypeCasts.implicitCast(arr, ArrayType(StringType, 
containsNull)) match {
+  case Some(finalDataType) => ArrayJoin(finalDataType, d, nr)
--- End diff --

spot on, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...

2018-07-01 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/21687

[SPARK-24165][SQL] Fixing the output data type of CaseWhen expression

## What changes were proposed in this pull request?
This PR proposes a fix for the output data type of the ```CaseWhen``` 
expression. The current implementation ignores the nullability of nested types 
from different execution branches and returns the type of the first branch.

This can lead to an unwanted ```NullPointerException``` from other expressions 
depending on a ```CaseWhen``` expression.

Example:
```
val rows = new util.ArrayList[Row]()
rows.add(Row(true, ("a", 1)))
rows.add(Row(false, (null, 2)))
val schema = StructType(Seq(
  StructField("cond", BooleanType, false),
  StructField("s", StructType(Seq(
StructField("val1", StringType, true),
StructField("val2", IntegerType, false)
  )), false)
))

val df = spark.createDataFrame(rows, schema)

df
  .select(when('cond, struct(lit("x").as("val1"), 
lit(10).as("val2"))).otherwise('s) as "res")
  .select('res.getField("val1"))
  .show()
```
Exception:
```
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
at 
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
...
```
Output schema:
```
root
 |-- res.val1: string (nullable = false)
```

## How was this patch tested?
New test cases added to:
- DataFrameSuite.scala
- conditionalExpressions.scala


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-24165

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21687.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21687


commit 71040635723a4dc3bc55b4415261d5a7abf4ed50
Author: Marek Novotny 
Date:   2018-07-01T13:36:24Z

[SPARK-24165][SQL] Fixing the output data type of CaseWhen expression when 
resolving nullability of nested types




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...

2018-07-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r199425774
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -129,7 +129,7 @@ case class CaseWhen(
 case Seq(dt1, dt2) => dt1.sameType(dt2)
   }
 
-  override def dataType: DataType = branches.head._2.dataType
+  override def dataType: DataType = 
valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get)
--- End diff --

Thanks for your suggestion, but is it the best solution?
1. It seems that this solution would also fail on the example in the 
description. The root data types in both branches are non-nullable, so 
```branches.head._2.dataType``` gets called again.
```
 |-- named_struct(val1, x AS `val1`, val2, 10 AS `val2`): struct (nullable 
= false)
 ||-- val1: string (nullable = false)
 ||-- val2: integer (nullable = false)
```
and
```
 |-- s: struct (nullable = false)
 ||-- val1: string (nullable = true)
 ||-- val2: integer (nullable = false)
```
2. Let's assume that there is no if statement and we just call 
```.asNullable```. This will make all the struct fields nullable and change 
```containsNull``` to ```true``` for all ```MapTypes``` and ```ArrayTypes``` 
within the data type structure. Is that what we want? 

Personally, I think we need something that goes recursively through 
non-primitive types and merges the ```nullable``` and ```containsNull``` flags. 
That's exactly what ```TypeCoercion.findTightestCommonType``` does when it is 
passed ```sameType```-equal data types.
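
Here is a minimal sketch of that merge (my own illustration, not the actual Spark implementation):

```
import org.apache.spark.sql.types._

// Merge the nullability flags of two sameType-equal types: the result is
// nullable wherever either of the inputs is nullable.
object MergeNullability {
  def merge(dt1: DataType, dt2: DataType): DataType = (dt1, dt2) match {
    case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
      ArrayType(merge(e1, e2), n1 || n2)
    case (MapType(k1, v1, n1), MapType(k2, v2, n2)) =>
      MapType(merge(k1, k2), merge(v1, v2), n1 || n2)
    case (StructType(fields1), StructType(fields2)) =>
      StructType(fields1.zip(fields2).map { case (f1, f2) =>
        StructField(f1.name, merge(f1.dataType, f2.dataType), f1.nullable || f2.nullable)
      })
    case _ => dt1  // primitive types are assumed to be sameType-equal already
  }
}
```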
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...

2018-07-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r199426921
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -129,7 +129,7 @@ case class CaseWhen(
 case Seq(dt1, dt2) => dt1.sameType(dt2)
   }
 
-  override def dataType: DataType = branches.head._2.dataType
+  override def dataType: DataType = 
valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get)
--- End diff --

Yeah, the types are ```sameType``` equal (ignoring nullability). I'm just 
leveraging ```TypeCoercion.findTightestCommonType``` to merge the 
```nullable``` and ```containsNull``` flags. 

I can introduce a new method for this purpose, but I'm afraid that the 
implementation will be very similar to 
```TypeCoercion.findTightestCommonType```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...

2018-07-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r199427016
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ConditionalExpressionSuite.scala
 ---
@@ -113,6 +113,35 @@ class ConditionalExpressionSuite extends SparkFunSuite 
with ExpressionEvalHelper
 assert(CaseWhen(Seq((c2, c4_notNull), (c3, c5))).nullable === true)
   }
 
+  test("case when - nullability of nested types") {
--- End diff --

Ok, no problem. Will do.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing the output data type of CaseWh...

2018-07-02 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21687
  
@viirya Yeah, it looks like the same problem, but it's worked around via a 
different implementation of the ```IfCoercion``` rule. That rule uses the 
```!=``` operator for comparison, so if two types differ in nullability flags, 
casting is applied to get the types aligned.

The ```CaseWhenCoercion``` rule uses ```sameType``` instead. We could change 
this rule to follow the ```IfCoercion``` rule, but is it necessary to do that? 
An extra ```Cast``` expression won't perform any effective casting (int to int, 
string to string, ...). We just need to supply correct metadata to parent 
expressions so they can perform null-safe codegen.

Personally, I would resolve this nullability problem in the expressions, not in 
the coercion rules. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing the output data type of...

2018-07-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r199451068
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
 ---
@@ -129,7 +129,7 @@ case class CaseWhen(
 case Seq(dt1, dt2) => dt1.sameType(dt2)
   }
 
-  override def dataType: DataType = branches.head._2.dataType
+  override def dataType: DataType = 
valueTypes.reduce(TypeCoercion.findTightestCommonType(_, _).get)
--- End diff --

Ok, will do that :-)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21704: [SPARK-24734][SQL] Fix containsNull of Concat for array ...

2018-07-03 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21704
  
@ueshin Thanks for bringing this topic up! This problem with different 
```nullable```/```containsNull``` flags seems to be more generic. In 
[21687](https://github.com/apache/spark/pull/21687), we've touched a similar 
problem with the ```CaseWhen``` and ```If``` expressions. So I think it would be nice 
if we could think together about a generic and consistent solution for all 
expressions. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200054252
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
+  case dt => dt
+}.getOrElse(StringType)
+  }
--- End diff --

So far, we've also identified ```CaseWhen``` and ```If```, discussed 
[here](https://github.com/apache/spark/pull/21687). I've just noticed that 
```Coalesce``` looks suspicious as well.

What is the key purpose of ```SimplifyCasts```? To remove an extra 
expression node, or to avoid casts between two identical types? If the second 
option is the purpose, what about changing the ```SimplifyCasts``` rule to start 
replacing ```Cast``` with a new dummy cast expression that will hold only a 
target data type and won't perform any casting?
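
A hypothetical sketch of such a dummy cast (the class name and its placement are 
made up for illustration; this is not an existing Spark expression): it passes the 
child's value through unchanged and only overrides the reported data type.

```
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.DataType

// Hypothetical: an identity wrapper that only fixes up the reported data type,
// so a rule like SimplifyCasts could substitute it for a Cast between types
// that differ only in null flags.
case class RetypedIdentity(child: Expression, override val dataType: DataType)
  extends UnaryExpression with CodegenFallback {

  override def nullable: Boolean = child.nullable

  // Pass the value through unchanged; only the metadata (dataType) differs.
  override protected def nullSafeEval(input: Any): Any = input
}
```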


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200113984
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
--- End diff --

Yes, it should work (see a 
[test](https://github.com/ueshin/apache-spark/blob/30d5aed41085cfb82e3da43099adf972b953ece8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala#L951)
 for it). Did we miss anything?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200116003
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
+  case dt => dt
+}.getOrElse(StringType)
+  }
--- End diff --

Oh, I see. In that case, it would be nice to introduce a method that 
resolves the output DataType and merges the ```nullable```/```containsNull``` flags 
of non-primitive types recursively for such expressions. For most cases we 
could encapsulate the function into a new trait. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200116898
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
--- End diff --

Yeah, plus ```valueContainsNull``` for ```MapType``` and ```nullable``` for 
```StructField```.
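
For illustration, a minimal sketch of the same OR-merge applied to those two flags 
(plain ```DataType``` values, not the PR's actual code):

```
import org.apache.spark.sql.types._

val m1 = MapType(IntegerType, StringType, valueContainsNull = false)
val m2 = MapType(IntegerType, StringType, valueContainsNull = true)
// The merged map type must accept null values if either input does:
val mergedMap = m1.copy(valueContainsNull = m1.valueContainsNull || m2.valueContainsNull)

val f1 = StructField("v", StringType, nullable = false)
val f2 = StructField("v", StringType, nullable = true)
// The merged struct field must be nullable if either input field is:
val mergedField = f1.copy(nullable = f1.nullable || f2.nullable)
```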


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-04 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200134825
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
--- End diff --

@ueshin For ```Concat```, ```Coalesce```, etc. it seems to be the case, 
since a coercion rule is executed if there is any nullability difference on any 
level of nesting. But it's not the case for the ```CaseWhenCoercion``` rule, since 
the ```sameType``` method is used for comparison.

I'm wondering: if the goal is to avoid the generation of extra ```Cast``` 
expressions, shouldn't other coercion rules utilize the ```sameType``` method as 
well? Let's assume that the result of ```concat``` is subsequently used by 
```flatten```; wouldn't it lead to the generation of extra null-safe checks as 
mentioned 
[here](https://github.com/apache/spark/pull/21704#discussion_r200110924)? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21704: [SPARK-24734][SQL] Fix containsNull of Concat for...

2018-07-05 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21704#discussion_r200265965
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2007,7 +2007,14 @@ case class Concat(children: Seq[Expression]) extends 
Expression {
 }
   }
 
-  override def dataType: DataType = 
children.map(_.dataType).headOption.getOrElse(StringType)
+  override def dataType: DataType = {
+val dataTypes = children.map(_.dataType)
+dataTypes.headOption.map {
+  case ArrayType(et, _) =>
+ArrayType(et, 
dataTypes.exists(_.asInstanceOf[ArrayType].containsNull))
--- End diff --

Please ignore the part of my previous comment regarding the ```flatten``` 
function. The output data type of ```concat```, etc. will be the same 
regardless of how the ```null``` flags are resolved.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing conditional expressions to han...

2018-07-06 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21687
  
I've implemented a trait that takes care of merging the ```null``` flags and added 
more tests.

also cc @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200848714
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait NonPrimitiveTypeMergingExpression extends Expression
+{
+  /**
+   * A collection of data types used for resolution the output type of the 
expression. By default,
+   * data types of all child expressions. The collection must not be empty.
+   */
+  @transient
+  lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)
+
+  /**
+   * A method determining whether the input types are equal ignoring 
nullable, containsNull and
+   * valueContainsNull flags and thus convenient for resolution of the 
final data type.
+   */
+  def areInputTypesForMergingEqual: Boolean = {
+inputTypesForMerging.lengthCompare(1) <= 0 || 
inputTypesForMerging.sliding(2, 1).forall {
+  case Seq(dt1, dt2) => dt1.sameType(dt2)
+}
+  }
+
+  private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = 
(dt1, dt2) match {
+case (t1, t2) if t1 == t2 => t1
+case (ArrayType(et1, cn1), ArrayType(et2, cn2)) =>
+  ArrayType(mergeTwoDataTypes(et1, et2), cn1 || cn2)
+case (MapType(kt1, vt1, vcn1), MapType(kt2, vt2, vcn2)) =>
+  MapType(mergeTwoDataTypes(kt1, kt2), mergeTwoDataTypes(vt1, vt2), 
vcn1 || vcn2)
+case (StructType(fields1), StructType(fields2)) =>
+  val newFields = fields1.zip(fields2).map {
--- End diff --

I can add it, but is it necessary if [we already 
check](https://github.com/apache/spark/pull/21687/files/3b2c083aa5614984792ca9130d4935aa82b7b510#diff-b3ebf3b40b9d4b6e98bb29ac8bb5aadaR742)
 that the input types are ```sameType``` equal before calling the method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200849422
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait NonPrimitiveTypeMergingExpression extends Expression
+{
+  /**
+   * A collection of data types used for resolution the output type of the 
expression. By default,
+   * data types of all child expressions. The collection must not be empty.
+   */
+  @transient
+  lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)
+
+  /**
+   * A method determining whether the input types are equal ignoring 
nullable, containsNull and
+   * valueContainsNull flags and thus convenient for resolution of the 
final data type.
+   */
+  def areInputTypesForMergingEqual: Boolean = {
+inputTypesForMerging.lengthCompare(1) <= 0 || 
inputTypesForMerging.sliding(2, 1).forall {
+  case Seq(dt1, dt2) => dt1.sameType(dt2)
+}
+  }
+
+  private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = 
(dt1, dt2) match {
+case (t1, t2) if t1 == t2 => t1
+case (ArrayType(et1, cn1), ArrayType(et2, cn2)) =>
+  ArrayType(mergeTwoDataTypes(et1, et2), cn1 || cn2)
+case (MapType(kt1, vt1, vcn1), MapType(kt2, vt2, vcn2)) =>
+  MapType(mergeTwoDataTypes(kt1, kt2), mergeTwoDataTypes(vt1, vt2), 
vcn1 || vcn2)
+case (StructType(fields1), StructType(fields2)) =>
+  val newFields = fields1.zip(fields2).map {
+case (f1, f2) if f1 == f2 => f1
+case (StructField(name, fdt1, nl1, _), StructField(_, fdt2, nl2, 
_)) =>
+  StructField(name, mergeTwoDataTypes(fdt1, fdt2), nl1 || nl2)
--- End diff --

The comment on the ```metadata``` field says:
> The metadata should be preserved during transformation if the content of 
the column is not modified, e.g, in selection.

So I would say no, since the expressions inheriting from this trait will 
combine data from multiple columns into one.

If we decide to extend the definition and merge metadata from multiple 
columns, it would make sense to also change the ```findTightestCommonType``` 
method for cases when coercion rules are executed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200850107
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait NonPrimitiveTypeMergingExpression extends Expression
--- End diff --

```Complex``` also sounds better to me :-) But I would like to stay 
consistent with the terminology (see a 
[comment](https://github.com/apache/spark/pull/20938/files/eeab72786aa0c28b123cc21a87bd8619a63831bd#r181345710)
 from a different PR). What about using some neutral name like 
```NullFlagsMergingExpression``` or similar?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200850235
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -688,10 +688,10 @@ object TypeCoercion {
 plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
   case e if !e.childrenResolved => e
   // Find tightest common type for If, if the true value and false 
value have different types.
-  case i @ If(pred, left, right) if left.dataType != right.dataType =>
-findWiderTypeForTwo(left.dataType, right.dataType).map { 
widestType =>
-  val newLeft = if (left.dataType == widestType) left else 
Cast(left, widestType)
-  val newRight = if (right.dataType == widestType) right else 
Cast(right, widestType)
+  case i @ If(pred, left, right) if !i.areInputTypesForMergingEqual =>
+findWiderTypeForTwo(left.dataType, right.dataType).map { 
commonType =>
+  val newLeft = if (left.dataType.sameType(commonType)) left else 
Cast(left, commonType)
+  val newRight = if (right.dataType.sameType(commonType)) right 
else Cast(right, commonType)
--- End diff --

```widestType``` sounded a bit misleading to me, but I can revert it back 
if you find it OK.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200851845
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait NonPrimitiveTypeMergingExpression extends Expression
+{
+  /**
+   * A collection of data types used for resolution the output type of the 
expression. By default,
+   * data types of all child expressions. The collection must not be empty.
+   */
+  @transient
+  lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)
+
+  /**
+   * A method determining whether the input types are equal ignoring 
nullable, containsNull and
+   * valueContainsNull flags and thus convenient for resolution of the 
final data type.
+   */
+  def areInputTypesForMergingEqual: Boolean = {
+inputTypesForMerging.lengthCompare(1) <= 0 || 
inputTypesForMerging.sliding(2, 1).forall {
+  case Seq(dt1, dt2) => dt1.sameType(dt2)
+}
+  }
+
+  private def mergeTwoDataTypes(dt1: DataType, dt2: DataType): DataType = 
(dt1, dt2) match {
+case (t1, t2) if t1 == t2 => t1
+case (ArrayType(et1, cn1), ArrayType(et2, cn2)) =>
--- End diff --

That sounds like a good idea! Some parts like ```fields1.length == 
fields2.length``` and ```resolver(field1.name, field2.name)``` seem unnecessary 
in this context, but it could work.

@ueshin WDYT about making ```findTypeForComplex``` public?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r200852015
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,56 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait NonPrimitiveTypeMergingExpression extends Expression
--- End diff --

My previous comment seems to be irrelevant since ```complex``` has already 
been used (```findTypeForComplex```). 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21687: [SPARK-24165][SQL] Fixing conditional expressions to han...

2018-07-10 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21687
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional...

2018-07-11 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/21747

[SPARK-24165][SQL][branch-2.3] Fixing conditional expressions to handle 
nullability of nested types

## What changes were proposed in this pull request?
This PR proposes a fix for the output data type of the ```If``` and 
```CaseWhen``` expressions. Up till now, the implementation of these expressions has 
ignored the nullability of nested types from different execution branches and 
returned the type of the first branch.

This could lead to an unwanted ```NullPointerException``` from other 
expressions depending on an ```If```/```CaseWhen``` expression.

Example:
```
val rows = new util.ArrayList[Row]()
rows.add(Row(true, ("a", 1)))
rows.add(Row(false, (null, 2)))
val schema = StructType(Seq(
  StructField("cond", BooleanType, false),
  StructField("s", StructType(Seq(
StructField("val1", StringType, true),
StructField("val2", IntegerType, false)
  )), false)
))

val df = spark.createDataFrame(rows, schema)

df
  .select(when('cond, struct(lit("x").as("val1"), 
lit(10).as("val2"))).otherwise('s) as "res")
  .select('res.getField("val1"))
  .show()
```
Exception:
```
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:109)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
 Source)
at 
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
at 
org.apache.spark.sql.execution.LocalTableScanExec$$anonfun$unsafeRows$1.apply(LocalTableScanExec.scala:44)
...
```
Output schema:
```
root
 |-- res.val1: string (nullable = false)
```
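For comparison, once the nullability of both branches is merged, the same query 
should resolve the field as nullable (and return ```null``` for the second row 
instead of failing):
```
root
 |-- res.val1: string (nullable = true)
```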

## How was this patch tested?
New test cases added into
- DataFrameSuite.scala
- conditionalExpressions.scala

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-24165-branch-2.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21747.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21747


commit a2fe63e1d48b0291feaa9fcd008654da051d1f1b
Author: Marek Novotny 
Date:   2018-07-11T04:21:03Z

[SPARK-24165][SQL][branch-2.3] Fixing conditional expressions to handle 
nullability of nested types




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...

2018-07-11 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21747
  
cc @cloud-fan @ueshin @viirya @maropu 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...

2018-07-11 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21747
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22017
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209514957
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

IMHO, if ```checkInputDataTypes``` was executed before ```bind```, 
```findTightestCommonType``` could play the same role. But yeah, 
```findCommonTypeDifferentOnlyInNullFlags``` will be semantically more 
accurate. Thanks!
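
A minimal sketch of what such a null-flags-only common-type lookup could look 
like (illustrative and incomplete: only the array case is shown; an actual helper 
would need analogous cases for maps and structs):

```
import org.apache.spark.sql.types._

def findCommonTypeDifferentOnlyInNullFlags(t1: DataType, t2: DataType): Option[DataType] =
  if (t1 == t2) {
    Some(t1)
  } else {
    (t1, t2) match {
      // Element types must match up to null flags; containsNull is OR-merged.
      case (ArrayType(et1, cn1), ArrayType(et2, cn2)) =>
        findCommonTypeDifferentOnlyInNullFlags(et1, et2).map(ArrayType(_, cn1 || cn2))
      case _ => None
    }
  }
```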


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209515431
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -231,6 +231,15 @@ object TypeCoercion {
   })
   }
 
+  /**
+   * Similar to [[findTightestCommonType]] but with string promotion.
+   */
+  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): 
Option[DataType] = {
--- End diff --

Thanks for both your PRs! I will submit changes once they get in.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209533017
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

Oh, I see. We also need to check the output data type of lambda functions 
for expressions like ```ArrayFilter```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22075#discussion_r209643649
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -422,45 +425,49 @@ case class ArrayExists(
   """,
   since = "2.4.0")
 case class ArrayAggregate(
-input: Expression,
+argument: Expression,
 zero: Expression,
 merge: Expression,
 finish: Expression)
   extends HigherOrderFunction with CodegenFallback {
 
-  def this(input: Expression, zero: Expression, merge: Expression) = {
-this(input, zero, merge, LambdaFunction.identity)
+  def this(argument: Expression, zero: Expression, merge: Expression) = {
+this(argument, zero, merge, LambdaFunction.identity)
   }
 
-  override def inputs: Seq[Expression] = input :: zero :: Nil
+  override def arguments: Seq[Expression] = argument :: zero :: Nil
+
+  override def argumentTypes: Seq[AbstractDataType] = ArrayType :: 
AnyDataType :: Nil
 
   override def functions: Seq[Expression] = merge :: finish :: Nil
 
-  override def nullable: Boolean = input.nullable || finish.nullable
+  override def functionTypes: Seq[AbstractDataType] = zero.dataType :: 
AnyDataType :: Nil
+
+  override def nullable: Boolean = argument.nullable || finish.nullable
 
   override def dataType: DataType = finish.dataType
 
   override def checkInputDataTypes(): TypeCheckResult = {
-if (!ArrayType.acceptsType(input.dataType)) {
-  TypeCheckResult.TypeCheckFailure(
-s"argument 1 requires ${ArrayType.simpleString} type, " +
-  s"however, '${input.sql}' is of ${input.dataType.catalogString} 
type.")
-} else if (!DataType.equalsStructurally(
-zero.dataType, merge.dataType, ignoreNullability = true)) {
-  TypeCheckResult.TypeCheckFailure(
-s"argument 3 requires ${zero.dataType.simpleString} type, " +
-  s"however, '${merge.sql}' is of ${merge.dataType.catalogString} 
type.")
-} else {
-  TypeCheckResult.TypeCheckSuccess
+checkArgumentDataTypes() match {
--- End diff --

Just a quick question: isn't the call to ```checkArgumentDataTypes``` redundant 
here if ```checkArgumentDataTypes``` is already called before 
```checkInputDataTypes``` anyway?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-14 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209876913
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -496,3 +496,194 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  def functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(leftKeyType, leftValueType, 
leftValueContainsNull) = left.dataType
+
+  @transient lazy val MapType(rightKeyType, rightValueType, 
rightValueContainsNull) = right.dataType
+
+  @transient lazy val keyType =
+TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, 
rightKeyType).get
--- End diff --

If ```leftKeyType``` is ```ArrayType(IntegerType, false)``` and 
```rightKeyType``` is ```ArrayType(IntegerType, true)```, for instance, the 
coercion rule is not executed since ```leftKeyType.sameType(rightKeyType) == true```.

An array with nulls seems to be a valid key:
```
scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
+---------------------------------------+
|map(array(1, 2, CAST(NULL AS INT)), 12)|
+---------------------------------------+
|                        [[1, 2,] -> 12]|
+---------------------------------------+
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-14 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r209920407
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
--- End diff --

This comment is not valid anymore. The method has been removed by 
[#22075](https://github.com/apache/spark/pull/22075).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/22110

[SPARK-25122][SQL] Deduplication of supports equals code

## What changes were proposed in this pull request?

The method ```*supportEquals```, which determines whether elements of a data type 
can be used as items in a hash set or as keys in a hash map, is duplicated 
across multiple collection and higher-order functions.

This PR deduplicates the method.

## How was this patch tested?

Run tests in:
- DataFrameFunctionsSuite
- CollectionExpressionsSuite
- HigherOrderExpressionsSuite


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-25122

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22110


commit dd292e8cf3ed1788793e626da3a136e9acb9d81c
Author: Marek Novotny 
Date:   2018-08-15T08:18:05Z

[SPARK-25122][SQL] Deduplication of supports equals code




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22110: [SPARK-25122][SQL] Deduplication of supports equals code

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22110
  
cc @ueshin @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210212586
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala 
---
@@ -115,6 +115,8 @@ protected[sql] abstract class AtomicType extends 
DataType {
   private[sql] type InternalType
   private[sql] val tag: TypeTag[InternalType]
   private[sql] val ordering: Ordering[InternalType]
+
+  private[spark] override def supportsEquals: Boolean = true
--- End diff --

Not all of the expressions utilize ```OpenHashSet``` or ```OpenHashMap```. 
What about ```TypeUtils```, which contains methods like 
```getInterpretedOrdering```?
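
A minimal sketch of what such a helper in ```TypeUtils``` could look like (the 
name and the exact cases are assumptions for illustration; the PR may define it 
differently):

```
import org.apache.spark.sql.types._

// Can elements of this type be used as hash-set items / hash-map keys?
def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
  // BinaryType is backed by Array[Byte], whose equals/hashCode are reference-based.
  case BinaryType => false
  case _: AtomicType => true
  case _ => false
}
```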


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210350632
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
--- End diff --

I'm open to any changes :) But if you want to explicitly mention the 
```equals``` method, I would also mention ```hashCode```, which is generally needed 
for usage in "hash" collections. But then this is not 100% true for Spark's 
specialized ```OpenHashSet```s and ```OpenHashMap```s, since they calculate the hash 
by themselves. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210357474
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
--- End diff --

What about changing the name of the method to ```typeCanBeHashed```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210368929
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,62 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Transform Keys for every entry of the map by applying the 
transform_keys function.
+ * Returns map with transformed key entries
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = {
+MapType(function.dataType, valueType, valueContainsNull)
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): TransformKeys = {
+copy(function = f(function, (keyType, false) :: (valueType, 
valueContainsNull) :: Nil))
+  }
+
+  @transient lazy val LambdaFunction(
+  _, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+
+
+  override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): 
Any = {
+val map = argumentValue.asInstanceOf[MapData]
+val f = functionForEval
--- End diff --

Can't we use ```functionForEval``` directly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210366383
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,62 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Transform Keys for every entry of the map by applying the 
transform_keys function.
+ * Returns map with transformed key entries
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = {
+MapType(function.dataType, valueType, valueContainsNull)
--- End diff --

nit: just in one line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210373675
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2302,6 +2302,97 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 assert(ex5.getMessage.contains("function map_zip_with does not support 
ordering on type map"))
   }
 
+  test("transform keys function - test various primitive data types 
combinations") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0)
+).toDF("j")
+
+val dfExample3 = Seq(
+  Map[Int, Boolean](25 -> true, 26 -> false)
+).toDF("x")
+
+val dfExample4 = Seq(
+  Map[Array[Int], Boolean](Array(1, 2) -> false)
+).toDF("y")
+
+
+def testMapOfPrimitiveTypesCombination(): Unit = {
+  checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + 
v)"),
+Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, " +
+"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 
'three'))[k])"),
+Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> 
CAST(v * 2 AS BIGINT) + k)"),
+Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> k + 
v)"),
+Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 
2 = 0 OR v)"),
+Seq(Row(Map(true -> true, true -> false
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(y, (k, v) -> 
array_contains(k, 3) AND v)"),
+Seq(Row(Map(false -> false
+}
+// Test with local relation, the Project will be evaluated without 
codegen
+testMapOfPrimitiveTypesCombination()
+dfExample1.cache()
+dfExample2.cache()
+dfExample3.cache()
+dfExample4.cache()
+// Test with cached relation, the Project will be evaluated with 
codegen
+testMapOfPrimitiveTypesCombination()
+  }
+
+  test("transform keys function - Invalid lambda functions and 
exceptions") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[String, String]("a" -> "b")
+).toDF("j")
+
+val dfExample3 = Seq(
+  Map[String, String]("a" -> null)
+).toDF("x")
+
+def testInvalidLambdaFunctions(): Unit = {
+  val ex1 = intercept[AnalysisException] {
+dfExample1.selectExpr("transform_keys(i, k -> k )")
+  }
+  assert(ex1.getMessage.contains("The number of lambda function 
arguments '1' does not match"))
+
+  val ex2 = intercept[AnalysisException] {
+dfExample2.selectExpr("transform_keys(j, (k, v, x) -> k + 1)")
+  }
+  assert(ex2.getMessage.contains(
+  "The number of lambda function arguments '3' does not match"))
+
+  val ex3 = intercept[RuntimeException] {
+dfExample3.selectExpr("transform_keys(x, (k, v) -> v)").show()
+  }
+  assert(ex3.getMessage.contains("Cannot use null as map key!"))
+}
+
+testInvalidLambdaFunctions()
+dfExample1.cache()
+dfExample2.cache()
+testInvalidLambdaFunctions()
--- End diff --

@ueshin I would like to ask you a generic question regarding higher-order 
functions. Is it necessary to perform checks with codegen paths if all the 
newly added functions extend ```CodegenFallback```? Also, is there 
a plan to add codegen for these functions in the future? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210493260
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
+   * of a hash map.
+   */
+  def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
--- End diff --

I will change it :)

Just one question about ```hashCode```. If ```case class```es are used, 
```equals``` and ```hashCode``` are generated by the compiler. But if we define 
```equals``` manually, shouldn't ```a.equals(b) == true``` => 
```a.hashCode == b.hashCode``` also hold?
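
A small, illustrative example of the contract being referred to (not related to any 
Spark class): a manual ```equals``` without a matching ```hashCode``` breaks hash 
collections.

```
// Keys are equal by id, but hashCode is still the default (identity-based) one.
class Key(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case k: Key => k.id == id
    case _ => false
  }
}

val set = scala.collection.mutable.HashSet(new Key(1))
// Equal keys may hash to different buckets, so the lookup can miss:
set.contains(new Key(1))   // not guaranteed to be true
```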


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r210561102
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,53 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Returns a map that applies the function to each value of the map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> v + 
1);
+map(array(1, 2, 3), array(2, 3, 4))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+  since = "2.4.0")
+case class TransformValues(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
valueContainsNull)
--- End diff --

Shouldn't the ```dataType``` be defined as ```MapType(keyType, 
function.dataType, function.nullable)```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210563516
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
+   * of a hash map.
+   */
+  def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
--- End diff --

Ok, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22126: [SPARK-23938][SQL][FOLLOW-UP][TEST] Nullabilities...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22126#discussion_r210724650
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
 ---
@@ -363,9 +363,9 @@ class HigherOrderFunctionsSuite extends SparkFunSuite 
with ExpressionEvalHelper
 left: Expression,
 right: Expression,
 f: (Expression, Expression, Expression) => Expression): Expression 
= {
-  val MapType(kt, vt1, vcn1) = left.dataType.asInstanceOf[MapType]
-  val MapType(_, vt2, vcn2) = right.dataType.asInstanceOf[MapType]
-  MapZipWith(left, right, createLambda(kt, false, vt1, vcn1, vt2, 
vcn2, f))
+  val MapType(kt, vt1, _) = left.dataType.asInstanceOf[MapType]
--- End diff --

Optional suggestion: Maybe we could remove ```asInstanceOf[MapType]``` here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22131: [SPARK-25141][SQL][TEST] Modify tests for higher-order f...

2018-08-17 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22131
  
LGTM too


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22243: [MINOR] Avoid code duplication for nullable in Hi...

2018-08-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22243#discussion_r213022884
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -155,6 +155,8 @@ trait HigherOrderFunction extends Expression with 
ExpectsInputTypes {
  */
 trait SimpleHigherOrderFunction extends HigherOrderFunction  {
 
+  override def nullable: Boolean = argument.nullable
--- End diff --

If we moved the definition of ```nullable``` straight to 
```HigherOrderFunction``` as ```arguments.exists(_.nullable)```, we could also 
avoid the duplication in ```ZipWith``` and ```MapZipWith```. WDYT?
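
A self-contained mock-up of that suggestion (simplified stand-ins, not Spark's 
actual classes): defining ```nullable``` once on the base trait covers both the 
single-argument and the two-argument functions.

```
trait Expr { def nullable: Boolean }

trait HigherOrderFn {
  def arguments: Seq[Expr]
  // Defined once here instead of in every concrete function:
  def nullable: Boolean = arguments.exists(_.nullable)
}

case class MockZipWith(left: Expr, right: Expr) extends HigherOrderFn {
  override def arguments: Seq[Expr] = left :: right :: Nil  // nullable comes for free
}
```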


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


