This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fdcd1405a84b [SPARK-53629][SQL] Implement type widening for MERGE INTO 
WITH SCHEMA EVOLUTION
fdcd1405a84b is described below

commit fdcd1405a84b6c11dab601a3d4fc7890a7b67311
Author: Szehon Ho <[email protected]>
AuthorDate: Tue Sep 23 15:38:28 2025 +0800

    [SPARK-53629][SQL] Implement type widening for MERGE INTO WITH SCHEMA 
EVOLUTION
    
    ### What changes were proposed in this pull request?
    MERGE INTO WITH SCHEMA EVOLUTION already support adding new column, and 
also some type widening (if structs are missing some fields)
    
    It should support type widening for primitive data types.  Spark will call 
the V2DataSource TableCatalog to alter the schema, so the V2DataSource can 
decide whether it is acceptable or not.
    
    This change also fixes InMemoryDataSource to support this case
    
    ### Why are the changes needed?
    Support more use case for MERGE INTO WITH SCHEMA EVOLUTION.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52377 from szehon-ho/merge_type_evolution.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/plans/logical/v2Commands.scala    |   7 +-
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  24 +-
 .../sql/connector/MergeIntoTableSuiteBase.scala    | 418 ++++++++++++++++++++-
 3 files changed, 440 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index db56d4967c0d..682a234e17e4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DataType, 
IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, 
DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, 
StructType}
 import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
@@ -967,12 +967,15 @@ object MergeIntoTable {
           schemaChanges(currentElementType, updateElementType,
             originalTarget, originalSource, fieldPath ++ Seq("value"))
 
+      case (currentType: AtomicType, newType: AtomicType) if currentType != 
newType =>
+        Array(TableChange.updateColumnType(fieldPath, newType))
+
       case (currentType, newType) if currentType == newType =>
         // No change needed
         Array.empty[TableChange]
 
       case _ =>
-        // For now do not support type widening
+        // Do not support change between atomic and complex types for now
         throw QueryExecutionErrors.failedToMergeIncompatibleSchemasError(
           originalTarget, originalSource, null)
     }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index bd74da022ce1..14941de1cc2c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -28,11 +28,12 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, MetadataStructFieldWithLogicalName}
+import org.apache.spark.sql.catalyst.expressions.{Cast, EvalMode, 
GenericInternalRow, JoinedRow, Literal, MetadataStructFieldWithLogicalName}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
CaseInsensitiveMap, CharVarcharUtils, DateTimeUtils, GenericArrayData, MapData, 
ResolveDefaultColumns}
 import org.apache.spark.sql.connector.catalog.constraints.Constraint
 import org.apache.spark.sql.connector.distributions.{Distribution, 
Distributions}
 import org.apache.spark.sql.connector.expressions._
+import org.apache.spark.sql.connector.expressions.{Literal => V2Literal}
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric, 
CustomTaskMetric}
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.read.colstats.{ColumnStatistics, 
Histogram, HistogramBin}
@@ -146,7 +147,7 @@ abstract class InMemoryBaseTable(
     case _: BucketTransform =>
     case _: SortedBucketTransform =>
     case _: ClusterByTransform =>
-    case NamedTransform("truncate", Seq(_: NamedReference, _: Literal[_])) =>
+    case NamedTransform("truncate", Seq(_: NamedReference, _: V2Literal[_])) =>
     case t if !allowUnsupportedTransforms =>
       throw new IllegalArgumentException(s"Transform $t is not a supported 
transform")
   }
@@ -244,7 +245,7 @@ abstract class InMemoryBaseTable(
         var dataTypeHashCode = 0
         valueTypePairs.foreach(dataTypeHashCode += _._2.hashCode())
         ((valueHashCode + 31 * dataTypeHashCode) & Integer.MAX_VALUE) % 
numBuckets
-      case NamedTransform("truncate", Seq(ref: NamedReference, length: 
Literal[_])) =>
+      case NamedTransform("truncate", Seq(ref: NamedReference, length: 
V2Literal[_])) =>
         extractor(ref.fieldNames, cleanedSchema, row) match {
           case (str: UTF8String, StringType) =>
             str.substring(0, length.value.asInstanceOf[Int])
@@ -910,7 +911,7 @@ private class BufferedRowsReader(
       arrayData: ArrayData,
       readType: DataType,
       writeType: DataType): ArrayData = {
-    val elements = arrayData.toArray[Any](readType)
+    val elements = arrayData.toArray[Any](writeType)
     val convertedElements = extractCollection(elements, readType, writeType)
     new GenericArrayData(convertedElements)
   }
@@ -921,8 +922,8 @@ private class BufferedRowsReader(
       readValueType: DataType,
       writeKeyType: DataType,
       writeValueType: DataType): MapData = {
-    val keys = mapData.keyArray().toArray[Any](readKeyType)
-    val values = mapData.valueArray().toArray[Any](readValueType)
+    val keys = mapData.keyArray().toArray[Any](writeKeyType)
+    val values = mapData.valueArray().toArray[Any](writeValueType)
 
     val convertedKeys = extractCollection(keys, readKeyType, writeKeyType)
     val convertedValues = extractCollection(values, readValueType, 
writeValueType)
@@ -962,9 +963,20 @@ private class BufferedRowsReader(
               wKeyType, wValueType)
           }
         }
+      case (readType: AtomicType, writeType: AtomicType) if readType != 
writeType =>
+        elements.map { elem =>
+          if (elem == null) {
+            null
+          } else {
+            castElement(elem, readType, writeType)
+          }
+        }
       case (_, _) => elements
     }
   }
+
+  private def castElement(elem: Any, toType: DataType, fromType: DataType): 
Any =
+    Cast(Literal(elem, fromType), toType, None, EvalMode.TRY).eval(null)
 }
 
 private class BufferedRowsWriterFactory(schema: StructType)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index 13669982e3eb..7d879e2c9a5e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.v2.MergeRowsExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, CartesianProductExec}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{ArrayType, BooleanType, IntegerType, 
MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, IntegerType, 
LongType, MapType, StringType, StructField, StructType}
 
 abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
   with AdaptiveSparkPlanHelper {
@@ -2450,6 +2450,422 @@ abstract class MergeIntoTableSuiteBase extends 
RowLevelOperationSuiteBase
     }
   }
 
+  test("merge into schema evolution type widening from short to int") {
+    Seq((true, true), (false, true), (true, false)).foreach {
+      case (withSchemaEvolution, schemaEvolutionEnabled) =>
+        withTable(tableNameAsString) {
+          withTempView("source") {
+            createAndInitTable("pk INT NOT NULL, salary SMALLINT, dep STRING",
+              """{ "pk": 1, "salary": 100, "dep": "hr" }
+                |{ "pk": 2, "salary": 200, "dep": "finance" }
+                |{ "pk": 3, "salary": 300, "dep": "engineering" }
+                |""".stripMargin)
+
+            if (!schemaEvolutionEnabled) {
+              sql(
+                s"""ALTER TABLE $tableNameAsString SET TBLPROPERTIES
+                   | ('auto-schema-evolution' = 'false')""".stripMargin)
+            }
+
+            // Source data with int salary values that would exceed short range
+            val sourceRows = Seq(
+              (1, 50000, "hr"),
+              (4, 40000, "sales"),
+              (5, 500, "marketing"))
+            sourceRows.toDF("pk", "salary", 
"dep").createOrReplaceTempView("source")
+
+            val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+            val mergeStmt =
+              s"""MERGE $schemaEvolutionClause
+                 |INTO $tableNameAsString t
+                 |USING source s
+                 |ON t.pk = s.pk
+                 |WHEN MATCHED THEN
+                 | UPDATE SET salary = s.salary
+                 |WHEN NOT MATCHED THEN
+                 | INSERT (pk, salary, dep) VALUES (s.pk, s.salary, s.dep)
+                 |""".stripMargin
+
+            if (withSchemaEvolution && schemaEvolutionEnabled) {
+              // Schema evolution should allow type widening from SMALLINT to 
INT for salary column
+              sql(mergeStmt)
+              checkAnswer(
+                sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"),
+                Seq(
+                  Row(1, 50000, "hr"),
+                  Row(2, 200, "finance"),
+                  Row(3, 300, "engineering"),
+                  Row(4, 40000, "sales"),
+                  Row(5, 500, "marketing")))
+              val tableSchema = sql(s"SELECT * FROM $tableNameAsString").schema
+              val salaryField = tableSchema.find(_.name == "salary").get
+              assert(salaryField.dataType == IntegerType)
+            } else {
+              val exception = intercept[Exception] {
+                sql(mergeStmt)
+              }
+              assert(exception.getMessage.contains(
+                "Fail to assign a value of \"INT\" type to the \"SMALLINT\" " +
+                  "type column or variable `salary` due to an overflow"))
+            }
+          }
+        }
+    }
+  }
+
+  test("merge into schema evolution type widening nested struct from int to 
long") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTable(tableNameAsString) {
+        withTempView("source") {
+          // Create table with nested struct containing int field
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |employee STRUCT<salary: INT, details: STRUCT<bonus: INT, 
years: INT>>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 1, "employee": { "salary": 50000, "details":
+              |{ "bonus": 5000, "years": 2 } }, "dep": "hr" 
}""".stripMargin.replace("\n", "")
+              + "\n" +
+              """{ "pk": 2, "employee": { "salary": 60000, "details":
+                |{ "bonus": 6000, "years": 3 } }, "dep": "finance" }"""
+                .stripMargin.replace("\n", "")
+          )
+
+          // Source data with long values that exceed int range for nested 
fields
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("employee", StructType(Seq(
+              StructField("salary", IntegerType),
+              StructField("details", StructType(Seq(
+                StructField("bonus", LongType), // Changed from INT to LONG
+                StructField("years", IntegerType)
+              )))
+            ))),
+            StructField("dep", StringType)
+          ))
+
+          val data = Seq(
+            Row(1, Row(75000, Row(3000000000L, 5)), "hr"),
+            Row(3, Row(80000, Row(4000000000L, 1)), "engineering")
+          )
+
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt =
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT (pk, employee, dep) VALUES (s.pk, s.employee, s.dep)
+               |""".stripMargin
+
+          if (withSchemaEvolution) {
+            // Schema evolution should allow type widening from INT to LONG 
for nested bonus field
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"),
+              Seq(
+                Row(1, Row(75000, Row(3000000000L, 5)), "hr"),
+                Row(2, Row(60000, Row(6000, 3)), "finance"),
+                Row(3, Row(80000, Row(4000000000L, 1)), "engineering")
+              ))
+
+            val tableSchema = sql(s"SELECT * FROM $tableNameAsString").schema
+            val employeeField = tableSchema.find(_.name == 
"employee").get.dataType
+              .asInstanceOf[StructType]
+            val detailsField = employeeField.find(_.name == 
"details").get.dataType
+              .asInstanceOf[StructType]
+            val bonusField = detailsField.find(_.name == "bonus").get
+            assert(bonusField.dataType == LongType)
+          } else {
+            val exception = intercept[Exception] {
+              sql(mergeStmt)
+            }
+            assert(exception.getMessage.contains("Fail to assign a value of 
\"BIGINT\" type " +
+              "to the \"INT\" type column or variable 
`employee`.`details`.`bonus`" +
+              " due to an overflow"))
+          }
+        }
+      }
+    }
+  }
+
+  test("merge into schema evolution type widening in array from int to long") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTable(tableNameAsString) {
+        withTempView("source") {
+          // Create table with array of int values
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |scores ARRAY<INT>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 1, "scores": [1000, 2000, 3000], "dep": "hr" }
+              |{ "pk": 2, "scores": [4000, 5000, 6000], "dep": "finance" }
+              |""".stripMargin)
+
+          // Source data with array of long values that exceed int range
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("scores", ArrayType(LongType)), // Changed from INT to 
LONG
+            StructField("dep", StringType)
+          ))
+
+          val data = Seq(
+            Row(1, Array(3000000000L, 4000000000L), "hr"),
+            Row(3, Array(5000000000L, 6000000000L), "engineering")
+          )
+
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt =
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT (pk, scores, dep) VALUES (s.pk, s.scores, s.dep)
+               |""".stripMargin
+
+          if (withSchemaEvolution) {
+            // Schema evolution should allow type widening from ARRAY<INT> to 
ARRAY<LONG>
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"),
+              Seq(
+                Row(1, Array(3000000000L, 4000000000L), "hr"),
+                Row(2, Array(4000, 5000, 6000), "finance"),
+                Row(3, Array(5000000000L, 6000000000L), "engineering")
+              ))
+
+            val tableSchema = sql(s"SELECT * FROM $tableNameAsString").schema
+            val scoresField = tableSchema.find(_.name == "scores").get
+            val arrayElementType = 
scoresField.dataType.asInstanceOf[ArrayType].elementType
+            assert(arrayElementType == LongType)
+          } else {
+            val exception = intercept[Exception] {
+              sql(mergeStmt)
+            }
+            assert(exception.getMessage.contains("Fail to assign a value of 
\"BIGINT\" type " +
+              "to the \"INT\" type column or variable `scores`.`element`" +
+              " due to an overflow"))
+          }
+        }
+      }
+    }
+  }
+
+  test("merge into schema evolution type widening in map from int to long") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTable(tableNameAsString) {
+        withTempView("source") {
+          // Create table with map of string to int values
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |metrics MAP<STRING, INT>,
+               |dep STRING""".stripMargin,
+            """{ "pk": 1, "metrics": {"revenue": 100000, "profit": 50000}, 
"dep": "hr" }
+              |{ "pk": 2, "metrics": {"revenue": 200000, "profit": 80000}, 
"dep": "finance" }
+              |""".stripMargin)
+
+          // Source data with map of string to long values that exceed int 
range
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("metrics", MapType(StringType, LongType)),
+            StructField("dep", StringType)
+          ))
+
+          val data = Seq(
+            Row(1, Map("revenue" -> 3000000000L, "profit" -> 1500000000L), 
"hr"),
+            Row(3, Map("revenue" -> 4000000000L, "profit" -> 2000000000L), 
"engineering")
+          )
+
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt =
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT (pk, metrics, dep) VALUES (s.pk, s.metrics, s.dep)
+               |""".stripMargin
+
+          if (withSchemaEvolution) {
+            // Schema evolution should allow type widening from MAP<_, INT> to 
MAP<_, LONG>
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"),
+              Seq(
+                Row(1, Map("revenue" -> 3000000000L, "profit" -> 1500000000L), 
"hr"),
+                Row(2, Map("revenue" -> 200000L, "profit" -> 80000L), 
"finance"),
+                Row(3, Map("revenue" -> 4000000000L, "profit" -> 2000000000L), 
"engineering")
+              ))
+
+            val tableSchema = sql(s"SELECT * FROM $tableNameAsString").schema
+            val metricsField = tableSchema.find(_.name == "metrics").get
+            val mapValueType = 
metricsField.dataType.asInstanceOf[MapType].valueType
+            assert(mapValueType == LongType)
+          } else {
+            val exception = intercept[Exception] {
+              sql(mergeStmt)
+            }
+            assert(exception.getMessage.contains("Fail to assign a value of 
\"BIGINT\" type " +
+              "to the \"INT\" type column or variable `metrics`.`value`" +
+              " due to an overflow"))
+          }
+        }
+      }
+    }
+  }
+
+  test("merge into schema evolution type widening two types and adding two 
columns") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTable(tableNameAsString) {
+        withTempView("source") {
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |score INT,
+               |rating SHORT,
+               |dep STRING""".stripMargin,
+            """{ "pk": 1, "score": 100, "rating": 45, "dep": "premium" }
+              |{ "pk": 2, "score": 85, "rating": 38, "dep": "standard" }
+              |""".stripMargin)
+
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("score", LongType), // Widened from INT to LONG
+            StructField("rating", IntegerType), // Widened from SHORT to INT
+            StructField("dep", StringType),
+            StructField("priority", StringType), // New column 1
+            StructField("region", StringType) // New column 2
+          ))
+
+          val data = Seq(
+            Row(1, 5000000000L, 485, "premium", "high", "west"),
+            Row(3, 7500000000L, 495, "enterprise", "critical", "east")
+          )
+
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt =
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
+
+          if (withSchemaEvolution) {
+            sql(mergeStmt)
+            checkAnswer(
+              sql(s"SELECT * FROM $tableNameAsString ORDER BY pk"),
+              Seq(
+                Row(1, 5000000000L, 485, "premium", "high", "west"),
+                Row(2, 85L, 38, "standard", null, null),
+                Row(3, 7500000000L, 495, "enterprise", "critical", "east")
+              ))
+
+            val tableSchema = sql(s"SELECT * FROM $tableNameAsString").schema
+            val scoreField = tableSchema.find(_.name == "score").get
+            val ratingField = tableSchema.find(_.name == "rating").get
+            val priorityField = tableSchema.find(_.name == "priority")
+            val regionField = tableSchema.find(_.name == "region")
+
+            // Verify type widening
+            assert(scoreField.dataType == LongType)
+            assert(ratingField.dataType == IntegerType)
+
+            // Verify new columns added
+            assert(priorityField.isDefined)
+            assert(regionField.isDefined)
+            assert(priorityField.get.dataType == StringType)
+            assert(regionField.get.dataType == StringType)
+          } else {
+            val exception = intercept[Exception] {
+              sql(mergeStmt)
+            }
+            assert(exception.getMessage.contains("Fail to assign a value of 
\"BIGINT\" type " +
+              "to the \"INT\" type column or variable `score` due to an 
overflow."))
+          }
+        }
+      }
+    }
+  }
+
+  test("merge into schema evolution type promotion from int to struct not 
allowed") {
+    Seq(true, false).foreach { withSchemaEvolution =>
+      withTable(tableNameAsString) {
+        withTempView("source") {
+          createAndInitTable(
+            s"""pk INT NOT NULL,
+               |data INT,
+               |dep STRING""".stripMargin,
+            """{ "pk": 1, "data": 100, "dep": "test" }
+              |{ "pk": 2, "data": 200, "dep": "sample" }
+              |""".stripMargin)
+
+          val sourceTableSchema = StructType(Seq(
+            StructField("pk", IntegerType),
+            StructField("data", StructType(Seq(
+              StructField("value", IntegerType),
+              StructField("timestamp", LongType)
+            ))),
+            StructField("dep", StringType)
+          ))
+
+          val data = Seq(
+            Row(1, Row(150, 1634567890L), "test"),
+            Row(3, Row(300, 1634567900L), "new")
+          )
+
+          spark.createDataFrame(spark.sparkContext.parallelize(data), 
sourceTableSchema)
+            .createOrReplaceTempView("source")
+
+          val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA 
EVOLUTION" else ""
+          val mergeStmt =
+            s"""MERGE $schemaEvolutionClause
+               |INTO $tableNameAsString t
+               |USING source s
+               |ON t.pk = s.pk
+               |WHEN MATCHED THEN
+               | UPDATE SET *
+               |WHEN NOT MATCHED THEN
+               | INSERT *
+               |""".stripMargin
+
+          // Even with schema evolution, int to struct promotion should not be 
allowed
+          val exception = intercept[Exception] {
+            sql(mergeStmt)
+          }
+
+          if (withSchemaEvolution) {
+            assert(exception.getMessage.contains("Failed to merge incompatible 
schemas"))
+          } else {
+            assert(exception.getMessage.contains(
+              """Cannot write incompatible data for the table 
``""".stripMargin))
+          }
+        }
+      }
+    }
+  }
+
   test("merge into schema evolution add column with nested field and set 
explicit columns") {
     Seq(true, false).foreach { withSchemaEvolution =>
       withTempView("source") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to