xuanyuanking commented on a change in pull request #28707: URL: https://github.com/apache/spark/pull/28707#discussion_r434418012
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala ########## @@ -94,6 +110,28 @@ abstract class StreamingAggregationStateManagerBaseImpl( // discard and don't convert values to avoid computation store.getRange(None, None).map(_.key) } + + override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = { + if (checkFormat && SQLConf.get.getConf( + SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) { + if (schema.fields.length != row.numFields) { Review comment: Actually that's the first version I did. Since the checking logic is only used for streaming aggregation query and also depends on the streaming config, I choose to put it in StreamingAggregationStateManager, WDYT? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org