Hi, I've recently been analyzing the watermark propagation added in 3.5.0 and had to go back to the basics of watermarks. One question is still unanswered in my head.
Why are watermarks reserved to stateful queries? Couldn't they apply to filtering late data out only? Is the reason purely historical, since the initial design doc <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit> mentions aggregated queries exclusively? Or are there technical limitations that prevent a job like the one below from dropping late data automatically?

import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream
import sparkSession.implicits._

implicit val sparkContext = sparkSession.sqlContext

case class Click(id: Int, clickTime: Timestamp)

val clicksStream = MemoryStream[Click]
val clicksWithWatermark = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")

// Stateless query: the watermark is declared but no operator enforces it
val query = clicksWithWatermark.writeStream
  .format("console")
  .option("truncate", false)
  .start()

clicksStream.addData(Seq(
  Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
  Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
  Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
))
query.processAllAvailable()

clicksStream.addData(Seq(
  Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
  Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
  Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
  Click(10, Timestamp.valueOf("2023-06-10 10:00:10")) // late, yet still emitted
))
query.processAllAvailable()

One quick implementation could be to add a new physical plan rule to IncrementalExecution <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala> for the EventTimeWatermark node. That's a first thought, though; maybe it's too simplistic and hides some pitfalls?

Best,
Bartosz.

--
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
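P.S. For contrast, here is a minimal sketch (my own, assuming the same Click stream and a live SparkSession) of what happens once a stateful operator enters the plan: with dropDuplicates after the watermark, the query keeps deduplication state and the watermark is actually enforced, so a row whose event time falls behind the current watermark is dropped instead of being written to the sink.

// Same source as above, but with a stateful operator in the plan.
// dropDuplicates forces Spark to track state keyed by (id, clickTime);
// the 10-minute watermark then both expires that state and discards
// input rows older than the watermark, which never happens in the
// stateless console-only query.
val dedupedQuery = clicksStream.toDF
  .withWatermark("clickTime", "10 minutes")
  .dropDuplicates("id", "clickTime")
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()

So the late-data filtering logic clearly exists; it just seems to be wired only into the stateful operators today.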