cloud-fan commented on code in PR #55427:
URL: https://github.com/apache/spark/pull/55427#discussion_r3136815437
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -468,7 +484,7 @@ object TableOutputResolver extends SQLConfHelper with Logging {
defaultValueMode)
} else {
resolveColumnsByPosition(
- tableName, fields, toAttributes(expectedType), conf, addError, colPath)
+ tableName, fields, toAttributes(expectedType), conf, addError, colPath, fillDefaultValue)
Review Comment:
`fillDefaultValue` is correctly propagated here, but the sibling paths in
`resolveArrayType` (line 522) and `resolveMapType` (lines 559, 571) still call
`resolveColumnsByPosition` without the flag. Consequence: `INSERT BY POSITION
... WITH SCHEMA EVOLUTION` into a column typed `array<struct<...>>` or `map<_,
struct<...>>` whose nested struct has missing fields still errors — while the
BY NAME counterpart succeeds. The PR's test matrix only covers by-name for
array / map, so this gap isn't exercised.
Either propagate `fillDefaultValue` in those two methods (consistent with
`resolveStructType`), or narrow the enum doc / PR description to say array /
map nested fill is by-name only — and skip the `fillDefaultValue` plumbing into
those methods entirely. If the former, please add by-position test coverage for
array-of-struct and map-of-struct symmetric to the existing by-name tests.
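
To make the gap concrete, here is a minimal, self-contained sketch of a by-position resolver over a toy type model. It is not Spark code: `DType`, `ToyResolver.resolve`, and the `propagateInContainers` switch are hypothetical names introduced only for illustration, and the switch stands in for whether the array / map paths forward `fillDefaultValue`.

```scala
// Toy model only: every name below is hypothetical and not part of Spark's API.
sealed trait DType
case object IntT extends DType
final case class StructT(fields: List[(String, DType)]) extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType

object ToyResolver {
  // Returns the errors found when writing `input` into `expected` by position.
  // `propagateInContainers = false` mimics array/map paths that drop the flag.
  // Extra source fields are ignored here to keep the sketch short.
  def resolve(
      input: DType,
      expected: DType,
      fillDefaultValue: Boolean,
      propagateInContainers: Boolean,
      path: List[String] = Nil): List[String] = (input, expected) match {
    case (StructT(in), StructT(exp)) =>
      val matched = in.zip(exp).flatMap { case ((_, inT), (name, expT)) =>
        resolve(inT, expT, fillDefaultValue, propagateInContainers, path :+ name)
      }
      // Trailing target fields that have no source counterpart.
      val missing = exp.drop(in.size).map(_._1)
      val arityErrors =
        if (missing.isEmpty || fillDefaultValue) Nil
        else List(s"${path.mkString(".")}: missing fields ${missing.mkString(", ")}")
      matched ++ arityErrors
    case (ArrayT(inE), ArrayT(expE)) =>
      val flag = fillDefaultValue && propagateInContainers
      resolve(inE, expE, flag, propagateInContainers, path :+ "element")
    case (MapT(inK, inV), MapT(expK, expV)) =>
      val flag = fillDefaultValue && propagateInContainers
      resolve(inK, expK, flag, propagateInContainers, path :+ "key") ++
        resolve(inV, expV, flag, propagateInContainers, path :+ "value")
    case (a, b) if a == b => Nil
    case (a, b) => List(s"${path.mkString(".")}: cannot write $a to $b")
  }
}
```

With a source of `ArrayT(StructT(List("c1" -> IntT)))` and a target whose element struct also has `c2`, the call with `propagateInContainers = false` reports a missing-field error even though `fillDefaultValue` is true, while the call with `propagateInContainers = true` returns no errors. That is the asymmetry described above, under the assumptions of this toy model.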
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3653,9 +3654,16 @@ class Analyzer(
validateStoreAssignmentPolicy()
TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
expected = v2Write.table.output, queryOutput = v2Write.query.output)
+ // With schema evolution, allow the source to have fewer columns/fields than the target
+ // and fill missing ones with default values or nulls (RECURSE mode). Without schema
+ // evolution, only top-level default column values are filled (FILL mode) and any
+ // missing columns will cause a schema enforcement error.
Review Comment:
The phrase "only top-level default column values are filled (FILL mode) and
any missing columns will cause a schema enforcement error" reads as
self-contradictory: FILL mode *does* fill missing top-level columns (with
explicit DEFAULT values, or null when
`spark.sql.defaultColumn.useNullsForMissingDefaultValues` is true). The
intended contrast is with nested fields, not top-level. Suggest:
```suggestion
// With schema evolution + coercion flag, missing top-level columns AND missing nested
// struct fields are filled with defaults/null (RECURSE mode). Otherwise, only missing
// top-level columns are filled via FILL mode; missing nested struct fields still cause
// schema enforcement errors.
```
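
As a rough illustration of the contrast the suggested comment draws, the following standalone sketch encodes the two modes as a small decision table. `FillMode` and `FillModeChoice` are hypothetical stand-ins (assumptions for illustration, not the PR's actual enum or call site); only the FILL / RECURSE names and their described behavior come from the comment above.

```scala
// Hypothetical stand-in for the fill-mode enum; not the PR's actual definition.
object FillMode extends Enumeration {
  val FILL, RECURSE = Value
}

object FillModeChoice {
  // Follows the suggested comment: RECURSE only when schema evolution and the
  // coercion flag are both on, FILL in every other case.
  def choose(withSchemaEvolution: Boolean, coercionEnabled: Boolean): FillMode.Value =
    if (withSchemaEvolution && coercionEnabled) FillMode.RECURSE else FillMode.FILL

  // Both modes fill missing top-level columns (default value or null).
  def fillsTopLevel(mode: FillMode.Value): Boolean = true

  // Only RECURSE fills missing nested struct fields; FILL leaves them as errors.
  def fillsNested(mode: FillMode.Value): Boolean = mode == FillMode.RECURSE
}
```

Read this way, the distinction between the modes is entirely about nested fields, which is why the original wording ("any missing columns will cause a schema enforcement error") is misleading for the FILL case.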
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -407,25 +412,36 @@ object TableOutputResolver extends SQLConfHelper with Logging {
}
}
- inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
+ val matched = inputCols.zip(actualExpectedCols).flatMap { case (inputCol, expectedCol) =>
val newColPath = colPath :+ expectedCol.name
(inputCol.dataType, expectedCol.dataType) match {
case (inputType: StructType, expectedType: StructType) =>
resolveStructType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue)
case (inputType: ArrayType, expectedType: ArrayType) =>
resolveArrayType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue)
case (inputType: MapType, expectedType: MapType) =>
resolveMapType(
tableName, inputCol, inputType, expectedCol, expectedType,
- byName = false, conf, addError, newColPath, fillDefaultValue = false)
+ byName = false, conf, addError, newColPath, fillDefaultValue)
case _ =>
checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath)
}
}
+
+ val defaults = if (fillDefaultValue) {
+ actualExpectedCols.drop(inputCols.size).flatMap { expectedCol =>
+ getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
+ .map(expr => Alias(expr, expectedCol.name)())
Review Comment:
The by-name path at line 327 routes the default-value expression through
`applyColumnMetadata(defaultExpr.get, expectedCol)`, which strips source
metadata and pins the target column's required metadata — including
`CharVarcharUtils.cleanMetadata` and the write-side metadata guarantees
documented on `applyColumnMetadata`. This trailing-fill branch just does
`Alias(expr, expectedCol.name)()`, so for a by-position insert where the
trailing target column is char / varchar, the filled column won't carry the
target-column metadata the way the by-name filled column does. Suggest aligning
with the by-name path:
```suggestion
getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
.map(expr => applyColumnMetadata(expr, expectedCol))
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -7108,6 +7108,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val INSERT_INTO_NESTED_TYPE_COERCION_ENABLED =
+ buildConf("spark.sql.insertNestedTypeCoercion.enabled")
+ .internal()
+ .doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
Review Comment:
The doc describes only the nested-struct-field effect, but enabling this
flag also loosens by-position trailing top-level fill (see the "Missing
top-level column (by position)" row in the PR description, which moves from
"error" to "fill trailing"). Users skimming the config doc might reasonably
expect the flag's scope to be limited to nested types. Suggest extending, e.g.:
```suggestion
.doc("If enabled, allow INSERT INTO WITH SCHEMA EVOLUTION to fill missing nested " +
"struct fields with null when the source has fewer nested fields than the target " +
"table. Also relaxes by-position column-count enforcement so trailing missing " +
"top-level columns are filled with their default value (or null). This is " +
"experimental and the semantics may change.")
```
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala:
##########
@@ -1298,4 +1318,582 @@ trait InsertIntoSchemaEvolutionTests { this: InsertIntoTests =>
assert(spark.table(t1).schema("id").dataType === IntegerType)
}
}
+
+ // ---------------------------------------------------------------------------
+ // Tests for source with fewer columns/fields than target
+ // ---------------------------------------------------------------------------
+
+ test("Insert schema evolution: source missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val schema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("salary", IntegerType),
+ StructField("dep", StringType)))
+ val data = Seq(Row(0, 100, "sales"))
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, spark.createDataFrame(spark.sparkContext.parallelize(data), schema))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, null, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // By position: source col 1 maps to target col 1, source col 2 maps to target col 2,
+ // trailing target col 3 is filled with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, null)))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int DEFAULT 200, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering")).toDF("id", "dep"),
+ byName = true)
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "engineering")))
+ }
+ }
+
+ test("Insert schema evolution: source missing top-level column with DEFAULT by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string DEFAULT 'unknown') USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, 200)).toDF("id", "salary"))
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, 100, "sales"), Row(1, 200, "unknown")))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing nested struct field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in array by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:string,c3:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, "a", true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10, "b"))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, "a", true))), Row(1, Seq(Row(10, "b", null)))))
+ }
+ }
+
+ test("Insert schema evolution: source missing deeply nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, Row(30))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, Row(30, null)))))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: source with null struct and missing nested field by position") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", IntegerType))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:string,c3:int>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", 10)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", 10)), Row(1, null)))
+ }
+ }
+
+ test("Insert schema evolution: mixed null and non-null structs with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b")), Row(2, null))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, "a", true)), Row(1, Row(10, "b", null)), Row(2, null)))
+ }
+ }
+
+ test("Insert schema evolution: null deeply nested struct with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", BooleanType)))))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"s struct<c1:int,c2:struct<a:int,b:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, Row(10, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StructType(Seq(
+ StructField("a", IntegerType)))))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(20, null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Row(1, Row(10, true))), Row(1, Row(20, null))))
+ }
+ }
+
+ test("Insert schema evolution: null struct in array with missing field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"a array<struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Seq(Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("a", ArrayType(StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Seq(Row(10), null)))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(Row(0, Seq(Row(1, true))), Row(1, Seq(Row(10, null), null))))
+ }
+ }
+
+ test("Insert schema evolution: source missing field in struct nested in map value by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", BooleanType)))))))
+ sql(s"CREATE TABLE $t1 (id int, " +
+ s"m map<string, struct<c1:int,c2:boolean>>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Map("x" -> Row(1, true))))),
+ targetSchema)
+ doInsert(t1, targetData)
+
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("m", MapType(StringType, StructType(Seq(
+ StructField("c1", IntegerType)))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Map("y" -> Row(10))))),
+ sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT * FROM $t1"),
+ Seq(
+ Row(0, Map("x" -> Row(1, true))),
+ Row(1, Map("y" -> Row(10, null)))))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing top-level column by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id int, salary int, dep string) USING $v2Format")
+ doInsert(t1, Seq((0, 100, "sales")).toDF("id", "salary", "dep"))
+ // Source has "active" (extra) but is missing "salary". Column count is the same (3)
+ // but names differ; by-name resolution should add "active" via schema evolution
+ // and fill "salary" with null.
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1,
+ Seq((1, "engineering", true)).toDF("id", "dep", "active"),
+ byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, salary, dep, active FROM $t1"),
+ Seq(Row(0, 100, "sales", null), Row(1, null, "engineering", true)))
+ }
+ }
+
+ test("Insert schema evolution: extra and missing nested struct field by name") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTable(t1) {
+ val targetSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c3", BooleanType))))))
+ sql(s"CREATE TABLE $t1 (id int, s struct<c1:int,c2:string,c3:boolean>) USING $v2Format")
+ val targetData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(0, Row(1, "a", true)))), targetSchema)
+ doInsert(t1, targetData)
+
+ // Source struct has "c1", "c2", "c4" (extra) but is missing "c3". Field count is the same
+ // (3) but names differ; by-name resolution should add "c4" via schema evolution and fill
+ // "c3" with null.
+ val sourceSchema = StructType(Seq(
+ StructField("id", IntegerType),
+ StructField("s", StructType(Seq(
+ StructField("c1", IntegerType),
+ StructField("c2", StringType),
+ StructField("c4", DoubleType))))))
+ val sourceData = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, Row(10, "b", 3.14)))), sourceSchema)
+ withInsertNestedTypeCoercion {
+ doInsertWithSchemaEvolution(t1, sourceData, byName = true)
+ }
+ checkAnswer(
+ sql(s"SELECT id, s.c1, s.c2, s.c3, s.c4 FROM $t1"),
+ Seq(Row(0, 1, "a", true, null), Row(1, 10, "b", null, 3.14)))
+ }
+ }
+
+ // ---------------------------------------------------------------------------
+ // Negative tests: missing columns/fields should fail WITHOUT schema evolution
+ // ---------------------------------------------------------------------------
+
+ test("Insert without evolution: source missing top-level column by name fails") {
Review Comment:
Test name says "by name" but the call uses `doInsert` (by-position). The
asserted error `INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS` is only
emitted on the by-position path — the by-name path would instead hit
`incompatibleDataToTableCannotFindDataError` (see the nested-field counterpart
a few tests below that uses `doInsertByName`). For symmetry with the positive
test that uses `byName = true`, please switch to `doInsertByName` and update
the expected error accordingly (or rename this test to "… by position fails",
but then it duplicates the existing by-position test at line 1791).