[spark] branch master updated (e43b9e8 -> 5a9d4c1)

2021-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e43b9e8  [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a 
struct has many fields
 add 5a9d4c1  [SPARK-36660][SQL][FOLLOW-UP] Add cot to pyspark.sql.rst

No new revisions were added by this update.

Summary of changes:
 python/docs/source/reference/pyspark.sql.rst | 1 +
 1 file changed, 1 insertion(+)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many fields

2021-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e43b9e8  [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a 
struct has many fields
e43b9e8 is described below

commit e43b9e8520bd4ea5bc3693beb496893b17e79054
Author: Kousuke Saruta 
AuthorDate: Wed Sep 15 10:33:58 2021 +0900

[SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many 
fields

### What changes were proposed in this pull request?

This PR fixes a perf issue in `SchemaPruning` when a struct has many fields 
(e.g. >10K fields).
The root cause is that `SchemaPruning.sortLeftFieldsByRight` performs an O(N * M) search:
```
 val filteredRightFieldNames = rightStruct.fieldNames
.filter(name => leftStruct.fieldNames.exists(resolver(_, name)))
```

To fix this issue, this PR proposes to use a `HashMap` so that each field lookup takes expected constant time.
This PR also adds `case _ if left == right => left` to the method as a short-circuit.
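
The gist of that lookup strategy, as a rough standalone sketch (illustrative only, not the actual Spark code: `sortFieldsByRight` is a hypothetical helper, and the recursion into nested array/map/struct types is omitted):
```
import java.util.Locale

import scala.collection.immutable.HashMap

import org.apache.spark.sql.types._

// Hypothetical helper mirroring the approach: index the left struct's fields by
// (optionally lower-cased) name once, then resolve every right-hand field with a
// constant-time map lookup instead of scanning all left-hand field names.
def sortFieldsByRight(left: StructType, right: StructType, caseSensitive: Boolean): StructType = {
  val formatName: String => String =
    if (caseSensitive) identity else _.toLowerCase(Locale.ROOT)
  val leftByName = HashMap(left.map(f => formatName(f.name)).zip(left): _*)
  val sortedLeftFields = right.fieldNames.flatMap { name =>
    leftByName.get(formatName(name)).map(f => f.copy(name = name))
  }
  StructType(sortedLeftFields)
}
```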

### Why are the changes needed?

To fix a perf issue.

### Does this PR introduce _any_ user-facing change?

No. The logic should be identical.

### How was this patch tested?

I confirmed that the following micro benchmark finishes within a few 
seconds.
```
import org.apache.spark.sql.catalyst.expressions.SchemaPruning
import org.apache.spark.sql.types._

// two structs with tens of thousands of disjoint field names
// (field counts approximate; the archived message garbled them)
var struct1 = new StructType()
(1 to 50000).foreach { i =>
  struct1 = struct1.add(new StructField(i + "", IntegerType))
}

var struct2 = new StructType()
(50001 to 100000).foreach { i =>
  struct2 = struct2.add(new StructField(i + "", IntegerType))
}

SchemaPruning.sortLeftFieldsByRight(struct1, struct2)
SchemaPruning.sortLeftFieldsByRight(struct2, struct2)
```

The correctness should be checked by existing tests.

Closes #33981 from sarutak/improve-schemapruning-performance.

Authored-by: Kousuke Saruta 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/catalyst/expressions/SchemaPruning.scala   | 32 +++---
 1 file changed, 22 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
index 9aa2766..2a182b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.util.Locale
+
+import scala.collection.immutable.HashMap
+
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.types._
 
@@ -54,6 +58,7 @@ object SchemaPruning extends SQLConfHelper {
*/
   private def sortLeftFieldsByRight(left: DataType, right: DataType): DataType =
 (left, right) match {
+  case _ if left == right => left
      case (ArrayType(leftElementType, containsNull), ArrayType(rightElementType, _)) =>
 ArrayType(
   sortLeftFieldsByRight(leftElementType, rightElementType),
@@ -65,16 +70,23 @@ object SchemaPruning extends SQLConfHelper {
   sortLeftFieldsByRight(leftValueType, rightValueType),
   containsNull)
   case (leftStruct: StructType, rightStruct: StructType) =>
-        val resolver = conf.resolver
-        val filteredRightFieldNames = rightStruct.fieldNames
-          .filter(name => leftStruct.fieldNames.exists(resolver(_, name)))
-        val sortedLeftFields = filteredRightFieldNames.map { fieldName =>
-          val resolvedLeftStruct = leftStruct.find(p => resolver(p.name, fieldName)).get
-          val leftFieldType = resolvedLeftStruct.dataType
-          val rightFieldType = rightStruct(fieldName).dataType
-          val sortedLeftFieldType = sortLeftFieldsByRight(leftFieldType, rightFieldType)
-          StructField(fieldName, sortedLeftFieldType, nullable = resolvedLeftStruct.nullable,
-            metadata = resolvedLeftStruct.metadata)
+        val formatFieldName: String => String =
+          if (conf.caseSensitiveAnalysis) identity else _.toLowerCase(Locale.ROOT)
+
+        val leftStructHashMap =
+          HashMap(leftStruct.map(f => formatFieldName(f.name)).zip(leftStruct): _*)
+        val sortedLeftFields = rightStruct.fieldNames.flatMap { fieldName =>
+          val formattedFieldName = formatFieldName(fieldName)
+          if (leftStructHashMap.contains(formattedFieldName)) {
+            val resolvedLeftStruct = leftStructHashMap(formattedFieldName)
+            val leftFieldType = resolvedLeftStruct.dataType
+            val rightFieldType =

[spark] branch master updated (3e5d3d1 -> 0aaf86b)

2021-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 3e5d3d1  [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in 
Jenkins
 add 0aaf86b  [SPARK-36709][PYTHON] Support new syntax for specifying index 
type and name in pandas API on Spark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/frame.py |  57 +-
 python/pyspark/pandas/series.py|  18 +-
 python/pyspark/pandas/typedef/typehints.py | 289 ++---
 3 files changed, 272 insertions(+), 92 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins

2021-09-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3e5d3d1  [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in 
Jenkins
3e5d3d1 is described below

commit 3e5d3d1cfe9c66f9bb729ecd82299c5242b1b01b
Author: Kevin Su 
AuthorDate: Wed Sep 15 09:24:50 2021 +0900

[SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins

### What changes were proposed in this pull request?

Upgrade flake8 to 3.8.0 or above in Jenkins

### Why are the changes needed?

In flake8 < 3.8.0, an F401 (unused import) error is reported for imports placed under `if TYPE_CHECKING:` blocks. However, TYPE_CHECKING is always False at runtime, so there is no need to treat such imports as errors in static analysis.

Since this behavior is fixed in flake8 >= 3.8.0, we should upgrade the flake8 installed in Jenkins to 3.8.0 or above. Otherwise, F401 errors are reported for several lines in pandas-on-Spark code that uses TYPE_CHECKING.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the CI

Closes #32749 from pingsutw/SPARK-34943.

Lead-authored-by: Kevin Su 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 dev/lint-python | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dev/lint-python b/dev/lint-python
index e54e391..72438f7 100755
--- a/dev/lint-python
+++ b/dev/lint-python
@@ -18,7 +18,7 @@
 # define test binaries + versions
 FLAKE8_BUILD="flake8"
 # TODO(SPARK-34943): minimum version should be 3.8+
-MINIMUM_FLAKE8="3.5.0"
+MINIMUM_FLAKE8="3.8.0"
 MINIMUM_MYPY="0.910"
 MYPY_BUILD="mypy"
 PYCODESTYLE_BUILD="pycodestyle"

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml

2021-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 6a1dacb  [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid 
breaking pom.xml
6a1dacb is described below

commit 6a1dacb6b6c4e33ceed942d4b7c2724e6e6b713c
Author: Dongjoon Hyun 
AuthorDate: Tue Sep 14 16:26:50 2021 -0700

[SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml

### What changes were proposed in this pull request?

This PR aims to fix the regex to avoid breaking `pom.xml`.

### Why are the changes needed?

**BEFORE**
```
$ dev/change-scala-version.sh 2.12
$ git diff | head -n10
diff --git a/core/pom.xml b/core/pom.xml
index dbde22f2bf..6ed368353b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,7 +35,7 @@
[the remainder of this message, including the broken pom.xml snippet and the updated sed_i expressions in dev/change-scala-version.sh, was mangled by the mail archive]
```
 

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml

2021-09-14 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d730ef2  [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid 
breaking pom.xml
d730ef2 is described below

commit d730ef24fee49b32d9289fd203cbc7eb3b715017
Author: Dongjoon Hyun 
AuthorDate: Tue Sep 14 16:26:50 2021 -0700

[SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml

### What changes were proposed in this pull request?

This PR aims to fix the regex to avoid breaking `pom.xml`.

### Why are the changes needed?

**BEFORE**
```
$ dev/change-scala-version.sh 2.12
$ git diff | head -n10
diff --git a/core/pom.xml b/core/pom.xml
index dbde22f2bf..6ed368353b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -35,7 +35,7 @@
[the remainder of this message, including the broken pom.xml snippet and the updated sed_i expressions in dev/change-scala-version.sh, was mangled by the mail archive]
```
 

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456

2021-09-14 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 119ddd7  [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io 
to 2.11.0 and revert change of SPARK-36456
119ddd7 is described below

commit 119ddd7e9526ed899f88a944babb74af693297f5
Author: yangjie01 
AuthorDate: Tue Sep 14 21:16:58 2021 +0900

[SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and 
revert change of SPARK-36456

### What changes were proposed in this pull request?
SPARK-36456 changed the code to use `JavaUtils.closeQuietly` instead of `IOUtils.closeQuietly`, but the two methods differ slightly in default behavior: both swallow the IOException, but the former logs it at ERROR level while the latter does not log it by default.

The Apache commons-io community decided to retain the `IOUtils.closeQuietly` method in the [new version](https://github.com/apache/commons-io/blob/75f20dca72656225d0dc8e7c982e40caa9277d42/src/main/java/org/apache/commons/io/IOUtils.java#L465-L467) and removed its deprecated annotation; the change was released in version 2.11.0.

So this PR upgrades Apache commons-io to 2.11.0 and reverts the change of SPARK-36456 to keep the original behavior (no error log).
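
As a rough sketch of the behavioral difference described above (illustrative only; the method and stream here are hypothetical, not part of this commit):
```
import java.io.InputStream

import org.apache.commons.io.IOUtils

def closeAfterUse(someStream: InputStream): Unit = {
  try {
    // ... use the stream ...
  } finally {
    // commons-io's closeQuietly swallows any IOException thrown by close()
    // without logging, which is the original behavior this PR restores.
    IOUtils.closeQuietly(someStream)
    // Spark's org.apache.spark.network.util.JavaUtils.closeQuietly(someStream),
    // by contrast, logs the swallowed exception at ERROR level.
  }
}
```

The diff below simply swaps the affected call sites back to the commons-io version.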

### Why are the changes needed?

1. Upgrade Apache commons-io to 2.11.0 to use the non-deprecated `closeQuietly` API; other changes in Apache commons-io are detailed in the [commons-io changes report](https://commons.apache.org/proper/commons-io/changes-report.html#a2.11.0).

2. Revert the change of SPARK-36456 to keep the original `IOUtils.closeQuietly` API behavior (no error log).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes #33977 from LuciferYang/upgrade-commons-io.

Authored-by: yangjie01 
Signed-off-by: Jungtaek Lim 
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 5 +++--
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala  | 5 +++--
 core/src/main/scala/org/apache/spark/util/Utils.scala   | 4 ++--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala   | 5 ++---
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala   | 3 +--
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala  | 6 +++---
 dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +-
 pom.xml | 2 +-
 .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  | 4 ++--
 .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala   | 4 ++--
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala| 4 ++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala| 5 ++---
 12 files changed, 24 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b537060..cbb4e9c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,7 @@ import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.google.common.cache.CacheBuilder
+import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -51,7 +52,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.{JavaUtils, TransportConf}
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -341,7 +342,7 @@ private[spark] class BlockManager(
 false
 }
   } finally {
-JavaUtils.closeQuietly(inputStream)
+IOUtils.closeQuietly(inputStream)
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index b1713ec..eaecf65 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, 

[spark] branch branch-3.1 updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

2021-09-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 1042481  [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN 
and Float.Nan
1042481 is described below

commit 104248120094e3f2356023744a8efc0df22b0da6
Author: Angerszh 
AuthorDate: Tue Sep 14 18:25:47 2021 +0800

[SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

### What changes were proposed in this pull request?
For query
```
select array_union(array(cast('nan' as double), cast('nan' as double)), 
array())
```
This returns [NaN, NaN], but it should return [NaN].
The issue is caused by the fact that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together.
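
A minimal sketch of the idea (not Spark's actual `SQLOpenHashSet`; a plain `mutable.HashSet` stands in for `OpenHashSet`, with null and NaN tracked by separate flags):
```
import scala.collection.mutable

// Null and NaN are kept out of the underlying set and tracked with flags,
// so each of them is admitted at most once regardless of how it compares.
class NaNAwareSet {
  private val underlying = mutable.HashSet.empty[Any]
  private var seenNull = false
  private var seenNaN = false

  private def isNaN(v: Any): Boolean = v match {
    case d: Double => d.isNaN
    case f: Float => f.isNaN
    case _ => false
  }

  /** Returns true only the first time a given value (or null, or NaN) is seen. */
  def add(elem: Any): Boolean =
    if (elem == null) { val first = !seenNull; seenNull = true; first }
    else if (isNaN(elem)) { val first = !seenNaN; seenNaN = true; first }
    else underlying.add(elem)
}

// array_union-style dedup: the two NaNs collapse into a single entry.
val set = new NaNAwareSet
val deduped = Seq(Double.NaN, Double.NaN, 1.0, null, 1.0).filter(set.add)
// deduped == Seq(NaN, 1.0, null)
```

Spark's real wrapper additionally specializes for primitive element types and exposes `addNull`/`containsNull` and `addNaN`/`containsNaN`, which is what the diff below uses.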

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
ArrayUnion won't show duplicated `NaN` value

### How was this patch tested?
Added UT

Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet.

Lead-authored-by: Angerszh 
Co-authored-by: AngersZh 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f71f37755d581017f549ecc8683fb7afc2852c67)
Signed-off-by: Wenchen Fan 
---
 .../expressions/collectionOperations.scala | 61 +-
 .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++
 .../expressions/CollectionExpressionsSuite.scala   | 17 +
 3 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index bb2163c..b829ac0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SQLOpenHashSet
 import org.apache.spark.unsafe.UTF8StringBuilder
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
@@ -3367,24 +3368,31 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
 if (TypeUtils.typeWithProperEquals(elementType)) {
   (array1, array2) =>
 val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
-val hs = new OpenHashSet[Any]
-var foundNullElement = false
+val hs = new SQLOpenHashSet[Any]()
+val isNaN = SQLOpenHashSet.isNaN(elementType)
 Seq(array1, array2).foreach { array =>
   var i = 0
   while (i < array.numElements()) {
 if (array.isNullAt(i)) {
-  if (!foundNullElement) {
+  if (!hs.containsNull) {
+hs.addNull
 arrayBuffer += null
-foundNullElement = true
   }
 } else {
   val elem = array.get(i, elementType)
-  if (!hs.contains(elem)) {
-    if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-      ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  if (isNaN(elem)) {
+if (!hs.containsNaN) {
+  arrayBuffer += elem
+  hs.addNaN
+}
+  } else {
+if (!hs.contains(elem)) {
+  if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  }
+  arrayBuffer += elem
+  hs.add(elem)
 }
-arrayBuffer += elem
-hs.add(elem)
   }
 }
 i += 1
@@ -3441,13 +3449,12 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
   val ptName = CodeGenerator.primitiveTypeName(jt)
 
   nullSafeCodeGen(ctx, ev, (array1, array2) => {
-val foundNullElement = ctx.freshName("foundNullElement")
 val nullElementIndex = ctx.freshName("nullElementIndex")
 val builder = ctx.freshName("builder")
 val array = ctx.freshName("array")
 val arrays = ctx.freshName("arrays")
 val arrayDataIdx = ctx.freshName("arrayDataIdx")
-val openHashSet = classOf[OpenHashSet[_]].getName
+   

[spark] branch branch-3.2 updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

2021-09-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a472612  [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN 
and Float.Nan
a472612 is described below

commit a472612eb84805de590406e8af810ec6fa00d114
Author: Angerszh 
AuthorDate: Tue Sep 14 18:25:47 2021 +0800

[SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

### What changes were proposed in this pull request?
For query
```
select array_union(array(cast('nan' as double), cast('nan' as double)), 
array())
```
This returns [NaN, NaN], but it should return [NaN].
The issue is caused by the fact that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together.

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
ArrayUnion won't show duplicated `NaN` value

### How was this patch tested?
Added UT

Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet.

Lead-authored-by: Angerszh 
Co-authored-by: AngersZh 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f71f37755d581017f549ecc8683fb7afc2852c67)
Signed-off-by: Wenchen Fan 
---
 .../expressions/collectionOperations.scala | 61 +-
 .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++
 .../expressions/CollectionExpressionsSuite.scala   | 17 +
 3 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index ce17231..e5620a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SQLOpenHashSet
 import org.apache.spark.unsafe.UTF8StringBuilder
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
@@ -3575,24 +3576,31 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
 if (TypeUtils.typeWithProperEquals(elementType)) {
   (array1, array2) =>
 val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
-val hs = new OpenHashSet[Any]
-var foundNullElement = false
+val hs = new SQLOpenHashSet[Any]()
+val isNaN = SQLOpenHashSet.isNaN(elementType)
 Seq(array1, array2).foreach { array =>
   var i = 0
   while (i < array.numElements()) {
 if (array.isNullAt(i)) {
-  if (!foundNullElement) {
+  if (!hs.containsNull) {
+hs.addNull
 arrayBuffer += null
-foundNullElement = true
   }
 } else {
   val elem = array.get(i, elementType)
-  if (!hs.contains(elem)) {
-    if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-      ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  if (isNaN(elem)) {
+if (!hs.containsNaN) {
+  arrayBuffer += elem
+  hs.addNaN
+}
+  } else {
+if (!hs.contains(elem)) {
+  if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  }
+  arrayBuffer += elem
+  hs.add(elem)
 }
-arrayBuffer += elem
-hs.add(elem)
   }
 }
 i += 1
@@ -3649,13 +3657,12 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
   val ptName = CodeGenerator.primitiveTypeName(jt)
 
   nullSafeCodeGen(ctx, ev, (array1, array2) => {
-val foundNullElement = ctx.freshName("foundNullElement")
 val nullElementIndex = ctx.freshName("nullElementIndex")
 val builder = ctx.freshName("builder")
 val array = ctx.freshName("array")
 val arrays = ctx.freshName("arrays")
 val arrayDataIdx = ctx.freshName("arrayDataIdx")
-val openHashSet = classOf[OpenHashSet[_]].getName
+

[spark] branch master updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

2021-09-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f71f377  [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN 
and Float.Nan
f71f377 is described below

commit f71f37755d581017f549ecc8683fb7afc2852c67
Author: Angerszh 
AuthorDate: Tue Sep 14 18:25:47 2021 +0800

[SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan

### What changes were proposed in this pull request?
For query
```
select array_union(array(cast('nan' as double), cast('nan' as double)), 
array())
```
This returns [NaN, NaN], but it should return [NaN].
The issue is caused by the fact that `OpenHashSet` can't handle `Double.NaN` and `Float.NaN` either.
In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together.

### Why are the changes needed?
Fix bug

### Does this PR introduce _any_ user-facing change?
ArrayUnion won't show duplicated `NaN` value

### How was this patch tested?
Added UT

Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet.

Lead-authored-by: Angerszh 
Co-authored-by: AngersZh 
Signed-off-by: Wenchen Fan 
---
 .../expressions/collectionOperations.scala | 61 +-
 .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++
 .../expressions/CollectionExpressionsSuite.scala   | 17 +
 3 files changed, 133 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index ce17231..e5620a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SQLOpenHashSet
 import org.apache.spark.unsafe.UTF8StringBuilder
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
@@ -3575,24 +3576,31 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
 if (TypeUtils.typeWithProperEquals(elementType)) {
   (array1, array2) =>
 val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
-val hs = new OpenHashSet[Any]
-var foundNullElement = false
+val hs = new SQLOpenHashSet[Any]()
+val isNaN = SQLOpenHashSet.isNaN(elementType)
 Seq(array1, array2).foreach { array =>
   var i = 0
   while (i < array.numElements()) {
 if (array.isNullAt(i)) {
-  if (!foundNullElement) {
+  if (!hs.containsNull) {
+hs.addNull
 arrayBuffer += null
-foundNullElement = true
   }
 } else {
   val elem = array.get(i, elementType)
-  if (!hs.contains(elem)) {
-    if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-      ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  if (isNaN(elem)) {
+if (!hs.containsNaN) {
+  arrayBuffer += elem
+  hs.addNaN
+}
+  } else {
+if (!hs.contains(elem)) {
+  if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size)
+  }
+  arrayBuffer += elem
+  hs.add(elem)
 }
-arrayBuffer += elem
-hs.add(elem)
   }
 }
 i += 1
@@ -3649,13 +3657,12 @@ case class ArrayUnion(left: Expression, right: 
Expression) extends ArrayBinaryLi
   val ptName = CodeGenerator.primitiveTypeName(jt)
 
   nullSafeCodeGen(ctx, ev, (array1, array2) => {
-val foundNullElement = ctx.freshName("foundNullElement")
 val nullElementIndex = ctx.freshName("nullElementIndex")
 val builder = ctx.freshName("builder")
 val array = ctx.freshName("array")
 val arrays = ctx.freshName("arrays")
 val arrayDataIdx = ctx.freshName("arrayDataIdx")
-val openHashSet = classOf[OpenHashSet[_]].getName
+val openHashSet = classOf[SQLOpenHashSet[_]].getName
 val classTag =