Hi,

I've recently been analyzing the watermark propagation added in 3.5.0 and
had to go back to the basics of watermarks. One question is still
unanswered in my head.

Why are watermarks reserved for stateful queries? Couldn't they also apply
to simply filtering late data out?

Is the reason only historical, since the initial design doc
<https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
mentions aggregated queries exclusively? Or are there technical
limitations explaining why a job written like the one below doesn't drop
late data automatically?

    import java.sql.Timestamp
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import sparkSession.implicits._

    // Click(id: Int, clickTime: Timestamp) is a top-level case class,
    // so that the implicit Encoder can be derived for it.
    // MemoryStream needs an implicit SQLContext in scope:
    implicit val sqlContext = sparkSession.sqlContext
    val clicksStream = MemoryStream[Click]
    val clicksWithWatermark = clicksStream.toDF
      .withWatermark("clickTime", "10 minutes")
    val query = clicksWithWatermark.writeStream
      .format("console").option("truncate", false)
      .start()

    clicksStream.addData(Seq(
      Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
      Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
      Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
    ))
    // after this batch the watermark is 10:14:00 - 10 minutes = 10:04:00
    query.processAllAvailable()

    clicksStream.addData(Seq(
      Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
      Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
      Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
      // Click 10 is behind the 10:04:00 watermark, yet it still shows
      // up in the console sink
      Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
    ))
    query.processAllAvailable()
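
For reference, the drop I'd expect to happen implicitly is roughly the
manual filter below. It's only a sketch: the 10:04:00 value is the
watermark after the first batch (max event time 10:14:00 minus the
10-minute delay), hardcoded here because there is no way to reference the
running watermark from inside the query itself:

    // hypothetical manual equivalent of the automatic late-data drop
    val currentWatermark = Timestamp.valueOf("2023-06-10 10:04:00")
    val withoutLateClicks = clicksWithWatermark
      .filter($"clickTime" >= currentWatermark)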

One quick implementation could be adding a new physical plan rule to the
IncrementalExecution
<https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
for the EventTimeWatermark node. That's just a first thought, though;
maybe it's too simplistic and hides some pitfalls?
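
To make it concrete, here is a rough sketch of the rule I have in mind.
It's purely hypothetical: FilterLateRows is my own name, I'm assuming it
would be appended to IncrementalExecution's preparations and fed the
batch watermark (if I read the code correctly,
offsetSeqMetadata.batchWatermarkMs already carries it), and the exact node
signatures may differ between versions:

    import org.apache.spark.sql.catalyst.expressions.{GreaterThanOrEqual, Literal}
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
    import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
    import org.apache.spark.sql.types.TimestampType

    // hypothetical rule: drop rows older than the current watermark right
    // after the node that tracks event time, whatever the query shape
    case class FilterLateRows(batchWatermarkMs: Long) extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
        case watermarkNode: EventTimeWatermarkExec =>
          // internal TimestampType values are microseconds since the epoch
          val watermarkMicros = Literal(batchWatermarkMs * 1000L, TimestampType)
          FilterExec(
            GreaterThanOrEqual(watermarkNode.eventTime, watermarkMicros),
            watermarkNode)
      }
    }

The bottom-up transform matters here, as a top-down one would keep
re-matching the node it just wrapped.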

Best,
Bartosz.
-- 
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
