This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe536033bdd [SPARK-39650][SS] Fix incorrect value schema in streaming 
deduplication with backward compatibility
fe536033bdd is described below

commit fe536033bdd00d921b3c86af329246ca55a4f46a
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Sat Jul 2 22:46:03 2022 +0900

    [SPARK-39650][SS] Fix incorrect value schema in streaming deduplication 
with backward compatibility
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to fix the incorrect value schema in streaming 
deduplication. The operator stores an empty row having a single column with 
null (using NullType), but the value schema is specified as all columns, which 
leads to incorrect behavior in the state store schema compatibility checker.
    
    This PR proposes to set the schema of value as 
`StructType(Array(StructField("__dummy__", NullType)))` to fit with the empty 
row. With this change, the streaming queries creating the checkpoint after this 
fix would work smoothly.
    
    To not break the existing streaming queries having incorrect value schema, 
this PR proposes to disable the check for value schema on streaming 
deduplication. The option to disable the value check already existed for the 
format validation (we have two different checkers for state store), but it was 
missing for the state store schema compatibility check. To avoid adding more 
config, this PR leverages the existing config that "format validation" is using.
    
    ### Why are the changes needed?
    
    This is a bug fix. Suppose the streaming query below:
    
    ```
    # df has the columns `a`, `b`, `c`
    val df = spark.readStream.format("...").load()
    val query = df.dropDuplicates("a").writeStream.format("...").start()
    ```
    
    While the query is running, df can produce a different set of columns (e.g. 
`a`, `b`, `c`, `d`) from the same source due to schema evolution. Since we only 
deduplicate the rows with column `a`, the change of schema should not matter 
for streaming deduplication, but the state store schema checker throws an error 
saying "value schema is not compatible" before this fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is basically a bug fix which end users wouldn't notice unless they 
encountered this bug.
    
    ### How was this patch tested?
    
    New tests.
    
    Closes #37041 from HeartSaVioR/SPARK-39650.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/StateSchemaCompatibilityChecker.scala    |  26 ++++++--
 .../sql/execution/streaming/state/StateStore.scala |   7 ++-
 .../execution/streaming/state/StateStoreConf.scala |   7 ++-
 .../execution/streaming/statefulOperators.scala    |   4 +-
 .../commits/.0.crc                                 | Bin 0 -> 12 bytes
 .../commits/.1.crc                                 | Bin 0 -> 12 bytes
 .../commits/0                                      |   2 +
 .../commits/1                                      |   2 +
 .../metadata                                       |   1 +
 .../offsets/.0.crc                                 | Bin 0 -> 16 bytes
 .../offsets/.1.crc                                 | Bin 0 -> 16 bytes
 .../offsets/0                                      |   3 +
 .../offsets/1                                      |   3 +
 .../state/0/0/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/0/1.delta                              | Bin 0 -> 77 bytes
 .../state/0/0/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/0/_metadata/.schema.crc                | Bin 0 -> 12 bytes
 .../state/0/0/_metadata/schema                     | Bin 0 -> 254 bytes
 .../state/0/1/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/1/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/1/2.delta                              | Bin 0 -> 77 bytes
 .../state/0/2/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/2/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/2/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/3/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/3/2.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/.1.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/.2.delta.crc                         | Bin 0 -> 12 bytes
 .../state/0/4/1.delta                              | Bin 0 -> 46 bytes
 .../state/0/4/2.delta                              | Bin 0 -> 46 bytes
 .../StateSchemaCompatibilityCheckerSuite.scala     |  49 +++++++++++----
 .../streaming/StreamingDeduplicationSuite.scala    |  70 +++++++++++++++++++++
 37 files changed, 152 insertions(+), 22 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
index 0c8cabb75ed..80384f8cb3b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala
@@ -41,20 +41,34 @@ class StateSchemaCompatibilityChecker(
   fm.mkdirs(schemaFileLocation.getParent)
 
   def check(keySchema: StructType, valueSchema: StructType): Unit = {
+    check(keySchema, valueSchema, ignoreValueSchema = false)
+  }
+
+  def check(keySchema: StructType, valueSchema: StructType, ignoreValueSchema: 
Boolean): Unit = {
     if (fm.exists(schemaFileLocation)) {
       logDebug(s"Schema file for provider $providerId exists. Comparing with 
provided schema.")
       val (storedKeySchema, storedValueSchema) = readSchemaFile()
-      if (storedKeySchema.equals(keySchema) && 
storedValueSchema.equals(valueSchema)) {
+      if (storedKeySchema.equals(keySchema) &&
+        (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
         // schema is exactly same
       } else if (!schemasCompatible(storedKeySchema, keySchema) ||
-        !schemasCompatible(storedValueSchema, valueSchema)) {
+        (!ignoreValueSchema && !schemasCompatible(storedValueSchema, 
valueSchema))) {
+        val errorMsgForKeySchema = s"- Provided key schema: $keySchema\n" +
+          s"- Existing key schema: $storedKeySchema\n"
+
+        // If it is requested to skip checking the value schema, we also don't 
expose the value
+        // schema information to the error message.
+        val errorMsgForValueSchema = if (!ignoreValueSchema) {
+          s"- Provided value schema: $valueSchema\n" +
+            s"- Existing value schema: $storedValueSchema\n"
+        } else {
+          ""
+        }
         val errorMsg = "Provided schema doesn't match to the schema for 
existing state! " +
           "Please note that Spark allow difference of field name: check count 
of fields " +
           "and data type of each field.\n" +
-          s"- Provided key schema: $keySchema\n" +
-          s"- Provided value schema: $valueSchema\n" +
-          s"- Existing key schema: $storedKeySchema\n" +
-          s"- Existing value schema: $storedValueSchema\n" +
+          errorMsgForKeySchema +
+          errorMsgForValueSchema +
           s"If you want to force running query without schema validation, 
please set " +
           s"${SQLConf.STATE_SCHEMA_CHECK_ENABLED.key} to false.\n" +
           "Please note running query with incompatible schema could cause 
indeterministic" +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 64c9cd11f99..203cb2a2875 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -515,7 +515,12 @@ object StateStore extends Logging {
           val checker = new StateSchemaCompatibilityChecker(storeProviderId, 
hadoopConf)
           // regardless of configuration, we check compatibility to at least 
write schema file
           // if necessary
-          val ret = Try(checker.check(keySchema, 
valueSchema)).toEither.fold(Some(_), _ => None)
+          // if the format validation for value schema is disabled, we also 
disable the schema
+          // compatibility checker for value schema as well.
+          val ret = Try(
+            checker.check(keySchema, valueSchema,
+              ignoreValueSchema = !storeConf.formatValidationCheckValue)
+          ).toEither.fold(Some(_), _ => None)
           if (storeConf.stateSchemaCheckEnabled) {
             ret
           } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index 529db2609cd..66bb37d7a57 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -48,7 +48,12 @@ class StateStoreConf(
   /** Whether validate the underlying format or not. */
   val formatValidationEnabled: Boolean = 
sqlConf.stateStoreFormatValidationEnabled
 
-  /** Whether validate the value format when the format invalidation enabled. 
*/
+  /**
+   * Whether to validate the value side. This config is applied to both 
validators as below:
+   *
+   * - whether to validate the value format when the format validation is 
enabled.
+   * - whether to validate the value schema when the state schema check is 
enabled.
+   */
   val formatValidationCheckValue: Boolean =
     
extraOptions.getOrElse(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG, 
"true") == "true"
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index e2a05986442..2b8fc651561 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -783,13 +783,15 @@ case class StreamingDeduplicateExec(
       keyExpressions, getStateInfo, conf) :: Nil
   }
 
+  private val schemaForEmptyRow: StructType = 
StructType(Array(StructField("__dummy__", NullType)))
+
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
     child.execute().mapPartitionsWithStateStore(
       getStateInfo,
       keyExpressions.toStructType,
-      child.output.toStructType,
+      schemaForEmptyRow,
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator),
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc
new file mode 100644
index 00000000000..1aee7033161
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc
new file mode 100644
index 00000000000..1aee7033161
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/.1.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0
new file mode 100644
index 00000000000..9c1e3021c3e
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/0
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1
new file mode 100644
index 00000000000..9c1e3021c3e
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/commits/1
@@ -0,0 +1,2 @@
+v1
+{"nextBatchWatermarkMs":0}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata
new file mode 100644
index 00000000000..78bd74a789f
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/metadata
@@ -0,0 +1 @@
+{"id":"33e8de33-00b8-4b60-8246-df2f433257ff"}
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc
new file mode 100644
index 00000000000..726c678bc6a
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.0.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc
new file mode 100644
index 00000000000..790f681f1aa
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/.1.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0
new file mode 100644
index 00000000000..443c6824358
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/0
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644489789,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
 [...]
+0
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1
new file mode 100644
index 00000000000..67b42175563
--- /dev/null
+++ 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/offsets/1
@@ -0,0 +1,3 @@
+v1
+{"batchWatermarkMs":0,"batchTimestampMs":1656644492462,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.stateStore.rocksdb.formatVersion":"5","spark.sql.streaming.statefulOperator.useStrictDistribution":"true","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","s
 [...]
+1
\ No newline at end of file
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc
new file mode 100644
index 00000000000..1992982c58f
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta
new file mode 100644
index 00000000000..fec40e83a54
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc
new file mode 100644
index 00000000000..022717c6b50
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/.schema.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema
new file mode 100644
index 00000000000..f132f9601b7
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/0/_metadata/schema
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc
new file mode 100644
index 00000000000..d18b77b93af
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta
new file mode 100644
index 00000000000..fcbf8df80f5
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/1/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/2/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/3/2.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.1.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc
new file mode 100644
index 00000000000..cf1d68e2ace
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/.2.delta.crc
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/1.delta
 differ
diff --git 
a/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta
 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta
new file mode 100644
index 00000000000..63529780518
Binary files /dev/null and 
b/sql/core/src/test/resources/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/state/0/4/2.delta
 differ
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
index 15393413593..7ba18a81404 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala
@@ -231,6 +231,16 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
   }
 
+  test("SPARK-39650: ignore value schema on compatibility check") {
+    val typeChangedValueSchema = StructType(valueSchema.map(_.copy(dataType = 
TimestampType)))
+    verifySuccess(keySchema, valueSchema, keySchema, typeChangedValueSchema,
+      ignoreValueSchema = true)
+
+    val typeChangedKeySchema = StructType(keySchema.map(_.copy(dataType = 
TimestampType)))
+    verifyException(keySchema, valueSchema, typeChangedKeySchema, valueSchema,
+      ignoreValueSchema = true)
+  }
+
   private def applyNewSchemaToNestedFieldInKey(newNestedSchema: StructType): 
StructType = {
     applyNewSchemaToNestedField(keySchema, newNestedSchema, "key3")
   }
@@ -257,44 +267,57 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
       dir: String,
       queryId: UUID,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean): Unit = {
     // in fact, Spark doesn't support online state schema change, so need to 
check
     // schema only once for each running of JVM
     val providerId = StateStoreProviderId(
       StateStoreId(dir, opId, partitionId), queryId)
 
     new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-      .check(newKeySchema, newValueSchema)
+      .check(newKeySchema, newValueSchema, ignoreValueSchema = 
ignoreValueSchema)
   }
 
   private def verifyException(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
 
     val e = intercept[StateSchemaNotCompatible] {
-      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+      runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+        ignoreValueSchema = ignoreValueSchema)
     }
 
-    e.getMessage.contains("Provided schema doesn't match to the schema for 
existing state!")
-    e.getMessage.contains(newKeySchema.json)
-    e.getMessage.contains(newValueSchema.json)
-    e.getMessage.contains(oldKeySchema.json)
-    e.getMessage.contains(oldValueSchema.json)
+    assert(e.getMessage.contains("Provided schema doesn't match to the schema 
for existing state!"))
+    assert(e.getMessage.contains(newKeySchema.toString()))
+    assert(e.getMessage.contains(oldKeySchema.toString()))
+
+    if (ignoreValueSchema) {
+      assert(!e.getMessage.contains(newValueSchema.toString()))
+      assert(!e.getMessage.contains(oldValueSchema.toString()))
+    } else {
+      assert(e.getMessage.contains(newValueSchema.toString()))
+      assert(e.getMessage.contains(oldValueSchema.toString()))
+    }
   }
 
   private def verifySuccess(
       oldKeySchema: StructType,
       oldValueSchema: StructType,
       newKeySchema: StructType,
-      newValueSchema: StructType): Unit = {
+      newValueSchema: StructType,
+      ignoreValueSchema: Boolean = false): Unit = {
     val dir = newDir()
     val queryId = UUID.randomUUID()
-    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema)
-    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema)
+    runSchemaChecker(dir, queryId, oldKeySchema, oldValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
+    runSchemaChecker(dir, queryId, newKeySchema, newValueSchema,
+      ignoreValueSchema = ignoreValueSchema)
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index c1908d95f39..0315e03d187 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -17,11 +17,16 @@
 
 package org.apache.spark.sql.streaming
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class StreamingDeduplicationSuite extends StateStoreMetricsTest {
 
@@ -413,4 +418,69 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
       assertStateOperatorCustomMetric("numDroppedDuplicateRows", expected = 1)
     )
   }
+
+  test("SPARK-39650: duplicate with specific keys should allow input to change 
schema") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS().dropDuplicates("_1")
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData, "a" -> 1),
+        CheckLastBatch("a" -> 1),
+
+        AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+        CheckLastBatch("b" -> 3)
+      )
+
+      val dedupeInputData2 = MemoryStream[(String, Int, String)]
+      val dedupe2 = dedupeInputData2.toDS().dropDuplicates("_1")
+
+      // initialize new memory stream with previously executed batches
+      dedupeInputData2.addData(("a", 1, "dummy"))
+      dedupeInputData2.addData(Seq(("a", 2, "dummy"), ("b", 3, "dummy")))
+
+      testStream(dedupe2, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData2, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+        CheckLastBatch(("c", 9, "c"))
+      )
+    }
+  }
+
+  test("SPARK-39650: recovery from checkpoint having all columns as value 
schema") {
+    // NOTE: We are also changing the schema of input compared to the 
checkpoint. In the checkpoint
+    // we define the input schema as (String, Int).
+    val inputData = MemoryStream[(String, Int, String)]
+    val dedupe = inputData.toDS().dropDuplicates("_1")
+
+    // The fix will land after Spark 3.3.0, hence we can check backward 
compatibility with
+    // checkpoint being built from Spark 3.3.0.
+    val resourceUri = this.getClass.getResource(
+      
"/structured-streaming/checkpoint-version-3.3.0-streaming-deduplication/").toURI
+
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    // Copy the checkpoint to a temp dir to prevent changes to the original.
+    // Not doing this will lead to the test passing on the first run, but fail 
subsequent runs.
+    FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
+
+    inputData.addData(("a", 1, "dummy"))
+    inputData.addData(("a", 2, "dummy"), ("b", 3, "dummy"))
+
+    testStream(dedupe, Append)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
+      /*
+        Note: The checkpoint was generated using the following input in Spark 
version 3.3.0
+        AddData(inputData, ("a", 1)),
+        CheckLastBatch(("a", 1)),
+        AddData(inputData, ("a", 2), ("b", 3)),
+        CheckLastBatch(("b", 3))
+       */
+
+      AddData(inputData, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+      CheckLastBatch(("c", 9, "c"))
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to