szehon-ho commented on code in PR #55427:
URL: https://github.com/apache/spark/pull/55427#discussion_r3260789703


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -367,7 +369,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         reordered
       }
     } else {
-      Nil
+      cannotFindOutputData(tableName, colPath, expectedCols.map(_.name))

Review Comment:
   Done in ca22b549: reverted the unconditional throws from `2c56adef` and 
restored `enforceFullOutput` as in `0be4cc8` — `true` for INSERT 
(`resolveOutputColumns`) and `false` for MERGE (`resolveUpdate`), so the 
`addError` cast-failure detail still surfaces via 
`AssignmentUtils.alignUpdateAssignments`. Verified `AlignMergeAssignmentsSuite` 
and `AlignUpdateAssignmentsSuite` (including the ANSI 
`named_struct('dn_i', 'string-value', ...)` cases).



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -1298,4 +1318,712 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       assert(spark.table(t1).schema("id").dataType === IntegerType)
     }
   }
+
+  // 
---------------------------------------------------------------------------
+  // Tests for source with fewer columns/fields than target
+  // 
---------------------------------------------------------------------------
+
+  test("Insert schema evolution: source missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)))
+      val data = Seq(Row(0, 100, "sales"))
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position") 
{
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // By position: source col 1 maps to target col 1, source col 2 maps to 
target col 2,
+      // trailing target col 3 is filled with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) 
USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 
'unknown') USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by 
position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in 
array by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in 
array by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing deeply nested struct field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested 
field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested 
field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: mixed null and non-null structs with missing 
field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, 
null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, 
null)))
+    }
+  }
+
+  test("Insert schema evolution: null deeply nested struct with missing field 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+    }
+  }
+
+  test("Insert schema evolution: null struct in array with missing field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map 
value by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map 
value by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Source has "active" (extra) but is missing "salary". Column count is 
the same (3)
+      // but names differ; by-name resolution should add "active" via schema 
evolution
+      // and fill "salary" with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+          byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, salary, dep, active FROM $t1"),
+        Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing nested struct field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field 
count is the same
+      // (3) but names differ; by-name resolution should add "c4" via schema 
evolution and fill
+      // "c3" with null.
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c4", DoubleType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+        Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Negative tests: missing columns/fields should fail WITHOUT schema 
evolution
+  // 
---------------------------------------------------------------------------
+
+  test("Insert without evolution: source missing top-level column by name 
fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Without explicit DEFAULT on `salary`, missing by-name data only 
errors when null-fill
+      // for missing defaults is disabled; otherwise FILL mode inserts null 
for `salary`.
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        checkError(
+          exception = intercept[AnalysisException] {
+            doInsertByName(t1, Seq((1, "engineering")).toDF("id", "dep"))
+          },
+          condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map(
+            "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+            "colName" -> "`salary`")
+        )
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position 
fails " +
+      "when null default disabled and column has no explicit DEFAULT") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1,
+                Seq((1, 200)).toDF("id", "salary"))
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`dep`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by 
position fails " +
+      "when null default disabled") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"false") {
+        withInsertNestedTypeCoercion {
+          checkError(
+            exception = intercept[AnalysisException] {
+              doInsertWithSchemaEvolution(t1, sourceData)
+            },
+            condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+            parameters = Map(
+              "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+              "colName" -> "`s`.`c3`")
+          )
+        }
+      }
+    }
+  }
+
+  test("Insert without evolution: source missing top-level column by position 
fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsert(t1, Seq((1, 200)).toDF("id", "salary"))
+        },
+        condition = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> toSQLId(s"${catalogAndNamespace}tbl"),
+          "tableColumns" -> "`id`, `salary`, `dep`",
+          "dataColumns" -> "`id`, `salary`")
+      )
+    }
+  }
+
+  test("Insert without evolution: source missing nested struct field by name 
fails") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      val ex = intercept[AnalysisException] {
+        doInsertByName(t1, sourceData)
+      }
+      assert(ex.getMessage.contains("Cannot find data"))

Review Comment:
   Done in ca22b549: both tests now use `checkError` with condition 
`INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA` and pin the `tableName` and 
`colName` (`` `s`.`c3` ``) parameters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to