This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new daa140d  [SPARK-31019][SQL] make it clear that people can deduplicate map keys
daa140d is described below

commit daa140df726f8521ae7bbaf4586277ae4f4aea7c
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu Mar 5 20:43:52 2020 +0900

    [SPARK-31019][SQL] make it clear that people can deduplicate map keys
    
    rename the config and make it non-internal.
    
    Now we fail the query if duplicated map keys are detected, and provide a legacy
    config to deduplicate them. However, we must give users a way out of this
    situation instead of just refusing to run the query. This exit strategy should
    always be available, while a legacy config indicates that it may be removed
    someday.
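    
    For illustration, a minimal sketch of the new behavior in a Spark 3.0 session
    (a hypothetical spark-shell example, not part of this patch):
    
        // Default policy is EXCEPTION: duplicated map keys fail the query at runtime
        // with an error containing "Duplicate map key 1 was found".
        spark.sql("SELECT map(1, 2, 1, 3)").collect()
    
        // Opt in to last-wins deduplication via the renamed, non-internal config.
        spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
        spark.sql("SELECT map(1, 2, 1, 3)").collect()
        // => Array(Row(Map(1 -> 3)))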
    
    No user-facing change; this only renames a config that was added in 3.0.
    
    Tested by adding more tests for the fail behavior.
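    
    For example, the fail behavior is exercised along the following lines (a sketch
    mirroring the ArrayBasedMapBuilderSuite changes below; assumes a ScalaTest suite
    such as one extending SparkFunSuite with SQLHelper):
    
        import org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder
        import org.apache.spark.sql.types.IntegerType
    
        val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
        builder.put(1, 1)
        // With the default EXCEPTION policy, inserting a duplicated key fails fast.
        val e = intercept[RuntimeException](builder.put(1, 2))
        assert(e.getMessage.contains("Duplicate map key 1 was found"))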
    
    Closes #27772 from cloud-fan/map.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 docs/sql-migration-guide.md                        |  2 +-
 .../sql/catalyst/util/ArrayBasedMapBuilder.scala   | 20 +++---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 25 +++++---
 .../expressions/CollectionExpressionsSuite.scala   | 14 ++++-
 .../catalyst/expressions/ComplexTypeSuite.scala    | 15 ++++-
 .../expressions/HigherOrderFunctionsSuite.scala    |  5 +-
 .../catalyst/util/ArrayBasedMapBuilderSuite.scala  | 71 ++++++++++++++--------
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 13 +++-
 8 files changed, 112 insertions(+), 53 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 0050061..6c73038 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -49,7 +49,7 @@ license: |
 
   - In Spark version 2.4 and earlier, float/double -0.0 is semantically equal to 0.0, but -0.0 and 0.0 are considered as different values when used in aggregate grouping keys, window partition keys and join keys. Since Spark 3.0, this bug is fixed. For example, `Seq(-0.0, 0.0).toDF("d").groupBy("d").count()` returns `[(0.0, 2)]` in Spark 3.0, and `[(0.0, 1), (-0.0, 1)]` in Spark 2.4 and earlier.
 
-  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, new config `spark.sql.legacy.allowDuplicatedMapKeys` was added, with the default value `false`, Spark will throw Ru [...]
+  - In Spark version 2.4 and earlier, users can create a map with duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. The behavior of map with duplicated keys is undefined, e.g. map look up respects the duplicated key appears first, `Dataset.collect` only keeps the duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since Spark 3.0, Spark will throw RuntimeException while duplicated keys are found. Users can set `spark.sql.mapKeyDedupPolicy` to L [...]
 
   - In Spark version 2.4 and earlier, partition column value is converted as null if it can't be casted to corresponding user provided schema. Since 3.0, partition column value is validated with user provided schema. An exception is thrown if the validation fails. You can disable such validation by setting `spark.sql.sources.validatePartitionColumns` to `false`.
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
index 40e75b5..0185b57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.array.ByteArrayMethods
 
@@ -49,8 +48,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
   private lazy val keyGetter = InternalRow.getAccessor(keyType)
   private lazy val valueGetter = InternalRow.getAccessor(valueType)
 
-  private val allowDuplicatedMapKey =
-    SQLConf.get.getConf(LEGACY_ALLOW_DUPLICATED_MAP_KEY)
+  private val mapKeyDedupPolicy = SQLConf.get.getConf(SQLConf.MAP_KEY_DEDUP_POLICY)
 
   def put(key: Any, value: Any): Unit = {
     if (key == null) {
@@ -67,13 +65,17 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
       keys.append(key)
       values.append(value)
     } else {
-      if (!allowDuplicatedMapKey) {
-        throw new RuntimeException(s"Duplicate map key $key was founded, 
please check the input " +
-          "data. If you want to remove the duplicated keys with last-win 
policy, you can set " +
-          s"${LEGACY_ALLOW_DUPLICATED_MAP_KEY.key} to true.")
+      if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) {
+        throw new RuntimeException(s"Duplicate map key $key was found, please 
check the input " +
+          "data. If you want to remove the duplicated keys, you can set " +
+          s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to 
${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " +
+          "the key inserted at last takes precedence.")
+      } else if (mapKeyDedupPolicy == 
SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
+        // Overwrite the previous value, as the policy is last wins.
+        values(index) = value
+      } else {
+        throw new IllegalStateException("Unknown map key dedup policy: " + 
mapKeyDedupPolicy)
       }
-      // Overwrite the previous value, as the policy is last wins.
-      values(index) = value
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e39d066..68a89b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2095,6 +2095,21 @@ object SQLConf {
       .stringConf
       .createOptional
 
+  object MapKeyDedupPolicy extends Enumeration {
+    val EXCEPTION, LAST_WIN = Value
+  }
+
+  val MAP_KEY_DEDUP_POLICY = buildConf("spark.sql.mapKeyDedupPolicy")
+    .doc("The policy to deduplicate map keys in builtin function: CreateMap, 
MapFromArrays, " +
+      "MapFromEntries, StringToMap, MapConcat and TransformKeys. When 
EXCEPTION, the query " +
+      "fails if duplicated map keys are detected. When LAST_WIN, the map key 
that is inserted " +
+      "at last takes precedence.")
+    .version("3.0.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(MapKeyDedupPolicy.values.map(_.toString))
+    .createWithDefault(MapKeyDedupPolicy.EXCEPTION.toString)
+
   val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.doLooseUpcast")
     .internal()
     .doc("When true, the upcast will be loose and allows string to atomic 
types.")
@@ -2202,16 +2217,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val LEGACY_ALLOW_DUPLICATED_MAP_KEY =
-    buildConf("spark.sql.legacy.allowDuplicatedMapKeys")
-      .doc("When true, use last wins policy to remove duplicated map keys in 
built-in functions, " +
-        "this config takes effect in below build-in functions: CreateMap, 
MapFromArrays, " +
-        "MapFromEntries, StringToMap, MapConcat and TransformKeys. Otherwise, 
if this is false, " +
-        "which is the default, Spark will throw an exception when duplicated 
map keys are " +
-        "detected.")
-      .booleanConf
-      .createWithDefault(false)
-
   val LEGACY_ALLOW_HASH_ON_MAPTYPE = buildConf("spark.sql.legacy.allowHashOnMapType")
     .doc("When set to true, hash expressions can be applied on elements of MapType. Otherwise, " +
       "an analysis exception will be thrown.")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index 01df667..3cfc66f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -139,7 +139,9 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
       MapType(IntegerType, IntegerType, valueContainsNull = true))
     val mNull = Literal.create(null, MapType(StringType, StringType))
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      MapConcat(Seq(m0, m1)), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // overlapping maps should remove duplicated map keys w.r.t. last win policy.
       checkEvaluation(MapConcat(Seq(m0, m1)), create_map("a" -> "4", "b" -> "2", "c" -> "3"))
     }
@@ -274,7 +276,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
     checkEvaluation(MapFromEntries(ai2), Map.empty)
     checkEvaluation(MapFromEntries(ai3), null)
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      MapFromEntries(ai4), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(MapFromEntries(ai4), create_map(1 -> 20))
     }
@@ -298,7 +303,10 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
     checkEvaluation(MapFromEntries(as2), Map.empty)
     checkEvaluation(MapFromEntries(as3), null)
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      MapFromEntries(as4), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(MapFromEntries(as4), create_map("a" -> "bb"))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index 2c1e0c8..3df7d02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -217,7 +217,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       CreateMap(interlace(strWithNull, intSeq.map(Literal(_)))),
       "Cannot use null as map key")
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         CreateMap(Seq(Literal(1), Literal(2), Literal(1), Literal(3))),
@@ -284,7 +286,12 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
       MapFromArrays(intWithNullArray, strArray),
       "Cannot use null as map key")
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      MapFromArrays(
+        Literal.create(Seq(1, 1), ArrayType(IntegerType)),
+        Literal.create(Seq(2, 3), ArrayType(IntegerType))),
+      "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         MapFromArrays(
@@ -404,7 +411,9 @@ class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
     val m5 = Map("a" -> null)
     checkEvaluation(new StringToMap(s5), m5)
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    checkExceptionInExpression[RuntimeException](
+      new StringToMap(Literal("a:1,b:2,a:3")), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(
         new StringToMap(Literal("a:1,b:2,a:3")),
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
index b343853..c07f06e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
@@ -465,7 +465,10 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
     checkEvaluation(
       transformKeys(transformKeys(ai0, plusOne), plusValue),
       create_map(3 -> 1, 5 -> 2, 7 -> 3, 9 -> 4))
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+
+    checkExceptionInExpression[RuntimeException](
+      transformKeys(ai0, modKey), "Duplicate map key")
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       // Duplicated map keys will be removed w.r.t. the last wins policy.
       checkEvaluation(transformKeys(ai0, modKey), create_map(1 -> 4, 2 -> 2, 0 -> 3))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
index 87bbdb7..6e07cd5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala
@@ -48,11 +48,11 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
     builder.put(1, 1)
     val e = intercept[RuntimeException](builder.put(1, 2))
-    assert(e.getMessage.contains("Duplicate map key 1 was founded"))
+    assert(e.getMessage.contains("Duplicate map key 1 was found"))
   }
 
   test("remove duplicated keys with last wins policy") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
       builder.put(1, 1)
       builder.put(2, 2)
@@ -63,8 +63,15 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("binary type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("binary type key with duplication") {
+    val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
+    builder.put(Array(1.toByte), 1)
+    builder.put(Array(2.toByte), 2)
+    val e = intercept[RuntimeException](builder.put(Array(1.toByte), 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(BinaryType, IntegerType)
       builder.put(Array(1.toByte), 1)
       builder.put(Array(2.toByte), 2)
@@ -79,18 +86,26 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("struct type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("struct type key with duplication") {
+    val unsafeRow = {
+      val row = new UnsafeRow(1)
+      val bytes = new Array[Byte](16)
+      row.pointTo(bytes, 16)
+      row.setInt(0, 1)
+      row
+    }
+
+    val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
+    builder.put(InternalRow(1), 1)
+    builder.put(InternalRow(2), 2)
+    val e = intercept[RuntimeException](builder.put(unsafeRow, 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
       builder.put(InternalRow(1), 1)
       builder.put(InternalRow(2), 2)
-      val unsafeRow = {
-        val row = new UnsafeRow(1)
-        val bytes = new Array[Byte](16)
-        row.pointTo(bytes, 16)
-        row.setInt(0, 1)
-        row
-      }
       builder.put(unsafeRow, 3)
       val map = builder.build()
       assert(map.numElements() == 2)
@@ -98,20 +113,28 @@ class ArrayBasedMapBuilderSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-  test("array type key") {
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+  test("array type key with duplication") {
+    val unsafeArray = {
+      val array = new UnsafeArrayData()
+      val bytes = new Array[Byte](24)
+      Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
+      array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
+      array.setInt(0, 1)
+      array.setInt(1, 1)
+      array
+    }
+
+    val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
+    builder.put(new GenericArrayData(Seq(1, 1)), 1)
+    builder.put(new GenericArrayData(Seq(2, 2)), 2)
+    val e = intercept[RuntimeException](builder.put(unsafeArray, 3))
+    // By default duplicated map key fails the query.
+    assert(e.getMessage.contains("Duplicate map key"))
+
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
       builder.put(new GenericArrayData(Seq(1, 1)), 1)
       builder.put(new GenericArrayData(Seq(2, 2)), 2)
-      val unsafeArray = {
-        val array = new UnsafeArrayData()
-        val bytes = new Array[Byte](24)
-        Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
-        array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
-        array.setInt(0, 1)
-        array.setInt(1, 1)
-        array
-      }
       builder.put(unsafeArray, 3)
       val map = builder.build()
       assert(map.numElements() == 2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index b4b9a48..a613c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -23,6 +23,7 @@ import java.util.TimeZone
 
 import scala.util.Random
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
@@ -651,7 +652,9 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(null)
     )
 
-    withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+    intercept[SparkException](df1.selectExpr("map_concat(map1, map2)").collect())
+    intercept[SparkException](df1.select(map_concat($"map1", $"map2")).collect())
+    withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
       checkAnswer(df1.selectExpr("map_concat(map1, map2)"), expected1a)
       checkAnswer(df1.select(map_concat($"map1", $"map2")), expected1a)
     }
@@ -3070,7 +3073,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       checkAnswer(dfExample2.select(transform_keys(col("j"), (k, v) => k + v)),
         Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7))))
 
-      withSQLConf(SQLConf.LEGACY_ALLOW_DUPLICATED_MAP_KEY.key -> "true") {
+      intercept[SparkException] {
+        dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 2 = 0 OR v)").collect()
+      }
+      intercept[SparkException] {
+        dfExample3.select(transform_keys(col("x"), (k, v) => k % 2 === 0 || v)).collect()
+      }
+      withSQLConf(SQLConf.MAP_KEY_DEDUP_POLICY.key -> SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
         checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 2 = 0 OR v)"),
           Seq(Row(Map(true -> true, true -> false))))
 

