szehon-ho commented on code in PR #55239:
URL: https://github.com/apache/spark/pull/55239#discussion_r3047681156


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala:
##########
@@ -1435,63 +1435,120 @@ trait 
MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests
     requiresNestedTypeCoercion = true
   )
 
-  test("schema evolution - aliased assignment value should evolve nested 
struct fields") {
-    val targetSchema = StructType(Seq(
-      StructField("pk", IntegerType, nullable = false),
-      StructField("info", StructType(Seq(
-        StructField("a", IntegerType)
-      )))
-    ))
-    val sourceSchema = StructType(Seq(
-      StructField("pk", IntegerType, nullable = false),
-      StructField("info", StructType(Seq(
-        StructField("a", IntegerType),
-        StructField("b", IntegerType) // new field
-      )))
-    ))
-
-    def readJson(json: Seq[String], schema: StructType): DataFrame =
-      spark.createDataFrame(spark.read.schema(schema).json(json.toDS()).rdd, 
schema)
-
+  private def withNestedTestData(body: => Unit): Unit = {
     withTable(tableNameAsString) {
       withTempView("source") {
+        val targetSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("info", StructType(Seq(
+            StructField("a", IntegerType)
+          )))
+        ))
         createTable(CatalogV2Util.structTypeToV2Columns(targetSchema), 
Seq.empty)
-        readJson(Seq(
-          """{ "pk": 1, "info": { "a": 10 } }""",
-          """{ "pk": 2, "info": { "a": 20 } }"""
-        ), targetSchema).writeTo(tableNameAsString).append()
+        val targetDf = 
spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(1, Row(10)),
+          Row(2, Row(20))
+        )), targetSchema)
+        targetDf.writeTo(tableNameAsString).append()
+
+        val sourceSchema = StructType(Seq(
+          StructField("pk", IntegerType, nullable = false),
+          StructField("info", StructType(Seq(
+            StructField("a", IntegerType),
+            StructField("b", IntegerType)
+          )))
+        ))
+        val sourceDf = 
spark.createDataFrame(spark.sparkContext.parallelize(Seq(
+          Row(2, Row(30, 50)),
+          Row(3, Row(40, 75))
+        )), sourceSchema)
+        sourceDf.createOrReplaceTempView("source")
 
-        readJson(Seq(
-          """{ "pk": 2, "info": { "a": 30, "b": 50 } }""",
-          """{ "pk": 3, "info": { "a": 40, "b": 75 } }"""
-        ), sourceSchema).createOrReplaceTempView("source")
+        body
+      }
+    }
+  }
 
-        // Use DataFrame merge API and alias the source. The alias shouldn't 
prevent schema
-        // evolution.
+  test("schema evolution - top-level aliased struct column is not evolved") {
+    withNestedTestData {
+      val ex = intercept[AnalysisException] {
         spark.table("source")
           .mergeInto(tableNameAsString,
             col(s"$tableNameAsString.pk") === col("source.pk"))
           .whenMatched().update(Map("info" -> col("source.info").as("info")))
-          .whenNotMatched().insert(Map(
-            "pk" -> col("source.pk").as("pk"),
-            "info" -> col("source.info").as("info")))
           .withSchemaEvolution()
           .merge()
+      }
+      assert(ex.getCondition === 
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+    }
+  }
 
-        val result = sql(s"SELECT * FROM $tableNameAsString")
-        assert(result.schema === StructType(Seq(
-          StructField("pk", IntegerType, nullable = false),
-          StructField("info", StructType(Seq(
-            StructField("a", IntegerType),
-            // Field `b` is correctly added during schema evolution.
-            StructField("b", IntegerType)
-          )))
-        )))
-        checkAnswer(result, Seq(
-          Row(1, Row(10, null)),
-          Row(2, Row(30, 50)),
-          Row(3, Row(40, 75))))
+  // Same as above with a mismatched alias name.
+  test("schema evolution - top-level aliased struct column with mismatched 
name is not evolved") {
+    withNestedTestData {
+      val ex = intercept[AnalysisException] {
+        spark.table("source")
+          .mergeInto(tableNameAsString,
+            col(s"$tableNameAsString.pk") === col("source.pk"))
+          .whenMatched().update(Map(
+            "info" -> col("source.info").as("something_else")))
+          .withSchemaEvolution()
+          .merge()
+      }
+      assert(ex.getCondition === 
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS")
+    }
+  }
+
+  test("schema evolution - nested field through aliased struct column is not 
evolved") {
+    withNestedTestData {
+      val ex = intercept[AnalysisException] {
+        spark.table("source")
+          .mergeInto(tableNameAsString,
+            col(s"$tableNameAsString.pk") === col("source.pk"))
+          .whenMatched().update(Map(
+            "info.b" -> col("source.info").as("x").getField("b")))
+          .withSchemaEvolution()
+          .merge()
       }
+      assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+    }
+  }
+
+  test("schema evolution - complex expression value is not considered for 
evolution") {
+    withNestedTestData {
+      val ex = intercept[AnalysisException] {
+        spark.table("source")
+          .mergeInto(tableNameAsString,
+            col(s"$tableNameAsString.pk") === col("source.pk"))
+          .whenMatched().update(Map(
+            "info.b" -> (col("source.info.b") + 1).as("b")))
+          .withSchemaEvolution()
+          .merge()
+      }
+      assert(ex.getCondition === "UNRESOLVED_COLUMN.WITH_SUGGESTION")
+    }
+  }
+
+  test("schema evolution - direct nested field assignment triggers evolution") 
{

Review Comment:
   I think this test is already covered by the existing test referenced below:
   
   
   MergeIntoSchemaEvolutionTypeWideningAndExtraFieldTests.scala
   Lines 72-119
   
     testNestedStructsEvolution("source has extra struct fields -" +
       "only evolve referenced struct field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to