ericm-db commented on code in PR #47359:
URL: https://github.com/apache/spark/pull/47359#discussion_r1683426922


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -30,19 +30,44 @@ import 
org.apache.spark.sql.execution.streaming.state.SchemaHelper.{SchemaReader
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.{DataType, StructType}
 
+// Result returned after validating the schema of the state store for schema changes
+case class StateSchemaValidationResult(
+    evolvedSchema: Boolean,
+    schemaPath: String
+)
+
+// Used to represent the schema of a column family in the state store
+case class StateStoreColFamilySchema(
+    colFamilyName: String,
+    keySchema: StructType,
+    valueSchema: StructType,
+    keyStateEncoderSpec: Option[KeyStateEncoderSpec] = None

Review Comment:
   Do we not need userKeySchema?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -18,58 +18,46 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.Encoder
 import 
org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA,
 KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
-import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, 
ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec}
+import 
org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, 
PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
 
-trait ColumnFamilySchemaUtils {
-  def getValueStateSchema[T](stateName: String, hasTtl: Boolean): 
ColumnFamilySchema
+object StateSchemaUtils {

Review Comment:
   Should we rename the file as well?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -33,170 +28,164 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
-sealed trait ColumnFamilySchema extends Serializable {
-  def jsonValue: JValue
-
-  def json: String
-
-  def columnFamilyName: String
-}
-
-case class ColumnFamilySchemaV1(
-    columnFamilyName: String,
-    keySchema: StructType,
-    valueSchema: StructType,
-    keyStateEncoderSpec: KeyStateEncoderSpec,
-    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {
-  def jsonValue: JValue = {
-    ("columnFamilyName" -> JString(columnFamilyName)) ~
-      ("keySchema" -> JString(keySchema.json)) ~
-      ("valueSchema" -> JString(valueSchema.json)) ~
-      ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~
-      ("userKeyEncoder" -> userKeyEncoder.map(s => 
JString(s.json)).getOrElse(JNothing))
-  }
-
-  def json: String = {
-    compact(render(jsonValue))
-  }
-}
-
-object ColumnFamilySchemaV1 {
-
-  /**
-   * Create a ColumnFamilySchemaV1 object from the Json string
-   * This function is to read the StateSchemaV3 file
-   */
-  def fromJson(json: String): ColumnFamilySchema = {
-    implicit val formats: DefaultFormats.type = DefaultFormats
-    val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]]
-    assert(colFamilyMap.isInstanceOf[Map[_, _]],
-      s"Expected Map but got ${colFamilyMap.getClass}")
-    val keySchema = 
StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String])
-    val valueSchema = 
StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String])
-    ColumnFamilySchemaV1(
-      colFamilyMap("columnFamilyName").asInstanceOf[String],
-      keySchema,
-      valueSchema,
-      KeyStateEncoderSpec.fromJson(keySchema, 
colFamilyMap("keyStateEncoderSpec")
-        .asInstanceOf[Map[String, Any]]),
-      
colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString)
-    )
-  }
-}
+case class StateSchemaFormatV3(
+    stateStoreColFamilySchema: List[StateStoreColFamilySchema]
+)
 
 object SchemaHelper {
 
   sealed trait SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType)
+    def version: Int
+
+    def read(inputStream: FSDataInputStream): List[StateStoreColFamilySchema]
+
+    protected def readJsonSchema(inputStream: FSDataInputStream): String = {
+      val buf = new StringBuilder
+      val numChunks = inputStream.readInt()
+      (0 until numChunks).foreach(_ => buf.append(inputStream.readUTF()))
+      buf.toString()
+    }
   }
 
   object SchemaReader {
     def createSchemaReader(versionStr: String): SchemaReader = {
       val version = MetadataVersionUtil.validateVersion(versionStr,
-        StateSchemaCompatibilityChecker.VERSION)
+        3)
       version match {
         case 1 => new SchemaV1Reader
         case 2 => new SchemaV2Reader
+        case 3 => new SchemaV3Reader
       }
     }
   }
 
   class SchemaV1Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
+    override def version: Int = 1
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
       val keySchemaStr = inputStream.readUTF()
       val valueSchemaStr = inputStream.readUTF()
-      (StructType.fromString(keySchemaStr), 
StructType.fromString(valueSchemaStr))
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
     }
   }
 
   class SchemaV2Reader extends SchemaReader {
-    def read(inputStream: FSDataInputStream): (StructType, StructType) = {
-      val buf = new StringBuilder
-      val numKeyChunks = inputStream.readInt()
-      (0 until numKeyChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val keySchemaStr = buf.toString()
-
-      buf.clear()
-      val numValueChunks = inputStream.readInt()
-      (0 until numValueChunks).foreach(_ => buf.append(inputStream.readUTF()))
-      val valueSchemaStr = buf.toString()
-      (StructType.fromString(keySchemaStr), 
StructType.fromString(valueSchemaStr))
+    override def version: Int = 2
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
+      val keySchemaStr = readJsonSchema(inputStream)
+      val valueSchemaStr = readJsonSchema(inputStream)
+
+      List(StateStoreColFamilySchema(StateStore.DEFAULT_COL_FAMILY_NAME,
+        StructType.fromString(keySchemaStr),
+        StructType.fromString(valueSchemaStr)))
+    }
+  }
+
+  class SchemaV3Reader extends SchemaReader {
+    override def version: Int = 3
+
+    override def read(inputStream: FSDataInputStream): 
List[StateStoreColFamilySchema] = {
+      val numEntries = inputStream.readInt()
+      (0 until numEntries).map { _ =>
+        val colFamilyName = inputStream.readUTF()
+        val keySchemaStr = readJsonSchema(inputStream)
+        val valueSchemaStr = readJsonSchema(inputStream)
+        StateStoreColFamilySchema(colFamilyName,
+          StructType.fromString(keySchemaStr),
+          StructType.fromString(valueSchemaStr))

Review Comment:
   Don't you need to read/write the KeyStateEncoderSpec?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SchemaHelper.scala:
##########
@@ -33,170 +28,164 @@ import org.apache.spark.util.Utils
 /**
  * Helper classes for reading/writing state schema.
  */
-sealed trait ColumnFamilySchema extends Serializable {
-  def jsonValue: JValue
-
-  def json: String
-
-  def columnFamilyName: String
-}
-
-case class ColumnFamilySchemaV1(
-    columnFamilyName: String,
-    keySchema: StructType,
-    valueSchema: StructType,
-    keyStateEncoderSpec: KeyStateEncoderSpec,
-    userKeyEncoder: Option[StructType] = None) extends ColumnFamilySchema {
-  def jsonValue: JValue = {
-    ("columnFamilyName" -> JString(columnFamilyName)) ~
-      ("keySchema" -> JString(keySchema.json)) ~
-      ("valueSchema" -> JString(valueSchema.json)) ~
-      ("keyStateEncoderSpec" -> keyStateEncoderSpec.jsonValue) ~
-      ("userKeyEncoder" -> userKeyEncoder.map(s => 
JString(s.json)).getOrElse(JNothing))
-  }
-
-  def json: String = {
-    compact(render(jsonValue))
-  }
-}
-
-object ColumnFamilySchemaV1 {
-
-  /**
-   * Create a ColumnFamilySchemaV1 object from the Json string
-   * This function is to read the StateSchemaV3 file
-   */
-  def fromJson(json: String): ColumnFamilySchema = {
-    implicit val formats: DefaultFormats.type = DefaultFormats
-    val colFamilyMap = JsonMethods.parse(json).extract[Map[String, Any]]
-    assert(colFamilyMap.isInstanceOf[Map[_, _]],
-      s"Expected Map but got ${colFamilyMap.getClass}")
-    val keySchema = 
StructType.fromString(colFamilyMap("keySchema").asInstanceOf[String])
-    val valueSchema = 
StructType.fromString(colFamilyMap("valueSchema").asInstanceOf[String])
-    ColumnFamilySchemaV1(
-      colFamilyMap("columnFamilyName").asInstanceOf[String],
-      keySchema,
-      valueSchema,
-      KeyStateEncoderSpec.fromJson(keySchema, 
colFamilyMap("keyStateEncoderSpec")
-        .asInstanceOf[Map[String, Any]]),
-      
colFamilyMap.get("userKeyEncoder").map(_.asInstanceOf[String]).map(StructType.fromString)
-    )
-  }
-}
+case class StateSchemaFormatV3(
+    stateStoreColFamilySchema: List[StateStoreColFamilySchema]

Review Comment:
   Do we want to make this a V1? How will we deal with future versions of this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to