This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new daa140d [SPARK-31019][SQL] make it clear that people can deduplicate map keys daa140d is described below commit daa140df726f8521ae7bbaf4586277ae4f4aea7c Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Thu Mar 5 20:43:52 2020 +0900 [SPARK-31019][SQL] make it clear that people can deduplicate map keys rename the config and make it non-internal. Now we fail the query if duplicated map keys are detected, and provide a legacy config to deduplicate it. However, we must provide a way to get users out of this situation, instead of just rejecting to run the query. This exit strategy should always be there, while legacy config indicates that it may be removed someday. no, just rename a config which was added in 3.0 add more tests for the fail behavior. Closes #27772 from cloud-fan/map. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- docs/sql-migration-guide.md | 2 +- .../sql/catalyst/util/ArrayBasedMapBuilder.scala | 20 +++--- .../org/apache/spark/sql/internal/SQLConf.scala | 25 +++++--- .../expressions/CollectionExpressionsSuite.scala | 14 ++++- .../catalyst/expressions/ComplexTypeSuite.scala | 15 ++++- .../expressions/HigherOrderFunctionsSuite.scala | 5 +- .../catalyst/util/ArrayBasedMapBuilderSuite.scala | 71 ++++++++++++++-------- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 13 +++- 8 files changed, 112 insertions(+), 53 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 0050061..6c73038 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -49,7 +49,7 @@ license: | - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier. - - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, new config `spark.sql.legacy.allowDuplicatedMapKeys` was added, with the default value `false`, Spark will throw Ru [...] + - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, Spark will throw RuntimeException while duplicated keys are found. Users can set `spark.sql.mapKeyDedupPolicy` to L [...] - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala index 40e75b5..0185b57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY import org.apache.spark.sql.types._ import org.apache.spark.unsafe.array.ByteArrayMethods @@ -49,8 +48,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria private lazy val keyGetter = InternalRow.getAccessor(keyType) private lazy val valueGetter = InternalRow.getAccessor(valueType) - private val allowDuplicatedMapKey = - SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY) + private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY) def put(key: Any, value: Any): Unit = { if (key == null) { @@ -67,13 +65,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria keys.append(key) values.append(value) } else { - if (!allowDuplicatedMapKey) { - throw new RuntimeException(s"Duplicate map key $key was founded, please check the input " + - "data. If you want to remove the duplicated keys with last-win policy, you can set " + - s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.") + if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) { + throw new RuntimeException(s"Duplicate map key $key was found, please check the input " + + "data. If you want to remove the duplicated keys, you can set " + + s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " + + "the key inserted at last takes precedence.") + } else if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { + // Overwrite the previous value, as the policy is last wins. + values(index) = value + } else { + throw new IllegalStateException("Unknown map key dedup policy: " + mapKeyDedupPolicy) } - // Overwrite the previous value, as the policy is last wins. - values(index) = value } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e39d066..68a89b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2095,6 +2095,21 @@ object SQLConf { .stringConf .createOptional + object MapKeyDedupPolicy extends Enumeration { + val EXCEPTION, LAST_WIN = Value + } + + val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy") + .doc("The policy to deduplicate map keys in builtin function: CreateMap, MapFromArrays, " + + "MapFromEntries, StringToMap, MapConcat and TransformKeys. When EXCEPTION, the query " + + "fails if duplicated map keys are detected. When LAST_WIN, the map key that is inserted " + + "at last takes precedence.") + .version("3.0.0") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(MapKeyDedupPolicy.values.map(_.toString)) + .createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString) + val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast") .internal() .doc("When true, the upcast will be loose and allows string to atomic types.") @@ -2202,16 +2217,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val LEGACY_ALLOW_DUPLICATED_MAP_KEY = - buildConf("spark.sql.legacy.allowDuplicatedMapKeys") - .doc("When true, use last wins policy to remove duplicated map keys in built-in functions, " + - "this config takes effect in below build-in functions: CreateMap, MapFromArrays, " + - "MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, if this is false, " + - "which is the default, Spark will throw an exception when duplicated map keys are " + - "detected.") - .booleanConf - .createWithDefault(false) - val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType") .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " + "an analysis exception will be thrown.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index 01df667..3cfc66f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -139,7 +139,9 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper MapType(IntegerType, IntegerType, valueContainsNull = true)) val mNull = Literal.create(null, MapType(StringType, StringType)) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + MapConcat(Seq(m0, m1)), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // overlapping maps should remove duplicated map keys w.r.t. last win policy. checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3")) } @@ -274,7 +276,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null)) checkEvaluation(MapFromEntries(ai2), Map.empty) checkEvaluation(MapFromEntries(ai3), null) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + MapFromEntries(ai4), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20)) } @@ -298,7 +303,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null)) checkEvaluation(MapFromEntries(as2), Map.empty) checkEvaluation(MapFromEntries(as3), null) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + MapFromEntries(as4), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb")) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala index 2c1e0c8..3df7d02 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala @@ -217,7 +217,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))), "Cannot use null as map key") - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), @@ -284,7 +286,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { MapFromArrays(intWithNullArray, strArray), "Cannot use null as map key") - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + MapFromArrays( + Literal.create(Seq(1, 1), ArrayType(IntegerType)), + Literal.create(Seq(2, 3), ArrayType(IntegerType))), + "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( MapFromArrays( @@ -404,7 +411,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper { val m5 = Map("a" -> null) checkEvaluation(new StringToMap(s5), m5) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + checkExceptionInExpression[RuntimeException]( + new StringToMap(Literal("a:1,b:2,a:3")), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation( new StringToMap(Literal("a:1,b:2,a:3")), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala index b343853..c07f06e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala @@ -465,7 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation( transformKeys(transformKeys(ai0, plusOne), plusValue), create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4)) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + + checkExceptionInExpression[RuntimeException]( + transformKeys(ai0, modKey), "Duplicate map key") + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { // Duplicated map keys will be removed w.r.t. the last wins policy. checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala index 87bbdb7..6e07cd5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala @@ -48,11 +48,11 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) builder.put(1, 1) val e = intercept[RuntimeException](builder.put(1, 2)) - assert(e.getMessage.contains("Duplicate map key 1 was founded")) + assert(e.getMessage.contains("Duplicate map key 1 was found")) } test("remove duplicated keys with last wins policy") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType) builder.put(1, 1) builder.put(2, 2) @@ -63,8 +63,15 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("binary type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("binary type key with duplication") { + val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType) + builder.put(Array(1.toByte), 1) + builder.put(Array(2.toByte), 2) + val e = intercept[RuntimeException](builder.put(Array(1.toByte), 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType) builder.put(Array(1.toByte), 1) builder.put(Array(2.toByte), 2) @@ -79,18 +86,26 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("struct type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("struct type key with duplication") { + val unsafeRow = { + val row = new UnsafeRow(1) + val bytes = new Array[Byte](16) + row.pointTo(bytes, 16) + row.setInt(0, 1) + row + } + + val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType) + builder.put(InternalRow(1), 1) + builder.put(InternalRow(2), 2) + val e = intercept[RuntimeException](builder.put(unsafeRow, 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType) builder.put(InternalRow(1), 1) builder.put(InternalRow(2), 2) - val unsafeRow = { - val row = new UnsafeRow(1) - val bytes = new Array[Byte](16) - row.pointTo(bytes, 16) - row.setInt(0, 1) - row - } builder.put(unsafeRow, 3) val map = builder.build() assert(map.numElements() == 2) @@ -98,20 +113,28 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper { } } - test("array type key") { - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + test("array type key with duplication") { + val unsafeArray = { + val array = new UnsafeArrayData() + val bytes = new Array[Byte](24) + Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2) + array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24) + array.setInt(0, 1) + array.setInt(1, 1) + array + } + + val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType) + builder.put(new GenericArrayData(Seq(1, 1)), 1) + builder.put(new GenericArrayData(Seq(2, 2)), 2) + val e = intercept[RuntimeException](builder.put(unsafeArray, 3)) + // By default duplicated map key fails the query. + assert(e.getMessage.contains("Duplicate map key")) + + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType) builder.put(new GenericArrayData(Seq(1, 1)), 1) builder.put(new GenericArrayData(Seq(2, 2)), 2) - val unsafeArray = { - val array = new UnsafeArrayData() - val bytes = new Array[Byte](24) - Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2) - array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24) - array.setInt(0, 1) - array.setInt(1, 1) - array - } builder.put(unsafeArray, 3) val map = builder.build() assert(map.numElements() == 2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index b4b9a48..a613c33 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -23,6 +23,7 @@ import java.util.TimeZone import scala.util.Random +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback @@ -651,7 +652,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { Row(null) ) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + intercept[SparkException](df1.selectExpr("map_concat(map1, map2)").collect()) + intercept[SparkException](df1.select(map_concat($"map1", $"map2")).collect()) + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a) checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a) } @@ -3070,7 +3073,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)), Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7)))) - withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") { + intercept[SparkException] { + dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)").collect() + } + intercept[SparkException] { + dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)).collect() + } + withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) { checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"), Seq(Row(Map(true -> true, true -> false)))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org