szehon-ho commented on code in PR #55239:
URL: https://github.com/apache/spark/pull/55239#discussion_r3047681156
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala:
##########
@@ -1435,63 +1435,120 @@ trait
MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests
requiresNestedTypeCoercion = true
)
- test("schema evolution - aliased assignment value should evolve nested
struct fields") {
- val targetSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("info", StructType(Seq(
- StructField("a", IntegerType)
- )))
- ))
- val sourceSchema = StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("info", StructType(Seq(
- StructField("a", IntegerType),
- StructField("b", IntegerType) // new field
- )))
- ))
-
- def readJson(json: Seq[String], schema: StructType): DataFrame =
- spark.createDataFrame(spark.read.schema(schema).json(json.toDS()).rdd,
schema)
-
+ private def withNestedTestData(body: => Unit): Unit = {
withTable(tableNameAsString) {
withTempView("source") {
+ val targetSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("a", IntegerType)
+ )))
+ ))
createTable(CatalogV2Util.structTypeToV2Columns(targetSchema),
Seq.empty)
- readJson(Seq(
- """{ "pk": 1, "info": { "a": 10 } }""",
- """{ "pk": 2, "info": { "a": 20 } }"""
- ), targetSchema).writeTo(tableNameAsString).append()
+ val targetDf =
spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(1, Row(10)),
+ Row(2, Row(20))
+ )), targetSchema)
+ targetDf.writeTo(tableNameAsString).append()
+
+ val sourceSchema = StructType(Seq(
+ StructField("pk", IntegerType, nullable = false),
+ StructField("info", StructType(Seq(
+ StructField("a", IntegerType),
+ StructField("b", IntegerType)
+ )))
+ ))
+ val sourceDf =
spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+ Row(2, Row(30, 50)),
+ Row(3, Row(40, 75))
+ )), sourceSchema)
+ sourceDf.createOrReplaceTempView("source")
- readJson(Seq(
- """{ "pk": 2, "info": { "a": 30, "b": 50 } }""",
- """{ "pk": 3, "info": { "a": 40, "b": 75 } }"""
- ), sourceSchema).createOrReplaceTempView("source")
+ body
+ }
+ }
+ }
- // Use DataFrame merge API and alias the source. The alias shouldn't
prevent schema
- // evolution.
+ test("schema evolution - top-level aliased struct column is not evolved") {
+ withNestedTestData {
+ val ex = intercept[AnalysisException] {
spark.table("source")
.mergeInto(tableNameAsString,
col(s"$tableNameAsString.pk") === col("source.pk"))
.whenMatched().update(Map("info" -> col("source.info").as("info")))
- .whenNotMatched().insert(Map(
- "pk" -> col("source.pk").as("pk"),
- "info" -> col("source.info").as("info")))
.withSchemaEvolution()
.merge()
+ }
+ assert(ex.getCondition ===
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ }
+ }
- val result = sql(s"SELECT * FROM $tableNameAsString")
- assert(result.schema === StructType(Seq(
- StructField("pk", IntegerType, nullable = false),
- StructField("info", StructType(Seq(
- StructField("a", IntegerType),
- // Field `b` is correctly added during schema evolution.
- StructField("b", IntegerType)
- )))
- )))
- checkAnswer(result, Seq(
- Row(1, Row(10, null)),
- Row(2, Row(30, 50)),
- Row(3, Row(40, 75))))
+ // Same as above with a mismatched alias name.
+ test("schema evolution - top-level aliased struct column with mismatched
name is not evolved") {
+ withNestedTestData {
+ val ex = intercept[AnalysisException] {
+ spark.table("source")
+ .mergeInto(tableNameAsString,
+ col(s"$tableNameAsString.pk") === col("source.pk"))
+ .whenMatched().update(Map(
+ "info" -> col("source.info").as("something_else")))
+ .withSchemaEvolution()
+ .merge()
+ }
+ assert(ex.getCondition ===
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+ }
+ }
+
+ test("schema evolution - nested field through aliased struct column is not
evolved") {
+ withNestedTestData {
+ val ex = intercept[AnalysisException] {
+ spark.table("source")
+ .mergeInto(tableNameAsString,
+ col(s"$tableNameAsString.pk") === col("source.pk"))
+ .whenMatched().update(Map(
+ "info.b" -> col("source.info").as("x").getField("b")))
+ .withSchemaEvolution()
+ .merge()
}
+ assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ }
+ }
+
+ test("schema evolution - complex expression value is not considered for
evolution") {
+ withNestedTestData {
+ val ex = intercept[AnalysisException] {
+ spark.table("source")
+ .mergeInto(tableNameAsString,
+ col(s"$tableNameAsString.pk") === col("source.pk"))
+ .whenMatched().update(Map(
+ "info.b" -> (col("source.info.b") + 1).as("b")))
+ .withSchemaEvolution()
+ .merge()
+ }
+ assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+ }
+ }
+
+ test("schema evolution - direct nested field assignment triggers evolution")
{
Review Comment:
I think this test is already covered by an existing test in
MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala
(lines 72-119):
testNestedStructsEvolution("source has extra struct fields -" +
"only evolve referenced struct field
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]