I'd like some way to expose watermarks to the user. They do affect
how the records are processed, so they are relevant to users.
`current_watermark()` is a good option.
The implementation might be engine specific, but the watermark is a very
relevant concept for authors of streaming pipelines.
Ideally I would like the engine to drop late data (or write it to a side
output) even for stateless pipelines, for consistency.
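
From the user side that could look something like the sketch below
(hypothetical: current_watermark() does not exist in Spark today, and the
source and column names are assumed):

    import org.apache.spark.sql.functions.col

    // current_watermark() is the proposed, not-yet-existing function and is
    // assumed to return the engine's watermark as a Column.
    val onTime = events.filter(col("event_time") >= current_watermark())
    val late = events.filter(col("event_time") < current_watermark())

    onTime.writeStream.format("parquet")
      .option("checkpointLocation", "/chk/on_time").start("/data/on_time")
    late.writeStream.format("parquet")
      .option("checkpointLocation", "/chk/late").start("/data/late")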

On Tue, Oct 10, 2023 at 2:27 AM Bartosz Konieczny <bartkoniec...@gmail.com>
wrote:

> Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like
> a highly demanded feature from the end users; I haven't seen it much on
> StackOverflow or the mailing lists. I was just curious about the reasons.
>
> Using arbitrary stateful processing could indeed be a workaround! But
> IMHO it would be easier to expose the watermark value from a function like
> current_watermark() and let the users do anything they want with the data.
> And it wouldn't require dealing with the state store overhead. The function
> could simplify implementing the *side output pattern*, where we could
> process the on-time data differently from the late data, e.g. write late
> data to a dedicated space in the lake and facilitate backfilling for the
> batch pipelines.
>
> With the current_watermark() function it could be as simple as:
>
> streamDataset.writeStream.foreachBatch { (dataframe, batchVersion) =>
>   dataframe.cache()
>   dataframe.filter(current_watermark() > event_time_from_dataframe)
>     .writeTo("late_data").append()
>   dataframe.filter(current_watermark() <= event_time_from_dataframe)
>     .writeTo("on_time_data").append()
> }
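>
> As a side note, something close to this is already possible today, albeit
> clunky: the per-batch watermark is exposed in the query progress, so a
> listener can capture it and foreachBatch can apply the split manually. A
> sketch (the "event_time" column and sink names are assumed, the listener
> is asynchronous so the captured watermark can lag by one trigger, and
> batches arriving before any watermark exists are skipped for brevity):
>
>     import java.time.Instant
>     import org.apache.spark.sql.DataFrame
>     import org.apache.spark.sql.functions.{col, lit}
>     import org.apache.spark.sql.streaming.StreamingQueryListener
>     import org.apache.spark.sql.streaming.StreamingQueryListener._
>
>     @volatile var lastWatermark: Option[java.sql.Timestamp] = None
>
>     sparkSession.streams.addListener(new StreamingQueryListener {
>       override def onQueryStarted(event: QueryStartedEvent): Unit = ()
>       override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
>       override def onQueryProgress(event: QueryProgressEvent): Unit = {
>         // "watermark" shows up in eventTime once the query defines one
>         Option(event.progress.eventTime.get("watermark")).foreach { w =>
>           lastWatermark = Some(java.sql.Timestamp.from(Instant.parse(w)))
>         }
>       }
>     })
>
>     streamDataset.writeStream.foreachBatch {
>       (dataframe: DataFrame, batchVersion: Long) =>
>         lastWatermark.foreach { watermark =>
>           dataframe.filter(col("event_time") < lit(watermark))
>             .writeTo("late_data").append()
>           dataframe.filter(col("event_time") >= lit(watermark))
>             .writeTo("on_time_data").append()
>         }
>     }.start()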
>
> A bit like what you can do with Apache Flink, in fact:
>
> https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
>
> WDYT?
>
> Best,
> Bartosz.
>
> PS. I will be happy to contribute to that if the feature makes sense ;)
>
> On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
> wrote:
>
>> Technically speaking, "late data" means data which can no longer be
>> processed because the engine has already thrown away the state associated
>> with it.
>>
>> That said, the only reason the watermark exists in streaming is to
>> handle stateful operators. From the engine's point of view, there is no
>> concept of "late data" for a stateless query. Users have to apply a
>> "filter" by themselves, without relying on the value of the watermark.
>> I guess someone may see some benefit in automatically tracking the
>> event-time trend and want to define late data based on the watermark even
>> in a stateless query, but personally I haven't heard such a request so far.
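>>
>> For a stateless query that manual filter would be something like the
>> sketch below (it approximates lateness with processing time rather than a
>> real watermark, and the source and column names are assumed):
>>
>>     import org.apache.spark.sql.functions.{col, current_timestamp, expr}
>>
>>     // Keep only events whose event time is at most 10 minutes behind
>>     // processing time - a crude stand-in for a watermark.
>>     val onTime = events.filter(
>>       col("clickTime") >= current_timestamp() - expr("INTERVAL 10 MINUTES"))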
>>
>> As a workaround you can leverage flatMapGroupsWithState, which provides
>> the value of the watermark for you, but I'd agree it's too heavyweight just
>> for this. If we see consistent demand for it, we could probably look into
>> it and maybe introduce a new SQL function (one which works only on
>> streaming queries - that's probably the major blocker on introducing it).
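>>
>> For reference, that workaround would look roughly like the sketch below
>> (the Click schema and grouping key are taken from the example further down
>> the thread; the Int state value is a dummy that exists only to reach
>> GroupState.getCurrentWatermarkMs()):
>>
>>     import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
>>
>>     // Tag each record with a late/on-time flag based on the watermark the
>>     // engine tracks; requires withWatermark() upstream.
>>     val tagged = clicksWithWatermark.as[Click]
>>       .groupByKey(_.id)
>>       .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
>>         (id: Int, clicks: Iterator[Click], state: GroupState[Int]) =>
>>           val watermarkMs = state.getCurrentWatermarkMs()
>>           clicks.map(click => (click, click.clickTime.getTime < watermarkMs))
>>       }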
>>
>> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <
>> bartkoniec...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I've been analyzing the watermark propagation added recently in 3.5.0
>>> and had to return to the basics of watermarks. One question is still
>>> unanswered in my head.
>>>
>>> Why are watermarks reserved for stateful queries? Can't they also apply
>>> to simply filtering late data out?
>>>
>>> Is the reason only historical, since the initial design doc
>>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>>> mentions aggregated queries exclusively? Or are there any technical
>>> limitations why a job like the one below doesn't drop late data
>>> automatically?
>>>
>>>     import java.sql.Timestamp
>>>     import org.apache.spark.sql.execution.streaming.MemoryStream
>>>     import sparkSession.implicits._
>>>     implicit val sqlContext = sparkSession.sqlContext
>>>
>>>     case class Click(id: Int, clickTime: Timestamp)
>>>
>>>     val clicksStream = MemoryStream[Click]
>>>     val clicksWithWatermark = clicksStream.toDF
>>>       .withWatermark("clickTime", "10 minutes")
>>>     val query = clicksWithWatermark.writeStream.format("console")
>>>       .option("truncate", false)
>>>       .start()
>>>
>>>     clicksStream.addData(Seq(
>>>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>>     ))
>>>     query.processAllAvailable()
>>>
>>>     clicksStream.addData(Seq(
>>>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>>       // behind the watermark computed after the first batch
>>>       // (10:14:00 - 10 minutes = 10:04:00), yet still printed
>>>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>>     ))
>>>     query.processAllAvailable()
>>>
>>> One quick implementation could be adding a new physical plan rule to the
>>> IncrementalExecution
>>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>>> for the EventTimeWatermark node. That's a first thought; maybe it's too
>>> simplistic and hides some pitfalls?
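>>>
>>> Roughly, such a rule might look like the sketch below (a very rough and
>>> untested sketch against internal APIs; the exact shape of
>>> EventTimeWatermarkExec and the watermark plumbing through
>>> offsetSeqMetadata.batchWatermarkMs are assumptions on my side):
>>>
>>>     import org.apache.spark.sql.catalyst.expressions.{GreaterThanOrEqual, Literal}
>>>     import org.apache.spark.sql.catalyst.rules.Rule
>>>     import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
>>>     import org.apache.spark.sql.execution.streaming.EventTimeWatermarkExec
>>>     import org.apache.spark.sql.types.TimestampType
>>>
>>>     // Hypothetical rule: below every watermark node, drop rows older than
>>>     // the previous batch's watermark. batchWatermarkMs is in milliseconds
>>>     // while internal timestamps are microseconds, hence the * 1000.
>>>     case class FilterLateData(batchWatermarkMs: Long) extends Rule[SparkPlan] {
>>>       override def apply(plan: SparkPlan): SparkPlan = plan transform {
>>>         case e: EventTimeWatermarkExec if batchWatermarkMs > 0 =>
>>>           e.copy(child = FilterExec(
>>>             GreaterThanOrEqual(e.eventTime,
>>>               Literal(batchWatermarkMs * 1000L, TimestampType)),
>>>             e.child))
>>>       }
>>>     }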
>>>
>>> Best,
>>> Bartosz.
>>> --
>>> freelance data engineer
>>> https://www.waitingforcode.com
>>> https://github.com/bartosz25/
>>> https://twitter.com/waitingforcode
>>>
>>>
>
> --
> Bartosz Konieczny
> freelance data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>
