xuanyuanking commented on a change in pull request #28707:
URL: https://github.com/apache/spark/pull/28707#discussion_r434418012



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StreamingAggregationStateManager.scala
##########
@@ -94,6 +110,28 @@ abstract class StreamingAggregationStateManagerBaseImpl(
     // discard and don't convert values to avoid computation
     store.getRange(None, None).map(_.key)
   }
+
+  override def unsafeRowFormatValidation(row: UnsafeRow, schema: StructType): Unit = {
+    if (checkFormat && SQLConf.get.getConf(
+        SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) && row != null) {
+      if (schema.fields.length != row.numFields) {

Review comment:
       Actually, that was my first version. Since the checking logic is only used for streaming aggregation queries and also depends on the streaming config, I chose to put it in StreamingAggregationStateManager. WDYT?
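
       To make the intent concrete, here is a minimal, self-contained sketch of a config-gated check along these lines. The object and method names (`RowFormatCheckSketch`, `validateRowFormat`) and the plain `IllegalStateException` are illustrative only, not the code this PR adds; it assumes the `SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED` entry referenced in the diff above.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

// Sketch only: the check is gated by a per-manager flag plus the streaming
// state format check config, mirroring the condition shown in the diff.
object RowFormatCheckSketch {
  def validateRowFormat(row: UnsafeRow, schema: StructType, checkFormat: Boolean): Unit = {
    if (checkFormat &&
        SQLConf.get.getConf(SQLConf.STREAMING_STATE_FORMAT_CHECK_ENABLED) &&
        row != null) {
      // Simplest structural check: the number of fields encoded in the
      // UnsafeRow must match the expected state schema.
      if (schema.fields.length != row.numFields) {
        throw new IllegalStateException(
          s"UnsafeRow field count ${row.numFields} does not match " +
          s"schema field count ${schema.fields.length}")
      }
    }
  }
}
```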






