szehon-ho commented on code in PR #55427:
URL: https://github.com/apache/spark/pull/55427#discussion_r3141608757


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -468,7 +484,7 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
         defaultValueMode)
     } else {
       resolveColumnsByPosition(
-        tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+        tableName, fields, toAttributes(expectedType), conf, addError, 
colPath, fillDefaultValue)

Review Comment:
   Done in the latest commit: `resolveArrayType` and `resolveMapType` now pass 
`fillDefaultValue` into `resolveColumnsByPosition` for the by-position branches 
(matching `resolveStructType`). Added `Insert schema evolution: source missing 
field in struct nested in array/map value by position` tests in 
`InsertIntoTests`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -407,25 +412,36 @@ object TableOutputResolver extends SQLConfHelper with 
Logging {
       }
     }
 
-    inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+    val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, 
expectedCol) =>
       val newColPath = colPath :+ expectedCol.name
       (inputCol.dataType, expectedCol.dataType) match {
         case (inputType: StructType, expectedType: StructType) =>
           resolveStructType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue)
         case (inputType: ArrayType, expectedType: ArrayType) =>
           resolveArrayType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue)
         case (inputType: MapType, expectedType: MapType) =>
           resolveMapType(
             tableName, inputCol, inputType, expectedCol, expectedType,
-            byName = false, conf, addError, newColPath, fillDefaultValue = 
false)
+            byName = false, conf, addError, newColPath, fillDefaultValue)
         case _ =>
           checkField(tableName, expectedCol, inputCol, byName = false, conf, 
addError, newColPath)
       }
     }
+
+    val defaults = if (fillDefaultValue) {
+      actualExpectedCols.drop(inputCols.size).flatMap { expectedCol =>
+        getDefaultValueExprOrNullLit(expectedCol, 
conf.useNullsForMissingDefaultColumnValues)
+          .map(expr => Alias(expr, expectedCol.name)())

Review Comment:
   Done: the trailing-fill branch now uses `applyColumnMetadata(expr, 
expectedCol)` like the by-name path.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -1298,4 +1318,582 @@ trait InsertIntoSchemaEvolutionTests { this: 
InsertIntoTests =>
       assert(spark.table(t1).schema("id").dataType === IntegerType)
     }
   }
+
+  // 
---------------------------------------------------------------------------
+  // Tests for source with fewer columns/fields than target
+  // 
---------------------------------------------------------------------------
+
+  test("Insert schema evolution: source missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val schema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("salary", IntegerType),
+        StructField("dep", StringType)))
+      val data = Seq(Row(0, 100, "sales"))
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), 
schema))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column by position") 
{
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // By position: source col 1 maps to target col 1, source col 2 maps to 
target col 2,
+      // trailing target col 3 is filled with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) 
USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      doInsertWithSchemaEvolution(t1,
+        Seq((1, "engineering")).toDF("id", "dep"),
+        byName = true)
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+    }
+  }
+
+  test("Insert schema evolution: source missing top-level column with DEFAULT 
by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 
'unknown') USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, 200)).toDF("id", "salary"))
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing nested struct field by 
position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in 
array by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+    }
+  }
+
+  test("Insert schema evolution: source missing deeply nested struct field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested 
field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: source with null struct and missing nested 
field by position") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", IntegerType))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+    }
+  }
+
+  test("Insert schema evolution: mixed null and non-null structs with missing 
field by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, 
null))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, 
null)))
+    }
+  }
+
+  test("Insert schema evolution: null deeply nested struct with missing field 
by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", BooleanType)))))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StructType(Seq(
+            StructField("a", IntegerType)))))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+    }
+  }
+
+  test("Insert schema evolution: null struct in array with missing field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("a", ArrayType(StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+    }
+  }
+
+  test("Insert schema evolution: source missing field in struct nested in map 
value by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", BooleanType)))))))
+      sql(s"CREATE TABLE $t1 (id int, " +
+        s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+        targetSchema)
+      doInsert(t1, targetData)
+
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("m", MapType(StringType, StructType(Seq(
+          StructField("c1", IntegerType)))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+        sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT * FROM $t1"),
+        Seq(
+          Row(0, Map("x" -> Row(1, true))),
+          Row(1, Map("y" -> Row(10, null)))))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing top-level column by name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+      doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+      // Source has "active" (extra) but is missing "salary". Column count is 
the same (3)
+      // but names differ; by-name resolution should add "active" via schema 
evolution
+      // and fill "salary" with null.
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1,
+          Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+          byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, salary, dep, active FROM $t1"),
+        Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+    }
+  }
+
+  test("Insert schema evolution: extra and missing nested struct field by 
name") {
+    val t1 = s"${catalogAndNamespace}tbl"
+    withTable(t1) {
+      val targetSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c3", BooleanType))))))
+      sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) 
USING $v2Format")
+      val targetData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), 
targetSchema)
+      doInsert(t1, targetData)
+
+      // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field 
count is the same
+      // (3) but names differ; by-name resolution should add "c4" via schema 
evolution and fill
+      // "c3" with null.
+      val sourceSchema = StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("s", StructType(Seq(
+          StructField("c1", IntegerType),
+          StructField("c2", StringType),
+          StructField("c4", DoubleType))))))
+      val sourceData = spark.createDataFrame(
+        spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), 
sourceSchema)
+      withInsertNestedTypeCoercion {
+        doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+      }
+      checkAnswer(
+        sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+        Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+    }
+  }
+
+  // 
---------------------------------------------------------------------------
+  // Negative tests: missing columns/fields should fail WITHOUT schema 
evolution
+  // 
---------------------------------------------------------------------------
+
+  test("Insert without evolution: source missing top-level column by name 
fails") {

Review Comment:
   Done: switched to `doInsertByName`, and the test now asserts 
`INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA` for the missing `salary` column. 
The insert is wrapped in 
`withSQLConf(USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES -> false)` so that FILL 
mode does not silently insert null under the test session defaults (otherwise 
the insert succeeds and no exception is thrown).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3653,9 +3654,16 @@ class Analyzer(
         validateStoreAssignmentPolicy()
         TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
           expected = v2Write.table.output, queryOutput = v2Write.query.output)
+        // With schema evolution, allow the source to have fewer 
columns/fields than the target
+        // and fill missing ones with default values or nulls (RECURSE mode). 
Without schema
+        // evolution, only top-level default column values are filled (FILL 
mode) and any
+        // missing columns will cause a schema enforcement error.

Review Comment:
   Done: updated the comment to your suggested wording (RECURSE vs FILL and 
nested vs top-level).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -7108,6 +7108,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+    buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+      .internal()
+      .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill 
missing nested " +

Review Comment:
   Done: extended the config doc to mention by-position trailing top-level fill 
as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to