anishshri-db commented on code in PR #47359:
URL: https://github.com/apache/spark/pull/47359#discussion_r1685038116


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -33,170 +32,190 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
-sealed trait ColumnFamilySchema extends Serializable {
-  def jsonValue: JValue
-
-  def json: String
-
-  def columnFamilyName: String
-}
-
-case class ColumnFamilySchemaV1(
-    columnFamilyName: String,
-    keySchema: StructType,
-    valueSchema: StructType,
-    keyStateEncoderSpec: KeyStateEncoderSpec,
-    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {
-  def jsonValue: JValue = {
-    ("columnFamilyName" -> JString(columnFamilyName)) ~
-      ("keySchema" -> JString(keySchema.json)) ~
-      ("valueSchema" -> JString(valueSchema.json)) ~
-      ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~
-      ("userKeyEncoder" -> userKeyEncoder.map(s => JString(s.json)).getOrElse(JNothing))
-  }
-
-  def json: String = {
-    compact(render(jsonValue))
-  }
-}
-
-object ColumnFamilySchemaV1 {
-
-  /**
-   * Create a ColumnFamilySchemaV1 object from the Json string
-   * This function is to read the StateSchemaV3 file
-   */
-  def fromJson(json: String): ColumnFamilySchema = {
-    implicit val formats: DefaultFormats.type = DefaultFormats
-    val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]]
-    assert(colFamilyMap.isInstanceOf[Map[_, _]],
-      s"Expected Map but got ${colFamilyMap.getClass}")
-    val keySchema = StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String])
-    val valueSchema = StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String])
-    ColumnFamilySchemaV1(
-      colFamilyMap("columnFamilyName").asInstanceOf[String],
-      keySchema,
-      valueSchema,
-      KeyStateEncoderSpec.fromJson(keySchema, colFamilyMap("keyStateEncoderSpec")
-        .asInstanceOf[Map[String, Any]]),
-      colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString)
-    )
-  }
-}
+case class StateSchemaFormatV3(
+    stateStoreColFamilySchema: List[StateStoreColFamilySchema]
+)
 
 object SchemaHelper {
 
   sealed trait SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType)
+    def version: Int
+
+    def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema]
+
+    protected def readJsonSchema(inputStream: FSDataInputStream): String = {
+      val buf = new StringBuilder
+      val numChunks = inputStream.readInt()
+      (0 until numChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      buf.toString()
+    }
   }
 
   object SchemaReader {
     def createSchemaReader(versionStr: String): SchemaReader = {
       val version = MetadataVersionUtil.validateVersion(versionStr,
-        StateSchemaCompatibilityChecker.VERSION)
+        3)
       version match {
         case 1 => new SchemaV1Reader
         case 2 => new SchemaV2Reader
+        case 3 => new SchemaV3Reader
       }
     }
   }
 
   class SchemaV1Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+    override def version: Int = 1
+
+    override def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema] = {
       val keySchemaStr = inputStream.readUTF()
       val valueSchemaStr = inputStream.readUTF()
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
     }
   }
 
   class SchemaV2Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
-      val buf = new StringBuilder
-      val numKeyChunks = inputStream.readInt()
-      (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val keySchemaStr = buf.toString()
-
-      buf.clear()
-      val numValueChunks = inputStream.readInt()
-      (0 until numValueChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val valueSchemaStr = buf.toString()
-      (StructType.fromString(keySchemaStr), StructType.fromString(valueSchemaStr))
+    override def version: Int = 2
+
+    override def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema] = {
+      val keySchemaStr = readJsonSchema(inputStream)
+      val valueSchemaStr = readJsonSchema(inputStream)
+
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
+    }
+  }
+
+  class SchemaV3Reader extends SchemaReader {
+    override def version: Int = 3
+
+    override def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema] = {
+      implicit val formats: DefaultFormats.type = DefaultFormats
+      val numEntries = inputStream.readInt()
+      (0 until numEntries).map { _ =>
+        // read the col family name and the key and value schema
+        val colFamilyName = inputStream.readUTF()
+        val keySchemaStr = readJsonSchema(inputStream)
+        val valueSchemaStr = readJsonSchema(inputStream)
+        val keySchema = StructType.fromString(keySchemaStr)
+
+        // use the key schema to also populate the encoder spec
+        val keyEncoderSpecStr = readJsonSchema(inputStream)
+        val colFamilyMap = JsonMethods.parse(keyEncoderSpecStr).extract[Map[String, Any]]
+        val encoderSpec = KeyStateEncoderSpec.fromJson(keySchema, colFamilyMap)
+
+        // read the user key encoder spec if provided
+        val userKeyEncoderSchemaStr = readJsonSchema(inputStream)
+        val userKeyEncoderSchema = Try(StructType.fromString(userKeyEncoderSchemaStr)).toOption
+
+        StateStoreColFamilySchema(colFamilyName,
+          keySchema,

Review Comment:
   Discussed offline: we need the `keySchema` on the line above (at line 107).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to