HeartSaVioR commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1153935588
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -980,3 +1022,65 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAt", LongType, nullable = false)))

Review Comment:
Let's consolidate this discussion into the comment thread below. I pinged Gengliang there to understand the intended use cases for TimestampType vs. TimestampNTZType, whether the two types are interchangeable, and so on. Until that is settled, I'll roll back the change so the value-row schema matches flatMapGroupsWithState.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org