This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f1efc95  [SPARK-38412][SS] Fix the swapped sequence of from and to in 
StateSchemaCompatibilityChecker
f1efc95 is described below

commit f1efc955940176c9fa84cdf7b2c71563c5df47d2
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Wed Mar 9 11:03:57 2022 +0900

    [SPARK-38412][SS] Fix the swapped sequence of from and to in 
StateSchemaCompatibilityChecker
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the StateSchemaCompatibilityChecker, which mistakenly swapped 
`from` (which should be the provided schema) and `to` (which should be the existing schema).
    
    ### Why are the changes needed?
    
    The bug mistakenly allows a case that should not be allowed, and 
disallows a case that should be allowed.
    
    That allows a nullable column to be stored into a non-nullable column, which 
should be prohibited. This is less likely to cause a runtime problem, since the state 
schema is a conceptual one and a row can be stored even without respecting the state 
schema.
    
    The opposite case is worse: it disallows a non-nullable column from being stored 
into a nullable column, which should be allowed. Spark fails the query in this 
case.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. After the fix, storing a non-nullable column into a nullable column for 
state will be allowed, as it should have been all along.
    
    ### How was this patch tested?
    
    Modified UTs.
    
    Closes #35731 from HeartSaVioR/SPARK-38412.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 43c7824bba40ebfb64dcd50d8d0e84b5a4d3c8c7)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/StateSchemaCompatibilityChecker.scala    |  2 +-
 .../StateSchemaCompatibilityCheckerSuite.scala     | 51 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 20625e1..0c8cabb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -72,7 +72,7 @@ class StateSchemaCompatibilityChecker(
   }
 
   private def schemasCompatible(storedSchema: StructType, schema: StructType): 
Boolean =
-    DataType.equalsIgnoreNameAndCompatibleNullability(storedSchema, schema)
+    DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema)
 
   // Visible for testing
   private[sql] def readSchemaFile(): (StructType, StructType) = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index a9cc90c..1539341 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -63,6 +63,8 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
   private val valueSchema65535Bytes = new StructType()
     .add(StructField("v" * (65535 - 87), IntegerType, nullable = true))
 
+  // Checks on adding/removing (nested) field.
+
   test("adding field to key should fail") {
     val fieldAddedKeySchema = keySchema.add(StructField("newKey", IntegerType))
     verifyException(keySchema, valueSchema, fieldAddedKeySchema, valueSchema)
@@ -107,6 +109,8 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     verifyException(keySchema, valueSchema, keySchema, newValueSchema)
   }
 
+  // Checks on changing type of (nested) field.
+
   test("changing the type of field in key should fail") {
     val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = 
TimestampType)))
     verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema)
@@ -129,28 +133,59 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     verifyException(keySchema, valueSchema, keySchema, newValueSchema)
   }
 
-  test("changing the nullability of nullable to non-nullable in key should 
fail") {
+  // Checks on changing nullability of (nested) field.
+  // Note that these tests have different format of the test name compared to 
others, since it was
+  // misleading to understand the assignment as the opposite way.
+
+  test("storing non-nullable column into nullable column in key should be 
allowed") {
     val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = 
false)))
-    verifyException(keySchema, valueSchema, nonNullChangedKeySchema, 
valueSchema)
+    verifySuccess(keySchema, valueSchema, nonNullChangedKeySchema, valueSchema)
   }
 
-  test("changing the nullability of nullable to non-nullable in value should 
fail") {
+  test("storing non-nullable column into nullable column in value schema 
should be allowed") {
     val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable 
= false)))
-    verifyException(keySchema, valueSchema, keySchema, 
nonNullChangedValueSchema)
+    verifySuccess(keySchema, valueSchema, keySchema, nonNullChangedValueSchema)
   }
 
-  test("changing the nullability of nullable to nonnullable in nested field in 
key should fail") {
+  test("storing non-nullable into nullable in nested field in key should be 
allowed") {
     val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable 
= false)))
     val newKeySchema = 
applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
-    verifyException(keySchema, valueSchema, newKeySchema, valueSchema)
+    verifySuccess(keySchema, valueSchema, newKeySchema, valueSchema)
   }
 
-  test("changing the nullability of nullable to nonnullable in nested field in 
value should fail") {
+  test("storing non-nullable into nullable in nested field in value should be 
allowed") {
     val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable 
= false)))
     val newValueSchema = 
applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
-    verifyException(keySchema, valueSchema, keySchema, newValueSchema)
+    verifySuccess(keySchema, valueSchema, keySchema, newValueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in key should fail") {
+    val nonNullChangedKeySchema = StructType(keySchema.map(_.copy(nullable = 
false)))
+    verifyException(nonNullChangedKeySchema, valueSchema, keySchema, 
valueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in value schema 
should fail") {
+    val nonNullChangedValueSchema = StructType(valueSchema.map(_.copy(nullable 
= false)))
+    verifyException(keySchema, nonNullChangedValueSchema, keySchema, 
valueSchema)
+  }
+
+  test("storing nullable column into non-nullable column in nested field in 
key should fail") {
+    val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable 
= false)))
+    val newKeySchema = 
applyNewSchemaToNestedFieldInKey(typeChangedNestedSchema)
+    verifyException(newKeySchema, valueSchema, keySchema, valueSchema)
   }
 
+  test("storing nullable column into non-nullable column in nested field in 
value should fail") {
+    val typeChangedNestedSchema = StructType(structSchema.map(_.copy(nullable 
= false)))
+    val newValueSchema = 
applyNewSchemaToNestedFieldInValue(typeChangedNestedSchema)
+    verifyException(keySchema, newValueSchema, keySchema, valueSchema)
+  }
+
+  // Checks on changing name of (nested) field.
+  // Changing the name is allowed since it may be possible Spark can make 
relevant changes from
+  // operators/functions by chance. This opens a risk that end users swap two 
fields having same
+  // data type, but there is no way to address both.
+
   test("changing the name of field in key should be allowed") {
     val newName: StructField => StructField = f => f.copy(name = f.name + 
"_new")
     val fieldNameChangedKeySchema = StructType(keySchema.map(newName))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to