HeartSaVioR commented on code in PR #47359:
URL: https://github.com/apache/spark/pull/47359#discussion_r1683896248


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -30,19 +30,45 @@ import 
org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}
 
+// Result returned after validating the schema of the state store for schema 
changes
+case class StateSchemaValidationResult(
+    evolvedSchema: Boolean,
+    schemaPath: String
+)
+
+// Used to represent the schema of a column family in the state store
+case class StateStoreColFamilySchema(
+    colFamilyName: String,
+    keySchema: StructType,
+    valueSchema: StructType,
+    keyStateEncoderSpec: Option[KeyStateEncoderSpec] = None,
+    userKeyEncoderSchema: Option[StructType] = None
+)
+
 class StateSchemaCompatibilityChecker(
     providerId: StateStoreProviderId,
-    hadoopConf: Configuration) extends Logging {
+    hadoopConf: Configuration,
+    stateSchemaVersion: Int = 2,

Review Comment:
   nit: probably better to use the constant 
`StateSchemaCompatibilityChecker.VERSION` rather than hard-coding `2`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########


Review Comment:
   Will this change be rebased onto #47257? I'm not sure what the final 
outcome would be, so I'll defer the review of this file (and the related 
changes) until after the rebase.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -33,170 +32,190 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
-sealed trait ColumnFamilySchema extends Serializable {
-  def jsonValue: JValue
-
-  def json: String
-
-  def columnFamilyName: String
-}
-
-case class ColumnFamilySchemaV1(
-    columnFamilyName: String,
-    keySchema: StructType,
-    valueSchema: StructType,
-    keyStateEncoderSpec: KeyStateEncoderSpec,
-    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {
-  def jsonValue: JValue = {
-    ("columnFamilyName" -> JString(columnFamilyName)) ~
-      ("keySchema" -> JString(keySchema.json)) ~
-      ("valueSchema" -> JString(valueSchema.json)) ~
-      ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~
-      ("userKeyEncoder" -> userKeyEncoder.map(s => 
JString(s.json)).getOrElse(JNothing))
-  }
-
-  def json: String = {
-    compact(render(jsonValue))
-  }
-}
-
-object ColumnFamilySchemaV1 {
-
-  /**
-   * Create a ColumnFamilySchemaV1 object from the Json string
-   * This function is to read the StateSchemaV3 file
-   */
-  def fromJson(json: String): ColumnFamilySchema = {
-    implicit val formats: DefaultFormats.type = DefaultFormats
-    val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]]
-    assert(colFamilyMap.isInstanceOf[Map[_, _]],
-      s"Expected Map but got ${colFamilyMap.getClass}")
-    val keySchema = 
StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String])
-    val valueSchema = 
StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String])
-    ColumnFamilySchemaV1(
-      colFamilyMap("columnFamilyName").asInstanceOf[String],
-      keySchema,
-      valueSchema,
-      KeyStateEncoderSpec.fromJson(keySchema, 
colFamilyMap("keyStateEncoderSpec")
-        .asInstanceOf[Map[String, Any]]),
-      
colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString)
-    )
-  }
-}
+case class StateSchemaFormatV3(
+    stateStoreColFamilySchema: List[StateStoreColFamilySchema]
+)
 
 object SchemaHelper {
 
   sealed trait SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType)
+    def version: Int
+
+    def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema]
+
+    protected def readJsonSchema(inputStream: FSDataInputStream): String = {
+      val buf = new StringBuilder
+      val numChunks = inputStream.readInt()
+      (0 until numChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      buf.toString()
+    }
   }
 
   object SchemaReader {
     def createSchemaReader(versionStr: String): SchemaReader = {
       val version = MetadataVersionUtil.validateVersion(versionStr,
-        StateSchemaCompatibilityChecker.VERSION)
+        3)
       version match {
         case 1 => new SchemaV1Reader
         case 2 => new SchemaV2Reader
+        case 3 => new SchemaV3Reader
       }
     }
   }
 
   class SchemaV1Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+    override def version: Int = 1
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
       val keySchemaStr = inputStream.readUTF()
       val valueSchemaStr = inputStream.readUTF()
-      (StructType.fromString(keySchemaStr), 
StructType.fromString(valueSchemaStr))
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
     }
   }
 
   class SchemaV2Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
-      val buf = new StringBuilder
-      val numKeyChunks = inputStream.readInt()
-      (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val keySchemaStr = buf.toString()
-
-      buf.clear()
-      val numValueChunks = inputStream.readInt()
-      (0 until numValueChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val valueSchemaStr = buf.toString()
-      (StructType.fromString(keySchemaStr), 
StructType.fromString(valueSchemaStr))
+    override def version: Int = 2
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
+      val keySchemaStr = readJsonSchema(inputStream)
+      val valueSchemaStr = readJsonSchema(inputStream)
+
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
+    }
+  }
+
+  class SchemaV3Reader extends SchemaReader {
+    override def version: Int = 3
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
+      implicit val formats: DefaultFormats.type = DefaultFormats
+      val numEntries = inputStream.readInt()
+      (0 until numEntries).map { _ =>
+        // read the col family name and the key and value schema
+        val colFamilyName = inputStream.readUTF()
+        val keySchemaStr = readJsonSchema(inputStream)
+        val valueSchemaStr = readJsonSchema(inputStream)
+        val keySchema = StructType.fromString(keySchemaStr)
+
+        // use the key schema to also populate the encoder spec
+        val keyEncoderSpecStr = readJsonSchema(inputStream)
+        val colFamilyMap = 
JsonMethods.parse(keyEncoderSpecStr).extract[Map[String, Any]]
+        val encoderSpec = KeyStateEncoderSpec.fromJson(keySchema, colFamilyMap)
+
+        // read the user key encoder spec if provided
+        val userKeyEncoderSchemaStr = readJsonSchema(inputStream)
+        val userKeyEncoderSchema = 
Try(StructType.fromString(userKeyEncoderSchemaStr)).toOption
+
+        StateStoreColFamilySchema(colFamilyName,
+          keySchema,

Review Comment:
   nit: inline this? Just to be consistent with the code below.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -140,8 +147,32 @@ object StateSchemaCompatibilityChecker extends Logging {
     }
   }
 
-  private def schemasCompatible(storedSchema: StructType, schema: StructType): 
Boolean =
-    DataType.equalsIgnoreNameAndCompatibleNullability(schema, storedSchema)
+  def validateAndMaybeEvolveStateSchema(

Review Comment:
   Shall we add a method doc, since it's a public method? In particular, it's 
unclear how to interpret the return value.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityCheckerSuite.scala:
##########
@@ -235,14 +237,46 @@ class StateSchemaCompatibilityCheckerSuite extends 
SharedSparkSession {
     val queryId = UUID.randomUUID()
     val providerId = StateStoreProviderId(
       StateStoreId(dir, opId, partitionId), queryId)
+    val storeColFamilySchema = 
List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+      keySchema, valueSchema))
     val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
-    checker.createSchemaFile(keySchema, valueSchema,
+    checker.createSchemaFile(storeColFamilySchema,
       SchemaHelper.SchemaWriter.createSchemaWriter(1))
-    val (resultKeySchema, resultValueSchema) = checker.readSchemaFile()
+    val stateSchema = checker.readSchemaFile().head
+    val (resultKeySchema, resultValueSchema) = (stateSchema.keySchema, 
stateSchema.valueSchema)
 
     assert((resultKeySchema, resultValueSchema) === (keySchema, valueSchema))
   }
 
+  test("checking for compatibility with schema version 3") {
+    val stateSchemaVersion = 3
+    val dir = newDir()
+    val queryId = UUID.randomUUID()
+    val providerId = StateStoreProviderId(
+      StateStoreId(dir, opId, partitionId), queryId)
+    val runId = UUID.randomUUID()
+    val stateInfo = StatefulOperatorStateInfo(dir, runId, opId, 0, 200)
+    val storeColFamilySchema = List(
+      StateStoreColFamilySchema("test1", keySchema, valueSchema,

Review Comment:
   Why not test with various encoder specs here given we are testing version 3 
specifically?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -183,25 +215,34 @@ object StateSchemaCompatibilityChecker extends Logging {
     // We need to disallow using binary inequality column in the key schema, 
before we
     // could support this in majority of state store providers (or high-level 
of state
     // store.)
-    disallowBinaryInequalityColumn(newKeySchema)
+    newStateSchema.foreach { schema =>
+      disallowBinaryInequalityColumn(schema.keySchema)
+    }
 
     val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
     val providerId = 
StateStoreProviderId(StateStoreId(stateInfo.checkpointLocation,
       stateInfo.operatorId, 0, storeName), stateInfo.queryRunId)
-    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf)
+    val checker = new StateSchemaCompatibilityChecker(providerId, hadoopConf, 
stateSchemaVersion,
+      schemaFilePath = schemaFilePath)
     // regardless of configuration, we check compatibility to at least write 
schema file
     // if necessary
     // if the format validation for value schema is disabled, we also disable 
the schema
     // compatibility checker for value schema as well.
+
+    var evolvedSchema = false
     val result = Try(
-      checker.validateAndMaybeEvolveStateSchema(newKeySchema, newValueSchema,
+      checker.validateAndMaybeEvolveStateSchema(newStateSchema,
         ignoreValueSchema = !storeConf.formatValidationCheckValue)
-    ).toEither.fold(Some(_), _ => None)
+    ).toEither.fold(Some(_),
+      hasEvolvedSchema => {
+        evolvedSchema = hasEvolvedSchema

Review Comment:
   Does this follow the return value of validateAndMaybeEvolveStateSchema()? If 
so, are we considering that state schema evolution has happened when there was 
no schema file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to