Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like a highly
demanded feature from the end users; I haven't seen it requested a lot on
StackOverflow or the mailing lists. I was just curious about the reasons.
Using the arbitrary stateful processing could indeed be a workaround! But IMHO
it would be easier to expose this watermark value from a function like
current_watermark() and let the users do anything with the data, without the
state store overhead to deal with. The function could simplify implementing the
*side output pattern*, where we could process the on-time data differently from
the late data, e.g. write the late data to a dedicated space in the lake and
facilitate the backfilling for the batch pipelines.

With the current_watermark function it could be expressed as simply as:

    streamDataset.foreachBatch((dataframe, batchVersion) => {
      dataframe.cache()
      dataframe.filter(current_watermark() > event_time_from_dataframe)
        .writeTo("late_data")
      dataframe.filter(current_watermark() <= event_time_from_dataframe)
        .writeTo("on_time_data")
    })

A little bit like what you can do with Apache Flink, in fact:
https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81

WDYT?

Best,
Bartosz.

PS. Will be happy to contribute on that if the feature does make sense ;)

On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Technically speaking, "late data" represents the data which cannot be
> processed due to the fact that the engine has already thrown out the state
> associated with the data.
>
> That said, the only reason the watermark exists for streaming is to handle
> stateful operators. From the engine's point of view, there is no concept of
> "late data" for a stateless query. It's something users have to handle with
> "filter" by themselves, without relying on the value of the watermark.
> I guess someone may see some benefit in the automatic tracking of the trend
> of event time and want to define late data based on the watermark even in a
> stateless query, but personally I haven't heard such a request so far.
>
> As a workaround you can leverage flatMapGroupsWithState, which provides the
> value of the watermark for you, but I'd agree it's too heavyweight just to
> do this. If we see consistent demand for it, we could probably look into it
> and maybe introduce a new SQL function (which works only on streaming -
> that's probably a major blocker for the introduction).
>
> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <bartkoniec...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I've been analyzing the watermark propagation added in 3.5.0 recently
>> and had to return to the basics of watermarks. One question is still
>> unanswered in my head.
>>
>> Why are the watermarks reserved to stateful queries? Couldn't they also
>> apply to filtering the late data out only?
>>
>> Is the reason only historical, as the initial design doc
>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>> mentions the aggregated queries exclusively? Or are there any technical
>> limitations why jobs like the one below don't drop the late data
>> automatically?
>>
>>     import sparkSession.implicits._
>>     implicit val sparkContext = sparkSession.sqlContext
>>
>>     val clicksStream = MemoryStream[Click]
>>     val clicksWithWatermark = clicksStream.toDF
>>       .withWatermark("clickTime", "10 minutes")
>>     val query = clicksWithWatermark.writeStream.format("console")
>>       .option("truncate", false)
>>       .start()
>>
>>     clicksStream.addData(Seq(
>>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>     ))
>>     query.processAllAvailable()
>>
>>     clicksStream.addData(Seq(
>>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>     ))
>>     query.processAllAvailable()
>>
>> One quick implementation could be adding a new physical plan rule to the
>> IncrementalExecution
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>> for the EventTimeWatermark node. That's a first thought; maybe it's too
>> simplistic and hides some pitfalls?
>>
>> Best,
>> Bartosz.
>>
>> --
>> freelance data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode

--
Bartosz Konieczny
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
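PPS. For completeness, a rough sketch of the flatMapGroupsWithState workaround
mentioned above, reusing the Click case class from my example. The artificial
grouping key, the TaggedClick output type, and the tagging logic are only
illustrative assumptions; the one real piece of engine API involved is
GroupState.getCurrentWatermarkMs:

    // Sketch only: (ab)using flatMapGroupsWithState to read the watermark
    // via GroupState.getCurrentWatermarkMs; the state itself is never used.
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    // Hypothetical output type tagging each record as late or on-time.
    case class TaggedClick(id: Int, clickTime: Timestamp, isLate: Boolean)

    val taggedClicks = clicksWithWatermark.as[Click]
      .groupByKey(click => click.id) // artificial key, only to reach GroupState
      .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
        (id: Int, clicks: Iterator[Click], state: GroupState[Boolean]) =>
          // Watermark as reported by the engine for the current trigger.
          val watermarkMs = state.getCurrentWatermarkMs()
          clicks.map(click =>
            TaggedClick(click.id, click.clickTime,
              click.clickTime.getTime < watermarkMs))
      }

This also illustrates why it's heavyweight: we pay for a shuffle on an
artificial key and drag in the whole state store machinery just to read a
single long value, which a current_watermark() function would expose directly.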