[spark] branch master updated (e43b9e8 -> 5a9d4c1)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e43b9e8 [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many fields add 5a9d4c1 [SPARK-36660][SQL][FOLLOW-UP] Add cot to pyspark.sql.rst No new revisions were added by this update. Summary of changes: python/docs/source/reference/pyspark.sql.rst | 1 + 1 file changed, 1 insertion(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many fields
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e43b9e8 [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many fields e43b9e8 is described below commit e43b9e8520bd4ea5bc3693beb496893b17e79054 Author: Kousuke Saruta AuthorDate: Wed Sep 15 10:33:58 2021 +0900 [SPARK-36733][SQL] Fix a perf issue in SchemaPruning when a struct has many fields ### What changes were proposed in this pull request? This PR fixes a perf issue in `SchemaPruning` when a struct has many fields (e.g. >10K fields). The root cause is that `SchemaPruning.sortLeftFieldsByRight` performs an O(N * M) search:

```
val filteredRightFieldNames = rightStruct.fieldNames
  .filter(name => leftStruct.fieldNames.exists(resolver(_, name)))
```

To fix this issue, this PR proposes to use a `HashMap` so that each name lookup takes constant time. This PR also adds `case _ if left == right => left` to the method as a short-circuit. ### Why are the changes needed? To fix a perf issue. ### Does this PR introduce _any_ user-facing change? No. The logic should be identical. ### How was this patch tested? I confirmed that the following micro benchmark finishes within a few seconds:

```
import org.apache.spark.sql.catalyst.expressions.SchemaPruning
import org.apache.spark.sql.types._

var struct1 = new StructType()
(1 to 50000).foreach { i => struct1 = struct1.add(new StructField(i + "", IntegerType)) }
var struct2 = new StructType()
(50001 to 100000).foreach { i => struct2 = struct2.add(new StructField(i + "", IntegerType)) }
SchemaPruning.sortLeftFieldsByRight(struct1, struct2)
SchemaPruning.sortLeftFieldsByRight(struct2, struct2)
```

The correctness is checked by existing tests. Closes #33981 from sarutak/improve-schemapruning-performance. 
Authored-by: Kousuke Saruta Signed-off-by: Hyukjin Kwon --- .../sql/catalyst/expressions/SchemaPruning.scala | 32 +++--- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala index 9aa2766..2a182b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.Locale + +import scala.collection.immutable.HashMap + import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.types._ @@ -54,6 +58,7 @@ object SchemaPruning extends SQLConfHelper { */ private def sortLeftFieldsByRight(left: DataType, right: DataType): DataType = (left, right) match { + case _ if left == right => left case (ArrayType(leftElementType, containsNull), ArrayType(rightElementType, _)) => ArrayType( sortLeftFieldsByRight(leftElementType, rightElementType), @@ -65,16 +70,23 @@ object SchemaPruning extends SQLConfHelper { sortLeftFieldsByRight(leftValueType, rightValueType), containsNull) case (leftStruct: StructType, rightStruct: StructType) => -val resolver = conf.resolver -val filteredRightFieldNames = rightStruct.fieldNames - .filter(name => leftStruct.fieldNames.exists(resolver(_, name))) -val sortedLeftFields = filteredRightFieldNames.map { fieldName => - val resolvedLeftStruct = leftStruct.find(p => resolver(p.name, fieldName)).get - val leftFieldType = resolvedLeftStruct.dataType - val rightFieldType = rightStruct(fieldName).dataType - val sortedLeftFieldType = sortLeftFieldsByRight(leftFieldType, rightFieldType) - StructField(fieldName, sortedLeftFieldType, nullable = resolvedLeftStruct.nullable, -metadata = resolvedLeftStruct.metadata) +val formatFieldName: 
String => String = + if (conf.caseSensitiveAnalysis) identity else _.toLowerCase(Locale.ROOT) + +val leftStructHashMap = + HashMap(leftStruct.map(f => formatFieldName(f.name)).zip(leftStruct): _*) +val sortedLeftFields = rightStruct.fieldNames.flatMap { fieldName => + val formattedFieldName = formatFieldName(fieldName) + if (leftStructHashMap.contains(formattedFieldName)) { +val resolvedLeftStruct = leftStructHashMap(formattedFieldName) +val leftFieldType = resolvedLeftStruct.dataType +val rightFieldType =
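The shape of the fix is easy to see outside Scala. Below is a minimal Python sketch of the same idea (the function name, its arguments, and the `case_sensitive` flag are illustrative stand-ins, not Spark's API): normalize the left struct's field names once into a dict, so each right-field resolution becomes a constant-time lookup instead of a scan over all left fields.

```python
def sort_left_fields_by_right(left_fields, right_field_names, case_sensitive=False):
    """Reorder (name, type) pairs in left_fields to match right_field_names.

    Names are resolved case-insensitively unless case_sensitive is set.
    Building the dict up front makes each resolution O(1), so the whole
    pass is O(N + M) rather than O(N * M).
    """
    fmt = (lambda s: s) if case_sensitive else str.lower
    # One pass over the left struct instead of one pass per right field.
    left_by_name = {fmt(name): (name, typ) for name, typ in left_fields}
    result = []
    for right_name in right_field_names:
        hit = left_by_name.get(fmt(right_name))
        if hit is not None:
            result.append(hit)
    return result
```

For example, sorting `[("B", "int"), ("a", "long")]` by `["A", "b"]` with the default case-insensitive resolver yields `[("a", "long"), ("B", "int")]`.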
[spark] branch master updated (3e5d3d1 -> 0aaf86b)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 3e5d3d1 [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins add 0aaf86b [SPARK-36709][PYTHON] Support new syntax for specifying index type and name in pandas API on Spark No new revisions were added by this update. Summary of changes: python/pyspark/pandas/frame.py | 57 +- python/pyspark/pandas/series.py| 18 +- python/pyspark/pandas/typedef/typehints.py | 289 ++--- 3 files changed, 272 insertions(+), 92 deletions(-)
[spark] branch master updated: [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3e5d3d1 [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins 3e5d3d1 is described below commit 3e5d3d1cfe9c66f9bb729ecd82299c5242b1b01b Author: Kevin Su AuthorDate: Wed Sep 15 09:24:50 2021 +0900 [SPARK-34943][BUILD] Upgrade flake8 to 3.8.0 or above in Jenkins ### What changes were proposed in this pull request? Upgrade flake8 to 3.8.0 or above in Jenkins ### Why are the changes needed? In flake8 < 3.8.0, an F401 error occurs for imports inside `if TYPE_CHECKING:` blocks. However, TYPE_CHECKING is always False at runtime, so there is no need to treat such imports as errors in static analysis. Since this behavior is fixed in flake8 >= 3.8.0, we should upgrade the flake8 installed in Jenkins to 3.8.0 or above. Otherwise, F401 errors occur for several lines in pandas-on-Spark that use TYPE_CHECKING ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass the CI Closes #32749 from pingsutw/SPARK-34943. Lead-authored-by: Kevin Su Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- dev/lint-python | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/lint-python b/dev/lint-python index e54e391..72438f7 100755 --- a/dev/lint-python +++ b/dev/lint-python @@ -18,7 +18,7 @@ # define test binaries + versions FLAKE8_BUILD="flake8" # TODO(SPARK-34943): minimum version should be 3.8+ -MINIMUM_FLAKE8="3.5.0" +MINIMUM_FLAKE8="3.8.0" MINIMUM_MYPY="0.910" MYPY_BUILD="mypy" PYCODESTYLE_BUILD="pycodestyle"
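For context, the pattern that older flake8 mis-flagged looks like this (the `Decimal` import here is just an illustrative stand-in for the pandas-on-Spark imports): the `if TYPE_CHECKING:` block runs only under static type checkers, never at runtime, so F401 "imported but unused" is a false positive there.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Executed only by static type checkers (e.g. mypy), never at runtime,
    # so flake8 < 3.8.0 wrongly reported this name as unused (F401).
    from decimal import Decimal

def double(x: "Decimal") -> "Decimal":
    # String annotations mean Decimal need not be importable at runtime.
    return x + x
```

Because `TYPE_CHECKING` is always `False` at runtime, the guarded import imposes no import cost while still giving type checkers the real class.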
[spark] branch branch-3.2 updated: [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 6a1dacb [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml 6a1dacb is described below commit 6a1dacb6b6c4e33ceed942d4b7c2724e6e6b713c Author: Dongjoon Hyun AuthorDate: Tue Sep 14 16:26:50 2021 -0700 [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml ### What changes were proposed in this pull request? This PR aims to fix the regex to avoid breaking `pom.xml`. ### Why are the changes needed? **BEFORE** ``` $ dev/change-scala-version.sh 2.12 $ git diff | head -n10 diff --git a/core/pom.xml b/core/pom.xml index dbde22f2bf..6ed368353b 100644 --- a/core/pom.xml +++ b/core/pom.xml -35,7 +35,7 -\)\(\)/\1\2/' $f - sed_i 's/^\([[:space:]]*\)/\1\)$/\1\)/\1-->\2/' $f done
[spark] branch master updated: [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d730ef2 [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml d730ef2 is described below commit d730ef24fee49b32d9289fd203cbc7eb3b715017 Author: Dongjoon Hyun AuthorDate: Tue Sep 14 16:26:50 2021 -0700 [SPARK-36712][BUILD][FOLLOWUP] Improve the regex to avoid breaking pom.xml ### What changes were proposed in this pull request? This PR aims to fix the regex to avoid breaking `pom.xml`. ### Why are the changes needed? **BEFORE** ``` $ dev/change-scala-version.sh 2.12 $ git diff | head -n10 diff --git a/core/pom.xml b/core/pom.xml index dbde22f2bf..6ed368353b 100644 --- a/core/pom.xml +++ b/core/pom.xml -35,7 +35,7 -\)\(\)/\1\2/' $f - sed_i 's/^\([[:space:]]*\)/\1\)$/\1\)/\1-->\2/' $f done
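The sed expressions in the archived diff above are mangled, but the pitfall the FOLLOWUP addresses can be illustrated with a toy regex (the patterns and the sample line below are illustrative, not the script's actual sed expressions): a greedy `.*` can span across XML elements, while a character class that excludes `<` stays inside one element.

```python
import re

# Two sibling elements on one line, as a pom.xml fragment might flatten to.
line = "<artifactId>a_2.12</artifactId><artifactId>b_2.12</artifactId>"

# Greedy: .* runs to the LAST </artifactId> it can reach, so the single
# match swallows the second element and leaves its version untouched.
greedy = re.sub(r"_2\.12(.*)</artifactId>", r"_2.13\1</artifactId>", line)

# Anchored: [^<]* cannot cross into the next tag, so every occurrence
# is rewritten independently and the XML structure is preserved.
safe = re.sub(r"_2\.12([^<]*)</artifactId>", r"_2.13\1</artifactId>", line)
```

With the greedy pattern only the first `_2.12` is updated (`b_2.12` survives, corrupting the intended edit), while the anchored pattern updates both.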
[spark] branch master updated: [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 119ddd7 [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456 119ddd7 is described below commit 119ddd7e9526ed899f88a944babb74af693297f5 Author: yangjie01 AuthorDate: Tue Sep 14 21:16:58 2021 +0900 [SPARK-36737][BUILD][CORE][SQL][SS] Upgrade Apache commons-io to 2.11.0 and revert change of SPARK-36456 ### What changes were proposed in this pull request? SPARK-36456 changed to use `JavaUtils.closeQuietly` instead of `IOUtils.closeQuietly`, but the two methods differ slightly in default behavior: both swallow IOException, but the former logs it as ERROR while the latter doesn't log by default. The `Apache commons-io` community decided to retain the `IOUtils.closeQuietly` method in the [new version](https://github.com/apache/commons-io/blob/75f20dca72656225d0dc8e7c982e40caa9277d42/src/main/java/org/apache/commons/io/IOUtils.java#L465-L467) and removed the deprecated annotation; the change has been released in version 2.11.0. So this PR upgrades `Apache commons-io` to 2.11.0 and reverts the change of SPARK-36456 to maintain the original behavior (don't print an error log). ### Why are the changes needed? 1. Upgrade `Apache commons-io` to 2.11.0 to use the non-deprecated `closeQuietly` API; other changes related to `Apache commons-io` are detailed in [commons-io/changes-report](https://commons.apache.org/proper/commons-io/changes-report.html#a2.11.0) 2. Revert the change of SPARK-36456 to maintain the original `IOUtils.closeQuietly` API behavior (don't print an error log). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass the Jenkins or GitHub Action Closes #33977 from LuciferYang/upgrade-commons-io. 
Authored-by: yangjie01 Signed-off-by: Jungtaek Lim --- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 5 +++-- .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 5 +++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- .../scala/org/apache/spark/util/logging/RollingFileAppender.scala | 5 ++--- core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala | 3 +-- core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 6 +++--- dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +- pom.xml | 2 +- .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 4 ++-- .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala | 4 ++-- .../execution/streaming/state/HDFSBackedStateStoreProvider.scala| 4 ++-- .../spark/sql/execution/streaming/state/RocksDBFileManager.scala| 5 ++--- 12 files changed, 24 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index b537060..cbb4e9c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -35,6 +35,7 @@ import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} import com.google.common.cache.CacheBuilder +import org.apache.commons.io.IOUtils import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors @@ -51,7 +52,7 @@ import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper} import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo -import org.apache.spark.network.util.{JavaUtils, TransportConf} +import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.ExecutorCacheTaskLocation import 
org.apache.spark.serializer.{SerializerInstance, SerializerManager} @@ -341,7 +342,7 @@ private[spark] class BlockManager( false } } finally { -JavaUtils.closeQuietly(inputStream) +IOUtils.closeQuietly(inputStream) } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index b1713ec..eaecf65 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import scala.util.{Failure,
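The behavioral difference the revert is about can be sketched in Python (function names are illustrative, not the Java APIs): both variants swallow the exception raised by `close()`, but one logs it at ERROR while the other stays silent; the revert restores the silent behavior.

```python
import logging

log = logging.getLogger("storage")

def close_quietly_logging(closeable):
    """Like the JavaUtils-style variant: swallow the error, but log it."""
    try:
        if closeable is not None:
            closeable.close()
    except OSError as e:
        log.error("Failed to close resource: %s", e)

def close_quietly_silent(closeable):
    """Like commons-io IOUtils.closeQuietly: swallow without logging."""
    try:
        if closeable is not None:
            closeable.close()
    except OSError:
        pass
```

In a hot path that closes many short-lived streams, the logging variant can flood the executor logs with ERROR lines for failures that are deliberately ignored, which is why the original silent behavior was restored.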
[spark] branch branch-3.1 updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 1042481 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan 1042481 is described below commit 104248120094e3f2356023744a8efc0df22b0da6 Author: Angerszh AuthorDate: Tue Sep 14 18:25:47 2021 +0800 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan ### What changes were proposed in this pull request? For query ``` select array_union(array(cast('nan' as double), cast('nan' as double)), array()) ``` This returns [NaN, NaN], but it should return [NaN]. This issue is caused by `OpenHashSet` not being able to handle `Double.NaN` and `Float.NaN` either. In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together ### Why are the changes needed? Fix a bug ### Does this PR introduce _any_ user-facing change? `ArrayUnion` no longer returns duplicated `NaN` values ### How was this patch tested? Added UT Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet. 
Lead-authored-by: Angerszh Co-authored-by: AngersZh Signed-off-by: Wenchen Fan (cherry picked from commit f71f37755d581017f549ecc8683fb7afc2852c67) Signed-off-by: Wenchen Fan --- .../expressions/collectionOperations.scala | 61 +- .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++ .../expressions/CollectionExpressionsSuite.scala | 17 + 3 files changed, 133 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index bb2163c..b829ac0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SQLOpenHashSet import org.apache.spark.unsafe.UTF8StringBuilder import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH @@ -3367,24 +3368,31 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi if (TypeUtils.typeWithProperEquals(elementType)) { (array1, array2) => val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] -val hs = new OpenHashSet[Any] -var foundNullElement = false +val hs = new SQLOpenHashSet[Any]() +val isNaN = SQLOpenHashSet.isNaN(elementType) Seq(array1, array2).foreach { array => var i = 0 while (i < array.numElements()) { if (array.isNullAt(i)) { - if (!foundNullElement) { + if (!hs.containsNull) { +hs.addNull arrayBuffer += null -foundNullElement = true } } else { val elem = array.get(i, elementType) - if (!hs.contains(elem)) { -if (arrayBuffer.size > 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + if (isNaN(elem)) { +if (!hs.containsNaN) { + arrayBuffer += elem + hs.addNaN +} + } else { +if (!hs.contains(elem)) { + if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + } + arrayBuffer += elem + hs.add(elem) } -arrayBuffer += elem -hs.add(elem) } } i += 1 @@ -3441,13 +3449,12 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi val ptName = CodeGenerator.primitiveTypeName(jt) nullSafeCodeGen(ctx, ev, (array1, array2) => { -val foundNullElement = ctx.freshName("foundNullElement") val nullElementIndex = ctx.freshName("nullElementIndex") val builder = ctx.freshName("builder") val array = ctx.freshName("array") val arrays = ctx.freshName("arrays") val arrayDataIdx = ctx.freshName("arrayDataIdx") -val openHashSet = classOf[OpenHashSet[_]].getName +
[spark] branch branch-3.2 updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a472612 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan a472612 is described below commit a472612eb84805de590406e8af810ec6fa00d114 Author: Angerszh AuthorDate: Tue Sep 14 18:25:47 2021 +0800 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan ### What changes were proposed in this pull request? For query ``` select array_union(array(cast('nan' as double), cast('nan' as double)), array()) ``` This returns [NaN, NaN], but it should return [NaN]. This issue is caused by `OpenHashSet` not being able to handle `Double.NaN` and `Float.NaN` either. In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together ### Why are the changes needed? Fix a bug ### Does this PR introduce _any_ user-facing change? `ArrayUnion` no longer returns duplicated `NaN` values ### How was this patch tested? Added UT Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet. 
Lead-authored-by: Angerszh Co-authored-by: AngersZh Signed-off-by: Wenchen Fan (cherry picked from commit f71f37755d581017f549ecc8683fb7afc2852c67) Signed-off-by: Wenchen Fan --- .../expressions/collectionOperations.scala | 61 +- .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++ .../expressions/CollectionExpressionsSuite.scala | 17 + 3 files changed, 133 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index ce17231..e5620a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SQLOpenHashSet import org.apache.spark.unsafe.UTF8StringBuilder import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH @@ -3575,24 +3576,31 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi if (TypeUtils.typeWithProperEquals(elementType)) { (array1, array2) => val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] -val hs = new OpenHashSet[Any] -var foundNullElement = false +val hs = new SQLOpenHashSet[Any]() +val isNaN = SQLOpenHashSet.isNaN(elementType) Seq(array1, array2).foreach { array => var i = 0 while (i < array.numElements()) { if (array.isNullAt(i)) { - if (!foundNullElement) { + if (!hs.containsNull) { +hs.addNull arrayBuffer += null -foundNullElement = true } } else { val elem = array.get(i, elementType) - if (!hs.contains(elem)) { -if (arrayBuffer.size > 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + if (isNaN(elem)) { +if (!hs.containsNaN) { + arrayBuffer += elem + hs.addNaN +} + } else { +if (!hs.contains(elem)) { + if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + } + arrayBuffer += elem + hs.add(elem) } -arrayBuffer += elem -hs.add(elem) } } i += 1 @@ -3649,13 +3657,12 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi val ptName = CodeGenerator.primitiveTypeName(jt) nullSafeCodeGen(ctx, ev, (array1, array2) => { -val foundNullElement = ctx.freshName("foundNullElement") val nullElementIndex = ctx.freshName("nullElementIndex") val builder = ctx.freshName("builder") val array = ctx.freshName("array") val arrays = ctx.freshName("arrays") val arrayDataIdx = ctx.freshName("arrayDataIdx") -val openHashSet = classOf[OpenHashSet[_]].getName +
[spark] branch master updated: [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f71f377 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan f71f377 is described below commit f71f37755d581017f549ecc8683fb7afc2852c67 Author: Angerszh AuthorDate: Tue Sep 14 18:25:47 2021 +0800 [SPARK-36702][SQL] ArrayUnion handle duplicated Double.NaN and Float.Nan ### What changes were proposed in this pull request? For query ``` select array_union(array(cast('nan' as double), cast('nan' as double)), array()) ``` This returns [NaN, NaN], but it should return [NaN]. This issue is caused by `OpenHashSet` not being able to handle `Double.NaN` and `Float.NaN` either. In this PR we add a wrapper around `OpenHashSet` that can handle `null`, `Double.NaN`, and `Float.NaN` together ### Why are the changes needed? Fix a bug ### Does this PR introduce _any_ user-facing change? `ArrayUnion` no longer returns duplicated `NaN` values ### How was this patch tested? Added UT Closes #33955 from AngersZh/SPARK-36702-WrapOpenHashSet. 
Lead-authored-by: Angerszh Co-authored-by: AngersZh Signed-off-by: Wenchen Fan --- .../expressions/collectionOperations.scala | 61 +- .../org/apache/spark/sql/util/SQLOpenHashSet.scala | 72 ++ .../expressions/CollectionExpressionsSuite.scala | 17 + 3 files changed, 133 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index ce17231..e5620a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SQLOpenHashSet import org.apache.spark.unsafe.UTF8StringBuilder import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH @@ -3575,24 +3576,31 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi if (TypeUtils.typeWithProperEquals(elementType)) { (array1, array2) => val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any] -val hs = new OpenHashSet[Any] -var foundNullElement = false +val hs = new SQLOpenHashSet[Any]() +val isNaN = SQLOpenHashSet.isNaN(elementType) Seq(array1, array2).foreach { array => var i = 0 while (i < array.numElements()) { if (array.isNullAt(i)) { - if (!foundNullElement) { + if (!hs.containsNull) { +hs.addNull arrayBuffer += null -foundNullElement = true } } else { val elem = array.get(i, elementType) - if (!hs.contains(elem)) { -if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { - 
ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + if (isNaN(elem)) { +if (!hs.containsNaN) { + arrayBuffer += elem + hs.addNaN +} + } else { +if (!hs.contains(elem)) { + if (arrayBuffer.size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + ArrayBinaryLike.throwUnionLengthOverflowException(arrayBuffer.size) + } + arrayBuffer += elem + hs.add(elem) } -arrayBuffer += elem -hs.add(elem) } } i += 1 @@ -3649,13 +3657,12 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi val ptName = CodeGenerator.primitiveTypeName(jt) nullSafeCodeGen(ctx, ev, (array1, array2) => { -val foundNullElement = ctx.freshName("foundNullElement") val nullElementIndex = ctx.freshName("nullElementIndex") val builder = ctx.freshName("builder") val array = ctx.freshName("array") val arrays = ctx.freshName("arrays") val arrayDataIdx = ctx.freshName("arrayDataIdx") -val openHashSet = classOf[OpenHashSet[_]].getName +val openHashSet = classOf[SQLOpenHashSet[_]].getName val classTag =
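Why a plain hash set misses NaN duplicates is easy to reproduce in Python (the `array_union` function below is an illustrative stand-in, not Spark's implementation): NaN compares unequal to every value including itself, so distinct NaN objects all survive ordinary set-based dedup, and the fix is to track "seen NaN" and "seen null" as separate flags alongside the set, exactly as the `SQLOpenHashSet` wrapper does with `containsNaN`/`addNaN` and `containsNull`/`addNull`.

```python
import math

def array_union(a, b):
    """Union with first-occurrence order; None and NaN each kept once."""
    seen, saw_none, saw_nan, out = set(), False, False, []
    for x in a + b:
        if x is None:
            if not saw_none:          # null handled by a flag, not the set
                saw_none = True
                out.append(x)
        elif isinstance(x, float) and math.isnan(x):
            if not saw_nan:           # NaN handled by a flag, not the set
                saw_nan = True
                out.append(x)
        elif x not in seen:           # everything else dedups normally
            seen.add(x)
            out.append(x)
    return out
```

Note that in CPython `len({float("nan"), float("nan")}) == 2`, since set membership falls back to `==` after an identity check and the two NaN objects are distinct; this mirrors the `OpenHashSet` behavior the PR works around.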