[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-08-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21103


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206398377
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,285 @@ object ArrayUnion {
     new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates.
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+    dataTypeCheck
+    left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+    if (elementTypeSupportEquals) {
+      (array1, array2) =>
+        val hs = new OpenHashSet[Any]
+        var notFoundNullElement = true
+        var i = 0
+        while (i < array2.numElements()) {
+          if (array2.isNullAt(i)) {
+            notFoundNullElement = false
+          } else {
+            val elem = array2.get(i, elementType)
+            hs.add(elem)
+          }
+          i += 1
+        }
+        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+        i = 0
+        while (i < array1.numElements()) {
+          if (array1.isNullAt(i)) {
+            if (notFoundNullElement) {
+              arrayBuffer += null
+              notFoundNullElement = false
+            }
+          } else {
+            val elem = array1.get(i, elementType)
+            if (!hs.contains(elem)) {
+              arrayBuffer += elem
+              hs.add(elem)
+            }
+          }
+          i += 1
+        }
+        new GenericArrayData(arrayBuffer)
+    } else {
+      (array1, array2) =>
+        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+        var scannedNullElements = false
+        var i = 0
+        while (i < array1.numElements()) {
+          var found = false
+          val elem1 = array1.get(i, elementType)
+          if (elem1 == null) {
+            if (!scannedNullElements) {
+              var j = 0
+              while (!found && j < array2.numElements()) {
+                found = array2.isNullAt(j)
+                j += 1
+              }
+              // array2 is scanned only once for null element
+              scannedNullElements = true
+            } else {
+              found = true
+            }
+          } else {
+            var j = 0
+            while (!found && j < array2.numElements()) {
+              val elem2 = array2.get(j, elementType)
+              if (elem2 != null) {
+                found = ordering.equiv(elem1, elem2)
+              }
+              j += 1
+            }
+            if (!found) {
+              // check whether elem1 is already stored in arrayBuffer
+              var k = 0
+              while (!found && k < arrayBuffer.size) {
+                val va = arrayBuffer(k)
+                found = (va != null) && ordering.equiv(va, elem1)
+                k += 1
+              }
+            }
+          }
+          if (!found) {
+            arrayBuffer += elem1
+          }
+          i += 1
+        }
+        new GenericArrayData(arrayBuffer)
+    }
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+    val array1 = input1.asInstanceOf[ArrayData]
+    val array2 = input2.asInstanceOf[ArrayData]
+
+    evalExcept(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val arrayData = classOf[ArrayData].getName
+    val i = ctx.freshName("i")
+    val pos = ctx.freshName("pos")
+    val value = ctx.freshName("value")
+    val hsValue = ctx.freshName("hsValue")
+    val size = ctx.freshName("size")
+    if (elementTypeSupportEquals) {
+      val ptName = CodeGenerator.primitiveTypeName(elementType)
+      val unsafeArray = ctx.freshName("unsafeArray")
+      val (postFix, openHashElementType, hsJavaTypeName, genHsValue,
+           getter, setter, javaTypeName, primitiveTypeName, arrayDataBuilder) =
+        elementType match {
+          case BooleanType | ByteType | ShortType | IntegerType =>
--- End diff --
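For readers following the thread, the null and duplicate handling in the hash-set path of the hunk above can be checked against a small plain-Scala model. The `arrayExcept` function below is a hypothetical standalone re-implementation of the same semantics (using `mutable.HashSet` in place of Spark's `OpenHashSet`), not the Catalyst code itself:

```scala
import scala.collection.mutable

// Hypothetical standalone model (not the Catalyst code) of the hash-set
// path above: keep each distinct element of a1 that does not occur in a2.
// A null survives at most once, and only when a2 itself contains no null,
// mirroring the notFoundNullElement bookkeeping in the quoted diff.
def arrayExcept(a1: Seq[Any], a2: Seq[Any]): Seq[Any] = {
  val seen = mutable.HashSet[Any]()        // stands in for OpenHashSet[Any]
  val a2HasNull = a2.contains(null)
  a2.foreach(e => if (e != null) seen += e)

  val out = mutable.ArrayBuffer[Any]()
  var emittedNull = false
  a1.foreach {
    case null =>
      if (!a2HasNull && !emittedNull) { out += null; emittedNull = true }
    case e =>
      if (!seen.contains(e)) { out += e; seen += e } // adding e dedups the output
  }
  out.toSeq
}
```

Under these assumptions, `arrayExcept(Seq(1, 2, 3), Seq(1, 3, 5))` returns `Seq(2)`, matching the `ExpressionDescription` example in the diff.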

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206397708
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,285 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206228544
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and ArrayExcept.
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and [[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --

I'm not sure to what extent Presto is considered the reference here. Just FYI, these operations in Presto can accept arrays of different types:
```
presto:default> SELECT array_except(ARRAY[5, 1, 7], ARRAY[7.0, 1.0, 3.0]);
 _col0 
-------
 [5.0] 
(1 row)
```
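As an illustration of what Presto's behaviour implies: the two arrays are widened to a common element type before the set difference, so the integer 7 matches the double 7.0. The sketch below is a hypothetical Scala helper showing that coercion-then-except semantics; it is not Spark's type-resolution logic (Spark relies on the analyzer, via `BinaryArrayExpressionWithImplicitCast`, to cast both sides to a common array type before `ArrayExcept` ever runs, so the expression itself only compares elements of one type):

```scala
// Hypothetical sketch of the coercion-then-except semantics Presto shows
// above: widen the integer array to Double, then drop every element that
// also occurs on the right, deduplicating the result.
def arrayExceptWidened(ints: Seq[Int], doubles: Seq[Double]): Seq[Double] = {
  val right = doubles.toSet
  ints.map(_.toDouble).distinct.filterNot(right.contains)
}
```

With the Presto example's inputs, `arrayExceptWidened(Seq(5, 1, 7), Seq(7.0, 1.0, 3.0))` gives `Seq(5.0)`.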


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206008046
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; earlier revision of the same ArrayExcept hunk quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206004228
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206002948
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206001326
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206000549
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r20525
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
[... quoted diff snipped; same ArrayExcept hunk as quoted in full above ...]
--- End diff --

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205999733
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike
+    with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+    dataTypeCheck
+    left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+    if (elementTypeSupportEquals) {
+      (array1, array2) =>
+        val hs = new OpenHashSet[Any]
+        var notFoundNullElement = true
+        var i = 0
+        while (i < array2.numElements()) {
+          if (array2.isNullAt(i)) {
+            notFoundNullElement = false
+          } else {
+            val elem = array2.get(i, elementType)
+            hs.add(elem)
+          }
+          i += 1
+        }
+        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+        i = 0
+        while (i < array1.numElements()) {
+          if (array1.isNullAt(i)) {
+            if (notFoundNullElement) {
+              arrayBuffer += null
+              notFoundNullElement = false
+            }
+          } else {
+            val elem = array1.get(i, elementType)
+            if (!hs.contains(elem)) {
+              arrayBuffer += elem
+              hs.add(elem)
+            }
+          }
+          i += 1
+        }
+        new GenericArrayData(arrayBuffer)
+    } else {
+      (array1, array2) =>
+        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+        var scannedNullElements = false
+        var i = 0
+        while (i < array1.numElements()) {
+          var found = false
+          val elem1 = array1.get(i, elementType)
+          if (elem1 == null) {
+            if (!scannedNullElements) {
+              var j = 0
+              while (!found && j < array2.numElements()) {
+                found = array2.isNullAt(j)
+                j += 1
+              }
+              // array2 is scanned only once for null element
+              scannedNullElements = true
+            } else {
+              found = true
+            }
+          } else {
+            var j = 0
+            while (!found && j < array2.numElements()) {
+              val elem2 = array2.get(j, elementType)
+              if (elem2 != null) {
+                found = ordering.equiv(elem1, elem2)
+              }
+              j += 1
+            }
+            if (!found) {
+              // check whether elem1 is already stored in arrayBuffer
+              var k = 0
+              while (!found && k < arrayBuffer.size) {
+                val va = arrayBuffer(k)
+                found = (va != null) && ordering.equiv(va, elem1)
+                k += 1
+              }
+            }
+          }
+          if (!found) {
+            arrayBuffer += elem1
+          }
+          i += 1
+        }
+        new GenericArrayData(arrayBuffer)
+    }
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+    val array1 = input1.asInstanceOf[ArrayData]
+    val array2 = input2.asInstanceOf[ArrayData]
+
+    evalExcept(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    val arrayData = classOf[ArrayData].getName
+    val i = ctx.freshName("i")
+    val pos = ctx.freshName("pos")
+    val value = ctx.freshName("value")
+    val hsValue = ctx.freshName("hsValue")
+    val size = ctx.freshName("size")
+    val (postFix, openHashElementType, hsJavaTypeName, genHsValue,
+      getter, setter, javaTypeName, arrayBuilder) =
+      if (elementTypeSupportEquals) {
+        elementType match {
+          case BooleanType | ByteType | ShortType | IntegerType =>
+            val ptName = CodeGenerator.primitiveTypeName(elementType)
+            val unsafeArray = ctx.freshName("unsafeArray")
+            ("$mcI$sp", 
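For readers skimming the long quoted diff, the fast path of `evalExcept` can be condensed into a short self-contained Scala sketch. `mutable.HashSet` and plain `Seq[Any]` stand in for Spark's `OpenHashSet` and `ArrayData`; those substitutions (and the function name `arrayExcept`) are mine, for illustration only:

```scala
import scala.collection.mutable

// Minimal sketch of the quoted fast path: keep elements of a1 that are not
// in a2, de-duplicated, and emit at most one null, only when a2 has no null.
// mutable.HashSet replaces Spark's OpenHashSet; Seq[Any] replaces ArrayData.
def arrayExcept(a1: Seq[Any], a2: Seq[Any]): Seq[Any] = {
  val hs = mutable.HashSet.empty[Any]
  var notFoundNullElement = true            // mirrors the quoted flag name
  a2.foreach {
    case null => notFoundNullElement = false
    case e    => hs.add(e)
  }
  val out = mutable.ArrayBuffer.empty[Any]
  a1.foreach {
    case null =>
      if (notFoundNullElement) { out += null; notFoundNullElement = false }
    case e =>
      if (!hs.contains(e)) { out += e; hs.add(e) }
  }
  out.toList
}
```

Here `arrayExcept(Seq(1, 2, 3), Seq(1, 3, 5))` evaluates to `List(2)`, matching the example in the quoted `@ExpressionDescription`.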

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205999688
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205999519
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205999325
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,267 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205967625
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,242 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
--- End diff --

I would be strongly in favor of just using `MemoryBlock` for binary types, 
or something similar.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205930539
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,242 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
--- End diff --

Yeah, using `byte[]` for the binary type looks awkward and inefficient. It 
would be good to introduce a new class for the binary type, similar to `UTF8String`.

On the other hand, that is not a task for Spark 2.4, since it would require a lot of changes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205929135
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,242 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
--- End diff --

This makes me think that it's a bad idea to make `byte[]` the internal data 
class for the binary type. IIRC we have a lot of places that work around the equality 
issue of `byte[]`, and it introduces an extra copy when reading a binary-type column 
from off-heap. We should consider something like `UTF8String` for the binary type.

This is not related to this PR, but is something we can do in Spark 3.0. cc 
@rxin @hvanhovell 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
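The equality issue raised above is easy to demonstrate: JVM arrays use identity-based `equals`/`hashCode`, so a hash set can never deduplicate `byte[]` values by content. A thin value-semantics wrapper, in the spirit of `UTF8String`, fixes the lookup. `BinaryVal` is an illustrative name of mine, not a proposed API:

```scala
import java.util.Arrays

// JVM arrays compare by reference, so hash-based lookup misses equal content.
final class BinaryVal(val bytes: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case b: BinaryVal => Arrays.equals(bytes, b.bytes)
    case _            => false
  }
  override def hashCode: Int = Arrays.hashCode(bytes)
}

val a = Array[Byte](1, 2, 3)
val b = Array[Byte](1, 2, 3)
assert(a != b)                                   // identity comparison, not content
assert(!Set[Array[Byte]](a).contains(b))         // set lookup misses equal content
assert(new BinaryVal(a) == new BinaryVal(b))     // wrapper restores value equality
assert(Set(new BinaryVal(a)).contains(new BinaryVal(b)))
```

This is exactly the kind of workaround the comment refers to; a dedicated binary value class would centralize it.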



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205928966
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
--- End diff --

My motivation is to let these array functions go through the type 
coercion rules for `ComplexTypeMergingExpression`. This check is like an `assert`, 
and I don't think we need it if we override `dataType`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205928882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
ArrayExcept.
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
[[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --

I think the question is, shall we apply type coercion rules to these array 
functions?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205928822
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
ArrayExcept.
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
[[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --

shall we extend `ComplexTypeMergingExpression` here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205847180
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
--- End diff --

nit: indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205848094
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
--- End diff --

Why do we need this check (see the question 
[here](https://github.com/apache/spark/pull/21103#discussion_r205806876))?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205806876
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

Maybe I'm missing something, but why do we need to apply these checks if no 
```null``` flag merging will be performed?

If ```left.dataType``` and ```right.dataType``` are different, they will be 
cast according to the ```ImplicitTypeCasts``` coercion rule. If they differ 
only in their ```null``` flags, ```left.dataType``` could be returned directly, since 
no array elements from ```right``` will be present in the result.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205733831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

we can overwrite `dataType` while still extending 
`ComplexTypeMergingExpression` to use the checks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
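The shape suggested above — keep the trait for its input checks but override `dataType` to return the left type — can be sketched with toy traits standing in for Catalyst's `Expression` hierarchy (all names below are simplified stand-ins of mine, not the real API):

```scala
// Minimal stand-ins for Expression / ComplexTypeMergingExpression.
trait Expr { def dataType: String }

trait MergingChecks extends Expr {
  def inputTypes: Seq[String]
  // The "assert-like" check: all inputs must share one type.
  protected def dataTypeCheck(): Unit =
    require(inputTypes.distinct.size == 1, s"inconsistent input types: $inputTypes")
}

case class Lit(dataType: String) extends Expr

// Mix in the trait for its checks, but short-circuit dataType to the left type.
case class ExceptExpr(left: Expr, right: Expr) extends MergingChecks {
  def inputTypes: Seq[String] = Seq(left.dataType, right.dataType)
  override def dataType: String = { dataTypeCheck(); left.dataType }
}
```

With matching inputs, `ExceptExpr(Lit("array<int>"), Lit("array<int>")).dataType` returns `"array<int>"`; mismatched inputs fail the check instead of silently merging.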



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205685897
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205677242
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+    _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2,
+      without duplicates.
+  """,
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+       array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getInt(idx)
+    if (!hsInt.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setInt(pos, elem)
+      }
+      hsInt.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+    val elem = array.getLong(idx)
+    if (!hsLong.contains(elem)) {
+      if (resultArray != null) {
+        resultArray.setLong(pos, elem)
+      }
+      hsLong.add(elem)
+      true
+    } else {
+      false
+    }
+  }
+
+  def evalIntLongPrimitiveType(
+      array1: ArrayData,
+      array2: ArrayData,
+      resultArray: ArrayData,
+      isLongType: Boolean): Int = {
+    // store elements into resultArray
+    var notFoundNullElement = true
+    var i = 0
+    while (i < array2.numElements()) {
+      if (array2.isNullAt(i)) {
+        notFoundNullElement = false
+      } else {
+        val assigned = if (!isLongType) {
+          hsInt.add(array2.getInt(i))
+        } else {
+          hsLong.add(array2.getLong(i))
+        }
+      }
+      i += 1
+    }
+    var pos = 0
+    i = 0
+    while (i < array1.numElements()) {
+      if (array1.isNullAt(i)) {
+        if (notFoundNullElement) {
+          if (resultArray != null) {
+            resultArray.setNullAt(pos)
+          }
+          pos += 1
+          notFoundNullElement = false
+        }
+      } else {
+        val assigned = if (!isLongType) {
+          assignInt(array1, i, resultArray, pos)
+        } else {
+          assignLong(array1, i, resultArray, pos)
+        }
+        if (assigned) {
+          pos += 1
+        }
+      }
+      i += 1
+    }
+    pos
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+    if (elementTypeSupportEquals) {
+      elementType match {
+        case IntegerType =>
--- End diff --

Let me try support float/double in `OpenHashSet`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
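The primitive specialization under discussion uses a two-pass trick: the same routine is first called with a null result array just to count surviving elements, then again to fill an exactly-sized array. A hedged sketch with plain Scala collections (`HashSet` instead of `OpenHashSet`, `Array[Int]` instead of `ArrayData`, and null-element handling omitted for brevity):

```scala
// Two-pass primitive except, modeled on the quoted evalIntLongPrimitiveType:
// pass 1 (result == null) only counts surviving elements, pass 2 writes them.
def evalIntExcept(a1: Array[Int], a2: Array[Int], result: Array[Int]): Int = {
  val hs = scala.collection.mutable.HashSet.empty[Int]
  a2.foreach(hs.add)                 // everything in a2 is excluded
  var pos = 0
  for (e <- a1) {
    if (!hs.contains(e)) {           // not excluded and not yet emitted
      if (result != null) result(pos) = e
      hs.add(e)                      // also deduplicates within a1
      pos += 1
    }
  }
  pos
}

// usage: size pass, then fill pass into an exactly-sized array
val n = evalIntExcept(Array(1, 2, 3, 2), Array(1, 3, 5), null)
val out = new Array[Int](n)
evalIntExcept(Array(1, 2, 3, 2), Array(1, 3, 5), out)
```

The payoff is that the result can live in a primitive `UnsafeArrayData`-style buffer with no boxing, at the cost of scanning both inputs twice.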



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205674364
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

The check at `dataType` in `ComplexTypeMergingExpression` should be useful.
Based on [the 
discussion](https://github.com/apache/spark/pull/21103#discussion_r203746577), 
the result of `dataType` should be just `left.dataType`. Shall we then use it only 
for the checks?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205674062
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
--- End diff --

Ah, since the generated code is the usual path, we will do those optimizations 
only in the generated code and keep just the generic case in the interpreted 
path as a backup.


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205672980
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
--- End diff --

Yeah, since the generated code also uses this path, we can optimize 
byte/short/boolean. However, it is not easy to optimize float and double with 
`OpenHashSet`. Let me look for a hash set that can store primitive float/double.
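One possible workaround (an assumption on my part, not something the PR settled on): reinterpret float/double bits as Int/Long via `java.lang.Float.floatToIntBits` / `java.lang.Double.doubleToLongBits`, so an Int- or Long-specialized hash set can hold them without boxing. A self-contained sketch with a plain Scala set standing in for `OpenHashSet`:

```scala
// Sketch: deduplicate primitive floats through an Int-keyed set by
// reinterpreting their bits; floatToIntBits gives a stable Int per float value.
def exceptFloats(a: Array[Float], b: Array[Float]): Array[Float] = {
  val seen = scala.collection.mutable.HashSet.empty[Int]
  b.foreach(f => seen += java.lang.Float.floatToIntBits(f))
  val out = scala.collection.mutable.ArrayBuffer.empty[Float]
  for (f <- a) {
    // `add` returns false if the key was already present, so duplicates
    // in `a` are dropped as well
    if (seen.add(java.lang.Float.floatToIntBits(f))) out += f
  }
  out.toArray
}

assert(exceptFloats(Array(1.5f, 2.5f, 1.5f, 3.5f), Array(1.5f))
  .sameElements(Array(2.5f, 3.5f)))
```

One caveat: bit-level equality treats `0.0f` and `-0.0f` as distinct values (while `==` does not), so the chosen semantics for signed zeros would need to be checked.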


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205668821
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
--- End diff --

if they are all worth optimizing, I feel we should not optimize in the 
interpreted code path and should leave it to the codegen path. That's a strong 
reason to add codegen support.


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205668541
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
--- End diff --

why we only optimize for int and long? how about byte, short, float, double?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205667881
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

does this need to extend `ComplexTypeMergingExpression`?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205663032
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,230 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  val exceptEquals: (ArrayData, ArrayData) => ArrayData = {
--- End diff --

Ah, you are right. Thank you.


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205650766
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,230 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  val exceptEquals: (ArrayData, ArrayData) => ArrayData = {
--- End diff --

Btw, `@transient lazy val`?
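For context on why `@transient lazy val` helps here (my reading, not stated in the thread): the field is skipped by Java serialization when the expression ships to executors, and Scala re-runs the lazy initializer on first access after deserialization, so the closure is rebuilt per JVM instead of being serialized. A self-contained sketch:

```scala
import java.io._

class Holder(val n: Int) extends Serializable {
  // Not written during serialization; recomputed lazily in each JVM
  // that deserializes this object.
  @transient lazy val inc: Int => Int = x => x + n
}

// Round-trip through Java serialization.
val bos = new ByteArrayOutputStream()
new ObjectOutputStream(bos).writeObject(new Holder(1))
val restored = new ObjectInputStream(
  new ByteArrayInputStream(bos.toByteArray)).readObject().asInstanceOf[Holder]

assert(restored.inc(41) == 42)  // lazy val rebuilt after deserialization
```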


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205649948
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,230 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  val exceptEquals: (ArrayData, ArrayData) => ArrayData = {
--- End diff --

Maybe:

```scala
val exceptEquals: (ArrayData, ArrayData) => ArrayData = {
  if (elementTypeSupportEquals) {
    elementType match {
      case IntegerType =>
        (array1, array2) =>
          ...
      case LongType =>
        (array1, array2) =>
          ...
      ...
    }
  } else {
    (array1, array2) =>
      ...
  }
}
```

to avoid the per-row pattern match.
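Concretely, the point of that shape (shown here with plain Scala arrays as a stand-in for `ArrayData`, and a hypothetical `makeExcept` helper) is that the `match` runs once when the val is initialized, and each row only invokes the already-selected closure:

```scala
// Select the specialized implementation once, outside the per-row path.
def makeExcept(elementType: String): (Array[Int], Array[Int]) => Array[Int] =
  elementType match {
    case "int" =>
      // specialized branch: set membership test per element
      (a, b) => { val hs = b.toSet; a.distinct.filterNot(hs) }
    case _ =>
      // generic fallback: linear scan per element
      (a, b) => a.distinct.filterNot(b.contains)
  }

val except = makeExcept("int")  // pattern match happens here, once
assert(except(Array(1, 2, 3), Array(1, 3, 5)).sameElements(Array(2)))
```

This mirrors the `array_except(array(1, 2, 3), array(1, 3, 5))` example from the expression description.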


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205495042
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+  

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205488729
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+   

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205486499
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+   
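The quoted diff sizes the result exactly by running the same loop twice: pass one with a `null` result array (count only), pass two writing into an array allocated to that count. A minimal standalone sketch of that two-pass strategy, simplified to plain `Array[Int]` rather than Spark's `ArrayData` and `OpenHashSet` (names here are illustrative, not Spark's):

```scala
import scala.collection.mutable

// Walk a1 twice with the same logic: once with no output buffer (to
// count surviving elements), once writing into a buffer of exactly that
// size. `excluded` holds the elements of a2.
def countOrFill(a1: Array[Int], excluded: Set[Int], out: Array[Int]): Int = {
  val seen = mutable.HashSet.empty[Int]  // dedups a1, like the diff's hash set
  var pos = 0
  for (e <- a1) if (!excluded(e) && seen.add(e)) {
    if (out != null) out(pos) = e        // pass 2 only
    pos += 1
  }
  pos
}

def arrayExceptInt(a1: Array[Int], a2: Array[Int]): Array[Int] = {
  val excluded = a2.toSet
  val n = countOrFill(a1, excluded, null)  // pass 1: size only
  val result = new Array[Int](n)           // exact allocation, no resizing
  countOrFill(a1, excluded, result)        // pass 2: fill
  result
}
```

The payoff of the second pass is that the primitive array is allocated once at its final size, avoiding both boxing and buffer growth.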

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205484762
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
--- End diff --

can we avoid this per-row pattern match by using a function object?
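A hedged sketch of that suggestion, simplified to plain Scala collections (the real code operates on `ArrayData`): resolve the element-type dispatch once into a function value, so the per-row path never re-runs the `match`. This is the shape the merged code converged on with `@transient lazy val evalExcept`.

```scala
// Illustrative element-type tags; Spark dispatches on DataType instead.
sealed trait ElemType
case object IntElem extends ElemType
case object GenericElem extends ElemType

class ArrayExceptLike(elemType: ElemType) {
  // The match runs once, when the lazy val is first forced,
  // not once per row.
  lazy val evalExcept: (Seq[Int], Seq[Int]) => Seq[Int] = elemType match {
    case IntElem =>
      // fast path: hash-set membership test
      (a, b) => { val excluded = b.toSet; a.distinct.filterNot(excluded) }
    case GenericElem =>
      // slow-path stand-in: pairwise equality scan
      (a, b) => a.distinct.filterNot(x => b.exists(_ == x))
  }

  // per-row entry point just invokes the precomputed function object
  def nullSafeEval(a: Seq[Int], b: Seq[Int]): Seq[Int] = evalExcept(a, b)
}
```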


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205395435
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205397226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205397802
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
--- End diff --

WDYT about using an ```Option``` instead of ```null``` value?
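A sketch of that alternative, again reduced to plain `Array[Int]` rather than Spark's `ArrayData` (function names are illustrative): the sizing pass receives `None` and the filling pass receives `Some(buffer)`, so no `null` check appears in the hot loop's body.

```scala
import scala.collection.mutable

// Option-typed result buffer instead of a nullable ArrayData:
// None means "count only", Some(buf) means "also write".
def evalPrimitive(a1: Array[Int], a2: Array[Int],
                  result: Option[Array[Int]]): Int = {
  val excluded = a2.toSet
  val seen = mutable.HashSet.empty[Int]
  var pos = 0
  for (e <- a1) if (!excluded(e) && seen.add(e)) {
    result.foreach(buf => buf(pos) = e)  // no-op on the sizing pass
    pos += 1
  }
  pos
}

def arrayExcept(a1: Array[Int], a2: Array[Int]): Array[Int] = {
  val n = evalPrimitive(a1, a2, None)    // pass 1: size
  val out = new Array[Int](n)
  evalPrimitive(a1, a2, Some(out))       // pass 2: fill
  out
}
```

The trade-off raised in reviews like this one is readability versus the small allocation cost of the `Some` wrapper on a per-row code path.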


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205392257
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-25 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205310335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+  
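The `notFoundNullElement` flag in the quoted loop encodes the null semantics of `array_except`: a null from array1 is emitted at most once, and only when array2 contains no null. A boxed sketch of just that rule, modelling SQL NULL as `None` (this is a simplification for illustration, not Spark's representation):

```scala
import scala.collection.mutable

def exceptWithNulls(a1: Seq[Option[Int]],
                    a2: Seq[Option[Int]]): Seq[Option[Int]] = {
  val excludedVals = a2.flatten.toSet       // non-null values of a2
  val excludeNull  = a2.contains(None)      // a2 has a null => drop a1's nulls
  val out = mutable.ArrayBuffer.empty[Option[Int]]
  var nullEmitted = false                   // the notFoundNullElement role
  for (e <- a1) e match {
    case None =>
      if (!excludeNull && !nullEmitted) { out += None; nullEmitted = true }
    case Some(v) =>
      if (!excludedVals(v) && !out.contains(Some(v))) out += Some(v)
  }
  out.toSeq
}
```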

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205186167
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+  

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205186241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+  

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-25 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205186075
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.LONG_ARRAY_OFFSET, elements, LongType.defaultSize)
+  }
+  
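To make the two-pass structure above easier to follow: `evalIntLongPrimitiveType` is called twice, first with `resultArray = null` purely to count the surviving elements, then again to fill the freshly allocated array. A rough Python model of that scheme (an illustrative sketch with names of my own, not Spark code):

```python
def eval_two_pass(array1, array2):
    """Sizing-then-filling evaluation, modeled on the diff above:
    pass 1 (result=None) only counts survivors, pass 2 writes them."""
    def run(result):
        # hash set of array2's non-null elements (the OpenHashSet's role)
        seen = {e for e in array2 if e is not None}
        keep_null = not any(e is None for e in array2)  # notFoundNullElement
        pos = 0
        for e in array1:
            if e is None:
                if keep_null:
                    if result is not None:  # fill pass only
                        result[pos] = None
                    pos += 1
                    keep_null = False       # at most one null in the result
            elif e not in seen:
                if result is not None:
                    result[pos] = e
                pos += 1
                seen.add(e)                 # also dedups within array1
        return pos

    n = run(None)        # pass 1: "calculate result array size"
    out = [None] * n     # "allocate result array"
    run(out)             # pass 2: assign elements into the result array
    return out

print(eval_two_pass([1, 2, 3], [1, 3, 5]))  # prints [2]
```

The same two calls appear in `nullSafeEval` above for both the `IntegerType` and `LongType` branches; the hash set is re-created between the passes, so the dedup logic behaves identically in both.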

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r204613831
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,331 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType =
+ArrayType(elementType, 
left.dataType.asInstanceOf[ArrayType].containsNull)
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var exceptNullElement = true
--- End diff --

Ah, I meant the `nullElement` for the array `except` operation. I will rename this to a better name.
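Whatever the final name, the flag's contract is worth pinning down: it means "no null seen in array2 yet, so a (single) null from array1 may still be emitted". A tiny Python sketch of that truth table (illustrative only, not Spark code):

```python
def null_in_result(left_has_null, right_has_null):
    # The flag starts out True ("no null found in array2 yet"), is cleared
    # by any null in array2, and gates whether array1's first null is
    # emitted -- so a null appears in the result exactly in this case:
    not_found_null_element = not right_has_null
    return left_has_null and not_found_null_element

assert null_in_result(left_has_null=True, right_has_null=False)       # null kept
assert not null_in_result(left_has_null=True, right_has_null=True)    # null cancelled
assert not null_in_result(left_has_null=False, right_has_null=False)  # nothing to keep
```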


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r204307100
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,331 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType =
+ArrayType(elementType, 
left.dataType.asInstanceOf[ArrayType].containsNull)
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var exceptNullElement = true
--- End diff --

The meaning is opposite?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-23 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r204306424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,331 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType =
+ArrayType(elementType, 
left.dataType.asInstanceOf[ArrayType].containsNull)
--- End diff --

`left.dataType`?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-21 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r204218477
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

I see. It makes sense.
I added a test for this into `DataFrameFunctionsSuite`.


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-21 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r204214620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

Yeah, good catch


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-20 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203964623
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

Yeah, this case is valid. But the ```containsNull``` flag is defined for the whole column (across multiple rows). Since this flag could cause removal of null-safe checks in expressions that use ```ArrayExcept``` as a child, it could lead to ```NullPointerException``` failures for cases like the second row of the example dataset. WDYT?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203960424
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

How about the case of left=[1, null], right[2,null], and result=[1]?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203746577
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

What about ```ArrayType(elementType, 
left.dataType.asInstanceOf[ArrayType].containsNull)```?

Example df:

left | right | result
--- | --- | ---
[1,2] | [1, null] | [2]
[1, null, 3] | [1,2] | **[null, 3]**
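Both rows above can be checked mechanically; a small Python model of `array_except`'s null behavior (an illustration under the semantics discussed here, not Spark code):

```python
def except_row(left, right):
    # Model of array_except per this discussion: keep left elements that
    # are missing from right, dropping duplicates; a left-side null
    # survives (once) only when the right side contains no null.
    seen = {e for e in right if e is not None}
    keep_null = all(e is not None for e in right)
    out = []
    for e in left:
        if e is None:
            if keep_null:
                out.append(None)
                keep_null = False
        elif e not in seen:
            out.append(e)
            seen.add(e)
    return out

# the two rows of the example df
assert except_row([1, 2], [1, None]) == [2]           # no null in the result
assert except_row([1, None, 3], [1, 2]) == [None, 3]  # left's null survives
```

Since the second row legitimately produces a null, deriving `containsNull` from the left input's type, as proposed, is the safe choice.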


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203741325
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

just "Examples:"?


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-04-18 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/21103

[SPARK-23915][SQL] Add array_except function

## What changes were proposed in this pull request?

The PR adds the SQL function `array_except`. The behavior of the function is based on Presto's.

This function returns an array of the elements in array1 but not in array2.

Note: The order of elements in the result is not defined.

## How was this patch tested?

Added UTs.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kiszk/spark SPARK-23915

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21103.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21103


commit ce18ce0cd4da3378bb9f83237b7f9165737e7760
Author: Kazuaki Ishizaki 
Date:   2018-04-18T18:03:55Z

initial commit




---
