cloud-fan commented on code in PR #55427:
URL: https://github.com/apache/spark/pull/55427#discussion_r3247636588


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -367,7 +369,7 @@ object TableOutputResolver extends SQLConfHelper with Logging {
         reordered
       }
     } else {
-      Nil
+      cannotFindOutputData(tableName, colPath, expectedCols.map(_.name))

Review Comment:
   The latest commit (`2c56adef`, "Use enforceFullOutput for MERGE 
resolveUpdate as well") removed the `enforceFullOutput` parameter and made this 
throw — plus the matching throws in `resolveStructType` (line 504), 
`resolveArrayType` (line 545), `resolveMapType` (line 608) — unconditional. The 
previous commit `0be4cc8d951` had `enforceFullOutput=true` for INSERT (via 
`resolveOutputColumns`) and `false` for MERGE (via `resolveUpdate`), so MERGE 
could return `Nil` here, the recursive `resolveStructType` could return `None`, 
`resolveUpdate`'s `.getOrElse(value)` could fall back, and 
`AssignmentUtils.alignUpdateAssignments` would throw 
`DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS` with the specific 
`addError` message (e.g. "Cannot safely cast string to int").
   
   Now the throw fires for both callers and the `addError`-collected 
cast-failure detail is replaced by a generic 
`INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA` for MERGE/UPDATE. Concrete tests 
that assert the prior behavior:
   
   - `AlignMergeAssignmentsSuite` lines 738-744 (ANSI mode, `UPDATE SET s.n_s = 
named_struct('dn_i', 'string-value', 'dn_l', 1L)` on target `struct<dn_i:int, 
dn_l:bigint>`) — `assertAnalysisException(…, "Cannot safely cast")`. Same 
pattern at lines 920-925 (strict mode).
   - `AlignUpdateAssignmentsSuite` lines 546-548 and 599-601, same pattern.
   
   Repro path: `applyAssignments` → `resolveUpdate` (struct value, struct col) 
→ `resolveStructType` → `reorderColumnsByName` with `NONE` mode. For field 
`dn_i`: matched but types `(string, int)` ⇒ `checkField`'s `canWrite` fails, 
`addError("Cannot safely cast string to int")`, returns `None`. The flatMap 
drops it, `reordered.length=1 != expectedCols.length=2`, and HEAD's `else` 
branch on line 372 throws — bypassing 
`AssignmentUtils.alignUpdateAssignments`'s `errors.nonEmpty` check. 
`assertAnalysisException` does substring matching, so it fails on "Cannot find 
data" vs the expected "Cannot safely cast".
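   
   The control flow in this repro can be modeled in isolation. The following is 
   a minimal sketch, not Spark's code: `Field`, `ReorderSketch`, `ReorderDemo`, 
   the string-typed `dataType`, and the exact-match rule inside `checkField` are 
   all hypothetical simplifications, and `IllegalStateException` stands in for 
   the real `AnalysisException`. It only illustrates how an unconditional throw 
   preempts the caller's inspection of the accumulated errors.
   
   ```scala
   import scala.collection.mutable.ArrayBuffer
   import scala.util.Try

   // Hypothetical stand-in for a column or struct field; not a Spark class.
   final case class Field(name: String, dataType: String)

   object ReorderSketch {
     // Toy rule standing in for canWrite: only identical types are accepted.
     def checkField(in: Field, expected: Field, addError: String => Unit): Option[Field] =
       if (in.dataType == expected.dataType) Some(expected)
       else {
         addError(s"Cannot safely cast ${in.dataType} to ${expected.dataType}")
         None
       }

     def reorderColumnsByName(
         inputCols: Seq[Field],
         expectedCols: Seq[Field],
         addError: String => Unit,
         enforceFullOutput: Boolean): Seq[Field] = {
       // A field that fails checkField is dropped by the flatMap.
       val reordered = expectedCols.flatMap { expected =>
         inputCols.find(_.name == expected.name).flatMap(checkField(_, expected, addError))
       }
       if (reordered.length == expectedCols.length) reordered
       else if (enforceFullOutput) {
         // Stand-in for the generic CANNOT_FIND_DATA error.
         throw new IllegalStateException("Cannot find data for the output column")
       } else Nil // lets the caller report the collected errors instead
     }
   }

   object ReorderDemo {
     val input: Seq[Field] = Seq(Field("dn_i", "string"), Field("dn_l", "bigint"))
     val expected: Seq[Field] = Seq(Field("dn_i", "int"), Field("dn_l", "bigint"))

     // Returns the resolution outcome together with the errors collected so far.
     def run(enforceFullOutput: Boolean): (Try[Seq[Field]], Seq[String]) = {
       val errors = ArrayBuffer.empty[String]
       val result = Try(ReorderSketch.reorderColumnsByName(
         input, expected, msg => { errors += msg; () }, enforceFullOutput))
       (result, errors.toSeq)
     }
   }
   ```
   
   In this model, `ReorderDemo.run(enforceFullOutput = false)` yields an empty 
   result plus the collected cast message, which a caller can surface. With 
   `enforceFullOutput = true` the same message is collected, but the exception 
   is raised before any caller could look at it.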
   
   The commit message says MergeInto schema evolution suites and 
`MergeIntoDataFrameSuite` nested struct tests pass; those don't exercise the 
type-mismatch path that goes through `reorderColumnsByName`'s `addError` 
accumulation.
   
   Side effect: `resolveStructType` / `resolveArrayType` / `resolveMapType` 
still declare `Option[NamedExpression]` but `None` is now unreachable. 
`resolveUpdate`'s `.getOrElse(value)` is dead code and no longer protects MERGE 
error reporting.
   
   Two reasonable resolutions:
   - **Restore** `enforceFullOutput` as in `0be4cc8d951` — surgical, MERGE 
behavior preserved.
   - **Commit to** the new MERGE behavior — but then it belongs in the PR 
description (this is scope beyond "INSERT WITH SCHEMA EVOLUTION"), the 
AlignMerge/AlignUpdate assertions need to be updated, and ideally the thrown 
`Cannot find data` error should still carry the `addError` cast-failure detail 
so MERGE users don't lose actionable error information.
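   
   For the second option, one possible shape for keeping the cast-failure detail 
   is sketched below. This is an assumption-laden illustration only: 
   `CannotFindDataSketch` and `DetailSketch.failWithDetails` are hypothetical 
   names, and the real change would have to go through Spark's error-condition 
   framework rather than a plain `RuntimeException`.
   
   ```scala
   import scala.collection.mutable.ArrayBuffer

   // Hypothetical exception type standing in for the CANNOT_FIND_DATA error.
   final class CannotFindDataSketch(val colPath: String, val details: Seq[String])
     extends RuntimeException(
       s"Cannot find data for $colPath" +
         (if (details.isEmpty) "" else details.mkString(" (", "; ", ")")))

   object DetailSketch {
     // Throws the generic error but appends whatever addError has collected so far.
     def failWithDetails(colPath: String, errors: ArrayBuffer[String]): Nothing =
       throw new CannotFindDataSketch(colPath, errors.toSeq)
   }
   ```
   
   With this shape, a substring assertion on "Cannot safely cast" would still 
   match the thrown message, because the collected detail is appended to it.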



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -1298,4 +1318,712 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests =>
       assert(spark.table(t1).schema("id").dataType === IntegerType)
     }
   }
+
+  // ---------------------------------------------------------------------------
+  // Tests for source with fewer columns/fields than target
+  // ---------------------------------------------------------------------------
+
+  test("Insert schema evolution: source missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)))
+      val data = Seq(Row(0, 100, "sales"))
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // By position: source col 1 maps to target col 1, source col 2 maps to target col 2,
+      // trailing target col 3 is filled with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in array by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in array by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing deeply nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+    }
+  }
+
+  test("Insert schema evolution: null deeply nested struct with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+    }
+  }
+
+  test("Insert schema evolution: null struct in array with missing field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map value by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map value by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+      // but names differ; by-name resolution should add "active" via schema evolution
+      // and fill "salary" with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+          byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, salary, dep, active FROM $t1"),
+        Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+      // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+      // "c3" with null.
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c4", DoubleType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+        Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+    }
+  }
+
+  // ---------------------------------------------------------------------------
+  // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+  // ---------------------------------------------------------------------------
+
+  test("Insert without evolution: source missing top-level column by name fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Without explicit DEFAULT on `salary`, missing by-name data only errors when null-fill
+      // for missing defaults is disabled; otherwise FILL mode inserts null for `salary`.
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+          },
+          condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map(
+            "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+            "colName" -> "`salary`")
+        )
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position fails " +
+      "when null default disabled and column has no explicit DEFAULT") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1,
+                Seq((1, 200)).toDF("id", "salary"))
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`dep`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by position fails " +
+      "when null default disabled") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1, sourceData)
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`s`.`c3`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert without evolution: source missing top-level column by position fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+        },
+        condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "tableColumns" -> "`id`, `salary`, `dep`",
+          "dataColumns" -> "`id`, `salary`")
+      )
+    }
+  }
+
+  test("Insert without evolution: source missing nested struct field by name fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+      val ex = intercept[AnalysisException] {
+        doInsertByName(t1, sourceData)
+      }
+      assert(ex.getMessage.contains("Cannot find data"))

Review Comment:
   The other negative tests added in this patch (e.g. lines 1879-1892 and 
1905-1918) use `checkError(condition = 
"INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map(...))`, which 
pins `tableName` and `colName` and would catch a future change that drops the 
column path. Suggest aligning here (and at lines 1989-1992 in the next test):
   
   ```suggestion
         checkError(
           exception = intercept[AnalysisException] {
             doInsertByName(t1, sourceData)
           },
           condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
             "colName" -> "`s`.`c3`")
         )
   ```
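   
   The difference between the two assertion styles can be shown with a toy 
   model. Everything here is hypothetical: `SketchError` and `AssertionSketch` 
   are simplified stand-ins, not Spark's `AnalysisException` or the test 
   suite's `checkError`, and the message template is invented for the example.
   
   ```scala
   // Hypothetical, simplified error model: a condition name plus message parameters.
   final case class SketchError(condition: String, parameters: Map[String, String]) {
     def message: String =
       s"[$condition] Cannot find data for the output column ${parameters.getOrElse("colName", "")}"
   }

   object AssertionSketch {
     // Mirrors `ex.getMessage.contains(...)`: only the fixed message text is checked.
     def substringCheck(e: SketchError): Boolean =
       e.message.contains("Cannot find data")

     // Mirrors a structured check: the condition and every parameter must match.
     def structuredCheck(
         e: SketchError,
         condition: String,
         parameters: Map[String, String]): Boolean =
       e.condition == condition && e.parameters == parameters
   }
   ```
   
   In this model, an error whose `colName` regressed from `` `s`.`c3` `` to 
   `` `s` `` still passes `substringCheck` but fails `structuredCheck`.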



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

