anishshri-db commented on code in PR #47257:
URL: https://github.com/apache/spark/pull/47257#discussion_r1676311164


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ColumnFamilySchemaUtils.scala:
##########
@@ -17,58 +17,68 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.Encoder
-import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA, VALUE_ROW_SCHEMA_WITH_TTL}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema._
+import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchema.{COMPOSITE_KEY_ROW_SCHEMA, KEY_ROW_SCHEMA}
 import org.apache.spark.sql.execution.streaming.state.{ColumnFamilySchema, ColumnFamilySchemaV1, NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec}
 
 trait ColumnFamilySchemaUtils {
-  def getValueStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchema
+  def getValueStateSchema[T](
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
+      valEncoder: Encoder[T],
+      hasTtl: Boolean): ColumnFamilySchema
 
-  def getListStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchema
+  def getListStateSchema[T](
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
+      valEncoder: Encoder[T],
+      hasTtl: Boolean): ColumnFamilySchema
 
   def getMapStateSchema[K, V](
       stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
       userKeyEnc: Encoder[K],
+      valEncoder: Encoder[V],
       hasTtl: Boolean): ColumnFamilySchema
 }
-
 object ColumnFamilySchemaUtilsV1 extends ColumnFamilySchemaUtils {
 
-  def getValueStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchemaV1 = {
-    ColumnFamilySchemaV1(
+  def getValueStateSchema[T](
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
+      valEncoder: Encoder[T],
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
       stateName,
-      KEY_ROW_SCHEMA,
-      if (hasTtl) {
-        VALUE_ROW_SCHEMA_WITH_TTL
-      } else {
-        VALUE_ROW_SCHEMA
-      },
+      getKeySchema(keyEncoder.schema),
+      getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))
   }
 
-  def getListStateSchema[T](stateName: String, hasTtl: Boolean): ColumnFamilySchemaV1 = {
-    ColumnFamilySchemaV1(
+  def getListStateSchema[T](
+      stateName: String,
+      keyEncoder: ExpressionEncoder[Any],
+      valEncoder: Encoder[T],
+      hasTtl: Boolean): ColumnFamilySchemaV1 = {
+    new ColumnFamilySchemaV1(
       stateName,
-      KEY_ROW_SCHEMA,
-      if (hasTtl) {
-        VALUE_ROW_SCHEMA_WITH_TTL
-      } else {
-        VALUE_ROW_SCHEMA
-      },
+      getKeySchema(keyEncoder.schema),
+      getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       NoPrefixKeyStateEncoderSpec(KEY_ROW_SCHEMA))

Review Comment:
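   Do we also need to change the schema passed to the encoder spec? The key schema is now `getKeySchema(keyEncoder.schema)`, but `NoPrefixKeyStateEncoderSpec` is still given `KEY_ROW_SCHEMA`.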



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to