[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r223460909 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,248 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. --- End diff -- It sounds like our null handling is incorrect. NULL does not equal NULL. ``` SELECT array_intersect(ARRAY(NULL), ARRAY(NULL)); ``` This should return an empty set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207967923 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,248 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +if (array1.numElements() != 0 && array2.numElements() != 0) { + val hs = new OpenHashSet[Any] + val hsResult = new OpenHashSet[Any] + var foundNullElement = false + var i = 0 + while (i < array2.numElements()) { +if (array2.isNullAt(i)) { + foundNullElement = true +} else { + val elem = array2.get(i, elementType) + hs.add(elem) +} +i += 1 + } + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + i = 0 + while (i < array1.numElements()) { +if (array1.isNullAt(i)) { + if (foundNullElement) { +arrayBuffer += null +foundNullElement = false + } +} else { + val elem = array1.get(i, elementType) + if (hs.contains(elem) && !hsResult.contains(elem)) { +arrayBuffer += elem +hsResult.add(elem) + } +} +i += 1 + } + new GenericArrayData(arrayBuffer) +} else { + new GenericArrayData(Array.emptyObjectArray) +} +} else { + (array1, array2) => +if (array1.numElements() != 0 
&& array2.numElements() != 0) { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + var alreadySeenNull = false + var i = 0 + while (i < array1.numElements()) { +var found = false +val elem1 = array1.get(i, elementType) +if (array1.isNullAt(i)) { + if (!alreadySeenNull) { +var j = 0 +while (!found && j < array2.numElements()) { + found = array2.isNullAt(j) + j += 1 +} +// array2 is scanned only once for null element +alreadySeenNull = true + } +} else { + var j = 0 + while (!found && j < array2.numElements()) { +if (!array2.isNullAt(j)) { + val elem2 = array2.get(j, elementType) + if (ordering.equiv(elem1, elem2)) { +// check whether elem1 is already stored in arrayBuffer +var foundArrayBuffer = false +var k = 0 +while (!foundArrayBuffer && k < arrayBuffer.size) { + val va = arrayBuffer(k) + foundArrayBuffer = (va != null) && ordering.equiv(va, elem1) + k += 1 +} +found = !foundArrayBuffer + } +} +j += 1 + } +} +if (found) { + arrayBuffer += elem1 +} +i += 1 + } + new GenericArrayData(arrayBuffer) +} else { + new GenericArrayData(Array.emptyObjectArray) +} +} + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalIntersect(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207945648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,248 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +if (array1.numElements() != 0 && array2.numElements() != 0) { + val hs = new OpenHashSet[Any] + val hsResult = new OpenHashSet[Any] + var foundNullElement = false + var i = 0 + while (i < array2.numElements()) { +if (array2.isNullAt(i)) { + foundNullElement = true +} else { + val elem = array2.get(i, elementType) + hs.add(elem) +} +i += 1 + } + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + i = 0 + while (i < array1.numElements()) { +if (array1.isNullAt(i)) { + if (foundNullElement) { +arrayBuffer += null +foundNullElement = false + } +} else { + val elem = array1.get(i, elementType) + if (hs.contains(elem) && !hsResult.contains(elem)) { +arrayBuffer += elem +hsResult.add(elem) + } +} +i += 1 + } + new GenericArrayData(arrayBuffer) +} else { + new GenericArrayData(Array.emptyObjectArray) +} +} else { + (array1, array2) => +if (array1.numElements() 
!= 0 && array2.numElements() != 0) { + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + var alreadySeenNull = false + var i = 0 + while (i < array1.numElements()) { +var found = false +val elem1 = array1.get(i, elementType) +if (array1.isNullAt(i)) { + if (!alreadySeenNull) { +var j = 0 +while (!found && j < array2.numElements()) { + found = array2.isNullAt(j) + j += 1 +} +// array2 is scanned only once for null element +alreadySeenNull = true + } +} else { + var j = 0 + while (!found && j < array2.numElements()) { +if (!array2.isNullAt(j)) { + val elem2 = array2.get(j, elementType) + if (ordering.equiv(elem1, elem2)) { +// check whether elem1 is already stored in arrayBuffer +var foundArrayBuffer = false +var k = 0 +while (!foundArrayBuffer && k < arrayBuffer.size) { + val va = arrayBuffer(k) + foundArrayBuffer = (va != null) && ordering.equiv(va, elem1) + k += 1 +} +found = !foundArrayBuffer + } +} +j += 1 + } +} +if (found) { + arrayBuffer += elem1 +} +i += 1 + } + new GenericArrayData(arrayBuffer) +} else { + new GenericArrayData(Array.emptyObjectArray) +} +} + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalIntersect(array1, array2) + } + + override def doGenCode(ctx: CodegenContext
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21102 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207781744 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,248 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +if (array1.numElements() != 0 && array2.numElements() != 0) { + val hs = new OpenHashSet[Any] + val hsResult = new OpenHashSet[Any] + var foundNullElement = false + var i = 0 + while (i < array2.numElements()) { +if (array2.isNullAt(i)) { + foundNullElement = true +} else { + val elem = array2.get(i, elementType) + hs.add(elem) +} +i += 1 + } + val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] + i = 0 + while (i < array1.numElements()) { +if (array1.isNullAt(i)) { + if (foundNullElement) { +arrayBuffer += null +foundNullElement = false + } +} else { + val elem = array1.get(i, elementType) + if (hs.contains(elem) && !hsResult.contains(elem)) { +arrayBuffer += elem +hsResult.add(elem) + } +} +i += 1 + } + new GenericArrayData(arrayBuffer) +} else { + new GenericArrayData(Seq.empty) --- End diff -- nit: `Array.empty` or `Array.emptyObjectArray`? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207767226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,242 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] +val hsResult = new OpenHashSet[Any] +var foundNullElement = false +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +foundNullElement = true + } else { +val elem = array2.get(i, elementType) +hs.add(elem) + } + i += 1 +} +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (foundNullElement) { + arrayBuffer += null + foundNullElement = false +} + } else { +val elem = array1.get(i, elementType) +if (hs.contains(elem) && !hsResult.contains(elem)) { + arrayBuffer += elem + hsResult.add(elem) +} + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} else { + (array1, array2) => +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +var alreadySeenNull = false +var i = 0 +while (i < array1.numElements()) { + var found = 
false + val elem1 = array1.get(i, elementType) + if (array1.isNullAt(i)) { +if (!alreadySeenNull) { + var j = 0 + while (!found && j < array2.numElements()) { +found = array2.isNullAt(j) +j += 1 + } + // array2 is scanned only once for null element + alreadySeenNull = true +} + } else { +var j = 0 +while (!found && j < array2.numElements()) { + if (!array2.isNullAt(j)) { +val elem2 = array2.get(j, elementType) +if (ordering.equiv(elem1, elem2)) { + // check whether elem1 is already stored in arrayBuffer + var foundArrayBuffer = false + var k = 0 + while (!foundArrayBuffer && k < arrayBuffer.size) { +val va = arrayBuffer(k) +foundArrayBuffer = (va != null) && ordering.equiv(va, elem1) +k += 1 + } + found = !foundArrayBuffer +} + } + j += 1 +} + } + if (found) { +arrayBuffer += elem1 + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalIntersect(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val arrayData = classOf[ArrayData].getName +val i = ctx.freshName("i") +val value = ctx.freshName("value") +val size = ctx.freshName("size") +if (canUseSpecializedHashSet) { + val jt = CodeGenerator.javaType(elementType) + val ptName = CodeGenerator.primitiveTypeName(jt) + + nullSafeCodeGen(ctx, ev, (array1, array2) => { +val foundNullElement = ctx.freshName("foundNullEleme
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207766511 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,242 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] +val hsResult = new OpenHashSet[Any] +var foundNullElement = false +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +foundNullElement = true + } else { +val elem = array2.get(i, elementType) +hs.add(elem) + } + i += 1 +} +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (foundNullElement) { + arrayBuffer += null + foundNullElement = false +} + } else { +val elem = array1.get(i, elementType) +if (hs.contains(elem) && !hsResult.contains(elem)) { + arrayBuffer += elem + hsResult.add(elem) +} + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} else { + (array1, array2) => +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +var alreadySeenNull = false +var i = 0 +while (i < array1.numElements()) { + var found = 
false + val elem1 = array1.get(i, elementType) + if (array1.isNullAt(i)) { +if (!alreadySeenNull) { + var j = 0 + while (!found && j < array2.numElements()) { +found = array2.isNullAt(j) +j += 1 + } + // array2 is scanned only once for null element + alreadySeenNull = true +} + } else { +var j = 0 +while (!found && j < array2.numElements()) { + if (!array2.isNullAt(j)) { +val elem2 = array2.get(j, elementType) +if (ordering.equiv(elem1, elem2)) { + // check whether elem1 is already stored in arrayBuffer + var foundArrayBuffer = false + var k = 0 + while (!foundArrayBuffer && k < arrayBuffer.size) { +val va = arrayBuffer(k) +foundArrayBuffer = (va != null) && ordering.equiv(va, elem1) +k += 1 + } + found = !foundArrayBuffer +} + } + j += 1 +} + } + if (found) { +arrayBuffer += elem1 + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalIntersect(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val arrayData = classOf[ArrayData].getName +val i = ctx.freshName("i") +val value = ctx.freshName("value") +val size = ctx.freshName("size") +if (canUseSpecializedHashSet) { + val jt = CodeGenerator.javaType(elementType) + val ptName = CodeGenerator.primitiveTypeName(jt) + + nullSafeCodeGen(ctx, ev, (array1, array2) => { +val foundNullElement = ctx.freshName("foundNullEleme
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207765490 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3965,6 +4034,242 @@ object ArrayUnion { } } +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike + with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck +ArrayType(elementType, + left.dataType.asInstanceOf[ArrayType].containsNull && +right.dataType.asInstanceOf[ArrayType].containsNull) + } + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] --- End diff -- How about shortcutting to return an empty array when we find one of the two is empty? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207758427 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -1647,6 +1647,60 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { assert(result10.first.schema(0).dataType === expectedType10) } + test("array_intersect functions") { +val df1 = Seq((Array(1, 2, 4), Array(4, 2))).toDF("a", "b") +val ans1 = Row(Seq(2, 4)) +checkAnswer(df1.select(array_intersect($"a", $"b")), ans1) +checkAnswer(df1.selectExpr("array_intersect(a, b)"), ans1) + +val df2 = Seq((Array[Integer](1, 2, null, 4, 5), Array[Integer](-5, 4, null, 2, -1))) + .toDF("a", "b") +val ans2 = Row(Seq(2, null, 4)) +checkAnswer(df2.select(array_intersect($"a", $"b")), ans2) +checkAnswer(df2.selectExpr("array_intersect(a, b)"), ans2) + +val df3 = Seq((Array(1L, 2L, 4L), Array(4L, 2L))).toDF("a", "b") +val ans3 = Row(Seq(2L, 4L)) +checkAnswer(df3.select(array_intersect($"a", $"b")), ans3) +checkAnswer(df3.selectExpr("array_intersect(a, b)"), ans3) + +val df4 = Seq( + (Array[java.lang.Long](1L, 2L, null, 4L, 5L), Array[java.lang.Long](-5L, 4L, null, 2L, -1L))) + .toDF("a", "b") +val ans4 = Row(Seq(2L, null, 4L)) +checkAnswer(df4.select(array_intersect($"a", $"b")), ans4) +checkAnswer(df4.selectExpr("array_intersect(a, b)"), ans4) + +val df5 = Seq((Array("c", null, "a", "f"), Array("b", "a", null, "g"))).toDF("a", "b") +val ans5 = Row(Seq(null, "a")) +checkAnswer(df5.select(array_intersect($"a", $"b")), ans5) +checkAnswer(df5.selectExpr("array_intersect(a, b)"), ans5) + +val df6 = Seq((null, null)).toDF("a", "b") +intercept[AnalysisException] { --- End diff -- Could you also check the error message? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207723142 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun --- End diff -- This is back again? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205959224 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,234 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, +left.dataType.asInstanceOf[ArrayType].containsNull && + right.dataType.asInstanceOf[ArrayType].containsNull) + + @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = { +if (elementTypeSupportEquals) { + (array1, array2) => +val hs = new OpenHashSet[Any] +val hsResult = new OpenHashSet[Any] +var foundNullElement = false +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +foundNullElement = true + } else { +val elem = array2.get(i, elementType) +hs.add(elem) + } + i += 1 +} +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (foundNullElement) { + arrayBuffer += null + foundNullElement = false +} + } else { +val elem = array1.get(i, elementType) +if (hs.contains(elem) && !hsResult.contains(elem)) { + arrayBuffer += elem + hsResult.add(elem) +} + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} else { + (array1, array2) => +val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] +var alreadySeenNull = false +var i = 0 +while (i < array1.numElements()) { + var found = false + val elem1 = 
array1.get(i, elementType) + if (array1.isNullAt(i)) { +if (!alreadySeenNull) { + var j = 0 + while (!found && j < array2.numElements()) { +found = array2.isNullAt(j) +j += 1 + } + // array2 is scanned only once for null element + alreadySeenNull = true +} + } else { +var j = 0 +while (!found && j < array2.numElements()) { + if (!array2.isNullAt(j)) { +val elem2 = array2.get(j, elementType) +if (ordering.equiv(elem1, elem2)) { + // check whether elem1 is already stored in arrayBuffer + var foundArrayBuffer = false + var k = 0 + while (!foundArrayBuffer && k < arrayBuffer.size) { +val va = arrayBuffer(k) +foundArrayBuffer = (va != null) && ordering.equiv(va, elem1) +k += 1 + } + found = !foundArrayBuffer +} + } + j += 1 +} + } + if (found) { +arrayBuffer += elem1 + } + i += 1 +} +new GenericArrayData(arrayBuffer) +} + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +evalIntersect(array1, array2) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val arrayData = classOf[ArrayData].getName +val i = ctx.freshName("i") --- End diff -- It would be good to refactor as a method from L4077 to L4124 since this part can be used among `union`, `except`, and `intersect`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205930801 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( private def nextPowerOf2(n: Int): Int = { if (n == 0) { - 1 + 2 --- End diff -- Oh, good catch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205930794 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, +left.dataType.asInstanceOf[ArrayType].containsNull && + right.dataType.asInstanceOf[ArrayType].containsNull) + + var hsInt: OpenHashSet[Int] = _ + var hsResultInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + var hsResultLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (hsInt.contains(elem) && !hsResultInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsResultInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (hsLong.contains(elem) && !hsResultLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsResultLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + initFoundNullElement: Boolean, + isLongType: Boolean): (Int, Boolean) = { +// store elements into resultArray +var i = 0 +var foundNullElement = initFoundNullElement 
+if (resultArray == null) { + // hsInt or hsLong is updated only once since it is not changed + while (i < array1.numElements()) { --- End diff -- You are right, fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205853523 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( private def nextPowerOf2(n: Int): Int = { if (n == 0) { - 1 + 2 --- End diff -- Good catch, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205341581 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, +left.dataType.asInstanceOf[ArrayType].containsNull && + right.dataType.asInstanceOf[ArrayType].containsNull) + + var hsInt: OpenHashSet[Int] = _ + var hsResultInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + var hsResultLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (hsInt.contains(elem) && !hsResultInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsResultInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (hsLong.contains(elem) && !hsResultLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsResultLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + initFoundNullElement: Boolean, + isLongType: Boolean): (Int, Boolean) = { +// store elements into resultArray +var i = 0 +var foundNullElement = initFoundNullElement 
+if (resultArray == null) { + // hsInt or hsLong is updated only once since it is not changed + while (i < array1.numElements()) { --- End diff -- `array1` and `array2` are swapped if we want to preserve the element order of the left array? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205342201 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(1, 3) + """, + since = "2.4.0") +case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, +left.dataType.asInstanceOf[ArrayType].containsNull && + right.dataType.asInstanceOf[ArrayType].containsNull) + + var hsInt: OpenHashSet[Int] = _ + var hsResultInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + var hsResultLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (hsInt.contains(elem) && !hsResultInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsResultInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (hsLong.contains(elem) && !hsResultLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsResultLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + initFoundNullElement: Boolean, + isLongType: Boolean): (Int, Boolean) = { +// store elements into resultArray +var i = 0 +var foundNullElement = initFoundNullElement 
+if (resultArray == null) { + // hsInt or hsLong is updated only once since it is not changed --- End diff -- I might miss something, but can we do the same thing for `array_except`? It would be good if we can skip traversing the right array. This is not urgent, maybe we can do it in the follow-up pr of `array_except` pr. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r205327639 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( private def nextPowerOf2(n: Int): Int = { if (n == 0) { - 1 + 2 --- End diff -- Why changed this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r204349890 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun --- End diff -- Just ```Examples:```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203322643 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * to a new position (in the new data array). */ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { -if (_size > _growThreshold) { +if (_occupied > _growThreshold) { --- End diff -- For accuracy's sake - my example snippet above will fail much earlier - due to OpenHashSet.MAX_CAPACITY. Though that is probably not the point anyway :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203322056 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * to a new position (in the new data array). */ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { -if (_size > _growThreshold) { +if (_occupied > _growThreshold) { --- End diff -- There is no explicit entry here - it is simply unoccupied slots in an array. The slot is free, it can be used by some other (new) entry when insert is called. It must be trivial to see how very bad behavior can happen with actual size of set being very small - with a series of add/remove's : resulting in unending growth of the set. Something like this, for example, is enough to cause set to blow to 2B entries: ``` var i = 0 while (i < Int.MaxValue) { set.add(1) set.remove(1) assert (0 == set.size) i += 1 } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203319710 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * to a new position (in the new data array). */ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { -if (_size > _growThreshold) { +if (_occupied > _growThreshold) { --- End diff -- When 'remove' is called, '_size' is decremented. But, an entry is not released. This is a motivation to introduce 'occupied'. I will try to use another implementation without 'remove' while it may introduce some overhead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203318288 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( protected var _capacity = nextPowerOf2(initialCapacity) protected var _mask = _capacity - 1 protected var _size = 0 + protected var _occupied = 0 protected var _growThreshold = (loadFactor * _capacity).toInt + def g: Int = _growThreshold + def o: Int = _occupied --- End diff -- Oh, sorry for putting this. This is used only for my debugging. This should be removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203315417 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -114,6 +118,21 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( rehashIfNeeded(k, grow, move) } + /** + * Remove an element from the set. If an element does not exists in the set, nothing is done. + */ + def remove(k: T): Unit = { --- End diff -- If we need to keep an order without duplication, we can implement this by introducing another hashset or searching a result array when we try to add a new element. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203311755 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( protected var _capacity = nextPowerOf2(initialCapacity) protected var _mask = _capacity - 1 protected var _size = 0 + protected var _occupied = 0 protected var _growThreshold = (loadFactor * _capacity).toInt + def g: Int = _growThreshold + def o: Int = _occupied protected var _bitset = new BitSet(_capacity) + protected var _bitsetDeleted: BitSet = null --- End diff -- Why protected ? Make it private instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203314045 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( * to a new position (in the new data array). */ def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) { -if (_size > _growThreshold) { +if (_occupied > _growThreshold) { --- End diff -- I don't see any value in _occupied - on the contrary it can cause very bad behavior if there is a lot of remove's expected. `_size` is a better metric to decide to rehash and grow. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203311109 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( protected var _capacity = nextPowerOf2(initialCapacity) protected var _mask = _capacity - 1 protected var _size = 0 + protected var _occupied = 0 protected var _growThreshold = (loadFactor * _capacity).toInt + def g: Int = _growThreshold + def o: Int = _occupied --- End diff -- Also, please use more descriptive and comprehensible names --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r203299689 --- Diff: core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala --- @@ -114,6 +118,21 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( rehashIfNeeded(k, grow, move) } + /** + * Remove an element from the set. If an element does not exists in the set, nothing is done. + */ + def remove(k: T): Unit = { --- End diff -- Maybe we should not add `remove` method unless we can add it in a simple way. This is used in many places and this might affect their performance. How about using another implementation? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r202782954 --- Diff: core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala --- @@ -73,6 +73,46 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(set.contains(50)) assert(set.contains(999)) assert(!set.contains(1)) + +set.add(1132) // Cause hash contention with 999 +assert(set.size === 4) +assert(set.contains(10)) +assert(set.contains(50)) +assert(set.contains(999)) +assert(set.contains(1132)) +assert(!set.contains(1)) + +set.remove(1132) +assert(set.size === 3) +assert(set.contains(10)) +assert(set.contains(50)) +assert(set.contains(999)) +assert(!set.contains(1132)) +assert(!set.contains(1)) + +set.remove(999) --- End diff -- good catch, I addressed this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r202621143 --- Diff: core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala --- @@ -73,6 +73,46 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(set.contains(50)) assert(set.contains(999)) assert(!set.contains(1)) + +set.add(1132) // Cause hash contention with 999 +assert(set.size === 4) +assert(set.contains(10)) +assert(set.contains(50)) +assert(set.contains(999)) +assert(set.contains(1132)) +assert(!set.contains(1)) + +set.remove(1132) +assert(set.size === 3) +assert(set.contains(10)) +assert(set.contains(50)) +assert(set.contains(999)) +assert(!set.contains(1132)) +assert(!set.contains(1)) + +set.remove(999) --- End diff -- What if we remove `999` before `1132`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/21102 [SPARK-23913][SQL] Add array_intersect function ## What changes were proposed in this pull request? The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's one. This function returns an array of the elements in the intersection of array1 and array2. Note: The order of elements in the result is not defined. ## How was this patch tested? Added UTs You can merge this pull request into a Git repository by running: $ git pull https://github.com/kiszk/spark SPARK-23913 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21102.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21102 commit 548a4b804472e062e36308274d1aff8909621131 Author: Kazuaki Ishizaki Date: 2018-04-18T16:01:50Z initial commit --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org