anishshri-db commented on code in PR #47359:
URL: https://github.com/apache/spark/pull/47359#discussion_r1683589525


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala:
##########
@@ -68,11 +68,11 @@ object StreamStreamJoinStateHelper {
     val newHadoopConf = session.sessionState.newHadoopConf()
 
     val manager = new StateSchemaCompatibilityChecker(providerIdForKeyToNumValues, newHadoopConf)
-    val (keySchema, _) = manager.readSchemaFile()
+    val keySchema = manager.readSchemaFile().head.keySchema
 
     val manager2 = new StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,

Review Comment:
   Done



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########
@@ -235,14 +237,43 @@ class StateSchemaCompatibilityCheckerSuite extends SharedSparkSession {
     val queryId = UUID.randomUUID()
     val providerId = StateStoreProviderId(
       StateStoreId(dir, opId, partitionId), queryId)
+    val storeColFamilySchema = List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+      keySchema, valueSchema))
     val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-    checker.createSchemaFile(keySchema, valueSchema,
+    checker.createSchemaFile(storeColFamilySchema,
       SchemaHelper.SchemaWriter.createSchemaWriter(1))
-    val (resultKeySchema, resultValueSchema) = checker.readSchemaFile()
+    val stateSchema = checker.readSchemaFile().head
+    val (resultKeySchema, resultValueSchema) = (stateSchema.keySchema, stateSchema.valueSchema)
 
     assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
   }
 
+  test("checking for compatibility with schema version 3") {
+    val stateSchemaVersion = 3
+    val dir = newDir()
+    val queryId = UUID.randomUUID()
+    val providerId = StateStoreProviderId(
+      StateStoreId(dir, opId, partitionId), queryId)
+    val runId = UUID.randomUUID()
+    val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+    val storeColFamilySchema = List(
+      StateStoreColFamilySchema("test1", keySchema, valueSchema,
+        keyStateEncoderSpec = getKeyStateEncoderSpec(stateSchemaVersion, keySchema)),
+      StateStoreColFamilySchema("test2", longKeySchema, longValueSchema,
+        keyStateEncoderSpec = getKeyStateEncoderSpec(stateSchemaVersion, longKeySchema)),
+      StateStoreColFamilySchema("test3", keySchema65535Bytes, valueSchema65535Bytes,
+        keyStateEncoderSpec = getKeyStateEncoderSpec(stateSchemaVersion, keySchema65535Bytes)))

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to