I'd like some way to expose watermarks to the user. The watermark does affect the processing of the records, so it is relevant to users. `current_watermark()` is a good option. Its implementation might be engine specific, but the watermark is a very relevant concept for authors of streaming pipelines. Ideally I would like the engine to drop late data (or write it to a side output) even for stateless pipelines, for consistency.
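For context, absent such a function, a stateless pipeline has to approximate this by hand with a plain filter against a self-managed cut-off, e.g. processing time minus an allowed lateness. A rough, runnable sketch (the 10-minute delay and the clickTime column are borrowed from the example quoted below; the cut-off here is hand-rolled, not the engine's watermark):

import org.apache.spark.sql.functions.{col, current_timestamp, expr}

// Hand-rolled lateness filter for a stateless query: the cut-off is derived
// from processing time, not from the engine's event-time watermark.
val cutOff = current_timestamp() - expr("INTERVAL 10 MINUTES")
val onTimeClicks = clicksWithWatermark.filter(col("clickTime") >= cutOff)
val lateClicks = clicksWithWatermark.filter(col("clickTime") < cutOff)

A built-in current_watermark() would replace the hand-rolled cutOff with the engine's own value.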
On Tue, Oct 10, 2023 at 2:27 AM Bartosz Konieczny <bartkoniec...@gmail.com> wrote:

> Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like
> a highly demanded feature from the end users; I haven't seen it come up a
> lot on StackOverflow or the mailing lists. I was just curious about the
> reasons.
>
> Using arbitrary stateful processing could indeed be a workaround! But
> IMHO it would be easier to expose the watermark value from a function like
> current_watermark() and let the users do whatever they need with the data.
> And it wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where we could
> process the on-time data differently from the late data, e.g. write late
> data to a dedicated space in the lake and facilitate backfilling for the
> batch pipelines.
>
> With the current_watermark() function it could be expressed as simply:
>
> streamDataset.foreachBatch((dataframe, batchVersion) => {
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data")
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data")
> })
>
> A little bit like what you can do with Apache Flink, in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. Will be happy to contribute on that if the feature does make sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
> wrote:
>
>> Technically speaking, "late data" means the data which can no longer be
>> processed because the engine has already thrown away the state associated
>> with it.
>>
>> That said, the only reason the watermark exists in streaming is to handle
>> stateful operators. From the engine's point of view, there is no concept
>> of "late data" in a stateless query. It's something users have to handle
>> with "filter" by themselves, without relying on the value of the
>> watermark. I guess someone may see some benefit in automatically tracking
>> the trend of event time and want to define late data based on the
>> watermark even in a stateless query, but personally I haven't heard such
>> a request so far.
>>
>> As a workaround you can leverage flatMapGroupsWithState, which provides
>> the value of the watermark for you, but I'd agree it's too heavyweight
>> just for this. If we see consistent demand for it, we could probably look
>> into it and maybe introduce a new SQL function (which would work only on
>> streaming - that's probably a major blocker for its introduction).
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added in 3.5.0 recently
>>> and had to return to the basics of watermarks. One question is still
>>> unanswered in my head.
>>>
>>> Why are watermarks reserved for stateful queries? Couldn't they also
>>> apply to just filtering late data out?
>>>
>>> Is the reason only historical, since the initial design doc
>>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>>> mentions aggregated queries exclusively? Or are there any technical
>>> limitations why a job like the one below doesn't drop late data
>>> automatically?
>>>
>>> import sparkSession.implicits._
>>> implicit val sparkContext = sparkSession.sqlContext
>>>
>>> val clicksStream = MemoryStream[Click]
>>> val clicksWithWatermark = clicksStream.toDF
>>>   .withWatermark("clickTime", "10 minutes")
>>> val query = clicksWithWatermark.writeStream
>>>   .format("console").option("truncate", false)
>>>   .start()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>   Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>   Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>> ))
>>>
>>> query.processAllAvailable()
>>>
>>> clicksStream.addData(Seq(
>>>   Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>   Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>   Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>   Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>> ))
>>> query.processAllAvailable()
>>>
>>> One quick implementation could be adding a new physical plan rule to the
>>> IncrementalExecution
>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>>> for the EventTimeWatermark node. That's a first thought, maybe too
>>> simplistic and hiding some pitfalls?
>>>
>>> Best,
>>> Bartosz.
>>> --
>>> freelance data engineer
>>> https://www.waitingforcode.com
>>> https://github.com/bartosz25/
>>> https://twitter.com/waitingforcode
>>>
>
> --
> Bartosz Konieczny
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
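For reference, the flatMapGroupsWithState workaround Jungtaek mentions could look roughly like the sketch below. It reuses clicksWithWatermark and the Click case class from the quoted example (I'm assuming its first field is named id); GroupState.getCurrentWatermarkMs() is the existing API that exposes the watermark value:

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Tags every click as late or on-time using the watermark exposed via GroupState.
// Assumes case class Click(id: Int, clickTime: java.sql.Timestamp) and the
// sparkSession.implicits._ import from the example above.
val taggedClicks = clicksWithWatermark
  .as[Click]
  .groupByKey(_.id)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
    (_: Int, clicks: Iterator[Click], state: GroupState[Int]) =>
      // Event-time watermark of the current micro-batch, in epoch milliseconds.
      val watermarkMs = state.getCurrentWatermarkMs()
      clicks.map(click => (click, click.clickTime.getTime < watermarkMs))
  }

The per-key grouping exists only to get access to GroupState, which illustrates why this is too heavyweight just to read a single engine-level value.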