HeartSaVioR commented on code in PR #47035:
URL: https://github.com/apache/spark/pull/47035#discussion_r1650477997
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -103,10 +130,85 @@ class StateSchemaCompatibilityChecker(
       }
     }
 
+  def validateAndMaybeEvolveSchema(

Review Comment:
   If we are going to replace all calls to `check` with this one, please make sure the `check` method is removed.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -767,6 +767,35 @@ object SymmetricHashJoinStateManager {
     }
   }
 
+  def getSchemaForStateStores(
+      joinSide: JoinSide,
+      inputValueAttributes: Seq[Attribute],
+      joinKeys: Seq[Expression],
+      stateFormatVersion: Int): scala.collection.mutable.Map[String, (StructType, StructType)] = {

Review Comment:
   Is the caller expected to modify the result? If not, let's simply return an immutable Map.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -424,6 +430,11 @@ case class StateStoreRestoreExec(
   private[sql] val stateManager = StreamingAggregationStateManager.createStateManager(
     keyExpressions, child.output, stateFormatVersion)
 
+  override def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit = {

Review Comment:
   This isn't needed here.

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -70,6 +72,10 @@ trait StatefulOperator extends SparkPlan {
         throw new IllegalStateException("State location not present for execution")
       }
     }
+
+  // Function used to record state schema for the first time and validate it against proposed
+  // schema changes in the future. Runs as part of a planning rule on the driver.
+  def validateAndMaybeEvolveSchema(hadoopConf: Configuration): Unit

Review Comment:
   The name is ambiguous: which schema is the target? An operator has multiple aspects of schema. Let's be explicit and say `State`Schema.
   Also, if you add this in StatefulOperator, streaming aggregation will have this in both places, restore & save. StateStoreWriter is probably the better place.
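The suggestions above about returning an immutable `Map`, dropping the hook from `StateStoreRestoreExec`, and declaring it on `StateStoreWriter` under a `State`Schema name can be illustrated with a small, self-contained Scala sketch. Everything in it is hypothetical and heavily simplified: `Schema` stands in for `StructType`, a mutable map stands in for the schema file in the checkpoint (the real hook takes a Hadoop `Configuration`), `validateAndMaybeEvolveStateSchema` is only one reading of the naming suggestion, and the `<side>-<storeType>` store names merely mirror what the join state manager is assumed to use. The sketch only records and compares schemas; it does not evolve them.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's StructType; not the real API.
final case class Schema(fields: Seq[String])

// Hypothetical stand-ins for the join side; toString gives "left" / "right".
sealed trait JoinSide
case object LeftSide extends JoinSide { override def toString: String = "left" }
case object RightSide extends JoinSide { override def toString: String = "right" }

object JoinStateSchemasSketch {
  // Returns an immutable Map: the caller only reads the result, so there is
  // no reason to hand out a mutable collection.
  def getSchemaForStateStores(
      joinSide: JoinSide,
      keySchema: Schema,
      valueSchema: Schema): Map[String, (Schema, Schema)] = {
    // Assumed "<side>-<storeType>" naming; illustrative only.
    Map(
      (s"$joinSide-keyToNumValues", (keySchema, Schema(Seq("count")))),
      (s"$joinSide-keyWithIndexToValue", (Schema(keySchema.fields :+ "index"), valueSchema)))
  }
}

// Hypothetical, simplified versions of the operator traits.
trait StatefulOperator {
  def operatorId: Long
}

// The hook lives on writers only, so a read-only operator such as the
// aggregation restore side never has to implement it.
trait StateStoreWriter extends StatefulOperator {
  def stateSchemas: Map[String, (Schema, Schema)]

  // Records schemas on the first run, then rejects any change afterwards.
  // Schema evolution is deliberately left out of this sketch.
  def validateAndMaybeEvolveStateSchema(
      checkpoint: mutable.Map[(Long, String), (Schema, Schema)]): Unit =
    stateSchemas.foreach { case (storeName, schemas) =>
      checkpoint.get((operatorId, storeName)) match {
        case None => checkpoint((operatorId, storeName)) = schemas
        case Some(existing) =>
          require(existing == schemas, s"Incompatible state schema for store $storeName")
      }
    }
}

final case class RestoreSketch(operatorId: Long) extends StatefulOperator

final case class SaveSketch(operatorId: Long, stateSchemas: Map[String, (Schema, Schema)])
    extends StateStoreWriter

final case class JoinSketch(operatorId: Long, keySchema: Schema, valueSchema: Schema)
    extends StateStoreWriter {
  override def stateSchemas: Map[String, (Schema, Schema)] =
    JoinStateSchemasSketch.getSchemaForStateStores(LeftSide, keySchema, valueSchema) ++
      JoinStateSchemasSketch.getSchemaForStateStores(RightSide, keySchema, valueSchema)
}

object PlanningRuleSketch {
  // Stands in for the driver-side planning rule: only writers are visited.
  // Returns how many operators were validated.
  def validate(
      plan: Seq[StatefulOperator],
      checkpoint: mutable.Map[(Long, String), (Schema, Schema)]): Int = {
    val writers = plan.collect { case w: StateStoreWriter => w }
    writers.foreach(_.validateAndMaybeEvolveStateSchema(checkpoint))
    writers.size
  }
}
```

Because only writers implement the hook, a restore/save pair for a streaming aggregation is validated once, and the planning rule never has to special-case the restore side.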