HeartSaVioR commented on code in PR #47035:
URL: https://github.com/apache/spark/pull/47035#discussion_r1650477997


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -103,10 +130,85 @@ class StateSchemaCompatibilityChecker(
     }
   }
 
+  def validateAndMaybeEvolveSchema(

Review Comment:
   If we are going to replace all calls to `check` with this method, please 
make sure the `check` method is removed.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -767,6 +767,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 
+  def getSchemaForStateStores(
+      joinSide: JoinSide,
+      inputValueAttributes: Seq[Attribute],
+      joinKeys: Seq[Expression],
+      stateFormatVersion: Int): scala.collection.mutable.Map[String, (StructType, StructType)] = {

Review Comment:
   Is the caller expected to modify the result? If not, let's simply return 
an immutable Map.
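
   A minimal sketch of what returning an immutable Map could look like. This 
is not the PR's code: `Schema` is a stand-in for StructType so the snippet 
compiles without Spark, and the store names and schemas are hypothetical.

```scala
// Stand-in for org.apache.spark.sql.types.StructType (assumption, keeps the sketch Spark-free).
final case class Schema(fieldNames: Seq[String])

object SchemaMapSketch {
  // Returns scala.collection.immutable.Map (the default `Map`), so callers cannot mutate it.
  // Store names and derived schemas below are hypothetical placeholders.
  def getSchemaForStateStores(
      keySchema: Schema,
      valueSchema: Schema): Map[String, (Schema, Schema)] = {
    val countSchema = Schema(Seq("count"))
    val keyWithIndexSchema = Schema(keySchema.fieldNames :+ "index")
    Map(
      "keyToNumValues" -> ((keySchema, countSchema)),
      "keyWithIndexToValue" -> ((keyWithIndexSchema, valueSchema))
    )
  }
}
```

   If the result has to be assembled incrementally, building it in a local 
mutable Map and returning `.toMap` keeps the same immutable signature.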



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -424,6 +430,11 @@ case class StateStoreRestoreExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit = {

Review Comment:
   This override isn't needed here.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -70,6 +72,10 @@ trait StatefulOperator extends SparkPlan {
       throw new IllegalStateException("State location not present for execution")
     }
   }
+
+  // Function used to record state schema for the first time and validate it against proposed
+  // schema changes in the future. Runs as part of a planning rule on the driver.
+  def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit

Review Comment:
   The name is ambiguous: which schema is the target? An operator has multiple 
aspects of schema. Let's say `State`Schema in the name.
   
   Also, if you add this to StatefulOperator, streaming aggregation will have 
it in both places, restore and save. StateStoreWriter is probably the better 
place.
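
   A rough sketch of the placement I have in mind, using simplified stand-in 
traits rather than the real Spark ones (the real traits extend SparkPlan and 
the method takes a Hadoop Configuration). The method name 
`validateAndMaybeEvolveStateSchema` and the `RestoreExec` / `SaveExec` classes 
are hypothetical.

```scala
// Simplified stand-in: the real StatefulOperator extends SparkPlan.
trait StatefulOperator {
  def name: String
}

// Only operators that write state declare the validation hook.
trait StateStoreWriter extends StatefulOperator {
  // Hypothetical name; returns a message here only so the sketch is observable.
  def validateAndMaybeEvolveStateSchema(): String
}

// Restore side only reads state, so it carries no validation method at all.
final case class RestoreExec(name: String) extends StatefulOperator

// Save side writes state, so the state schema is validated once, here.
final case class SaveExec(name: String) extends StateStoreWriter {
  override def validateAndMaybeEvolveStateSchema(): String =
    s"validated state schema for $name"
}

object PlacementSketch {
  // A planning-rule-like pass that invokes the hook on writers only.
  def validateAll(ops: Seq[StatefulOperator]): Seq[String] =
    ops.collect { case w: StateStoreWriter => w.validateAndMaybeEvolveStateSchema() }
}
```

   With this shape, a streaming aggregation made of a restore and a save 
operator triggers validation exactly once.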



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

