Thank you for the clarification, Jungtaek 🙏 Indeed, it doesn't sound like
a highly demanded feature from the end users; I haven't seen it requested
much on StackOverflow or the mailing lists. I was just curious about the
reasons.

Using arbitrary stateful processing could indeed be a workaround! But IMHO
it would be easier to expose the watermark value through a function like
current_watermark() and let the users do whatever they need with the data.
It also wouldn't require dealing with the state store overhead. The function
could simplify implementing the *side output pattern*, where we process the
on-time data differently from the late data, e.g. write the late data to a
dedicated space in the lake to facilitate backfilling for the batch
pipelines.
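
For the record, the stateful workaround would look more or less like this.
It's a rough sketch building on the Click example from my first message
(with the implicits in scope); the dummy grouping key is only there to get
access to GroupState and its getCurrentWatermarkMs():

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class TaggedClick(click: Click, isLate: Boolean)

val tagged = clicksWithWatermark.as[Click]
  // dummy key: we don't need real grouping, only the GroupState handle,
  // but the shuffle it implies is still there
  .groupByKey(_ => 0)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
    (_: Int, clicks: Iterator[Click], state: GroupState[Int]) =>
      // the watermark the engine already tracks for the query
      val watermarkMs = state.getCurrentWatermarkMs()
      clicks.map(c => TaggedClick(c, isLate = c.clickTime.getTime < watermarkMs))
  }

It works, but it forces a shuffle and the state store machinery just to
read a value the engine already knows.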

With the current_watermark function it could be expressed simply as:

streamDataset.writeStream.foreachBatch { (dataframe: DataFrame, batchVersion: Long) =>
  dataframe.cache()
  // late data: event time already behind the watermark
  dataframe.filter(current_watermark() > event_time_from_dataframe)
    .writeTo("late_data").append()
  // on-time data: event time at or ahead of the watermark
  dataframe.filter(current_watermark() <= event_time_from_dataframe)
    .writeTo("on_time_data").append()
}.start()
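
Here current_watermark() is of course the function being proposed, not an
existing API, and event_time_from_dataframe stands for the dataset's event
time column. The engine would just resolve the function to the watermark
value it already tracks for the running query, so the whole pattern stays
stateless.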

A bit like what you can do with Apache Flink, in fact:
https://github.com/immerok/recipes/blob/main/late-data-to-sink/src/main/java/com/immerok/cookbook/LateDataToSeparateSink.java#L81
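
The linked recipe essentially boils down to this (a condensed sketch
transposed to Scala, with names of my own): inside a ProcessFunction, each
record's timestamp is compared against ctx.timerService().currentWatermark()
and late records are routed to a side output:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// side output tag collecting the late records
val lateClicks = OutputTag[Click]("late-clicks")

class SplitLateData extends ProcessFunction[Click, Click] {
  override def processElement(click: Click,
                              ctx: ProcessFunction[Click, Click]#Context,
                              out: Collector[Click]): Unit = {
    // late when the record's event time is behind the current watermark
    // (assumes event time timestamps have been assigned upstream)
    if (ctx.timestamp() < ctx.timerService().currentWatermark()) {
      ctx.output(lateClicks, click)
    } else {
      out.collect(click)
    }
  }
}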

WDYT?

Best,
Bartosz.

PS. I will be happy to contribute to that if the feature makes sense ;)

On Tue, Oct 10, 2023 at 3:23 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
wrote:

> Technically speaking, "late data" represents the data which can no longer
> be processed because the engine has already thrown out the state associated
> with it.
>
> That said, the only reason the watermark exists for streaming is to handle
> stateful operators. From the engine's point of view, there is no concept of
> "late data" for a stateless query. Users have to leverage a "filter" by
> themselves, without relying on the value of the watermark. I guess someone
> may see a benefit in the automatic tracking of the event time trend and
> want to define late data based on the watermark even in a stateless query,
> but personally I haven't heard such a request so far.
>
> As a workaround you can leverage flatMapGroupsWithState, which provides the
> value of the watermark for you, but I'd agree it's too heavyweight just for
> this. If we see consistent demand, we could probably look into it and maybe
> introduce a new SQL function (which would work only on streaming - that's
> probably a major blocker for its introduction).
>
> On Mon, Oct 9, 2023 at 11:03 AM Bartosz Konieczny <bartkoniec...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I've been analyzing the watermark propagation added in 3.5.0 recently
>> and had to return to the basics of watermarks. One question is still
>> unanswered in my head.
>>
>> Why are watermarks reserved for stateful queries? Can't they apply to
>> filtering late data out only?
>>
>> Is the reason only historical, since the initial design doc
>> <https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/edit>
>> mentions aggregated queries exclusively? Or are there any technical
>> limitations explaining why jobs like the one below don't drop late data
>> automatically?
>>
>>     import sparkSession.implicits._
>>     // MemoryStream needs an implicit SQLContext in scope
>>     implicit val sqlContext = sparkSession.sqlContext
>>     val clicksStream = MemoryStream[Click]
>>     val clicksWithWatermark = clicksStream.toDF
>>       .withWatermark("clickTime", "10 minutes")
>>     val query = clicksWithWatermark.writeStream
>>       .format("console")
>>       .option("truncate", false)
>>       .start()
>>
>>     clicksStream.addData(Seq(
>>       Click(1, Timestamp.valueOf("2023-06-10 10:10:00")),
>>       Click(2, Timestamp.valueOf("2023-06-10 10:12:00")),
>>       Click(3, Timestamp.valueOf("2023-06-10 10:14:00"))
>>     ))
>>
>>
>>     query.processAllAvailable()
>>
>>     clicksStream.addData(Seq(
>>       Click(4, Timestamp.valueOf("2023-06-10 11:00:40")),
>>       Click(5, Timestamp.valueOf("2023-06-10 11:00:30")),
>>       Click(6, Timestamp.valueOf("2023-06-10 11:00:10")),
>>       Click(10, Timestamp.valueOf("2023-06-10 10:00:10"))
>>     ))
>>     query.processAllAvailable()
>>
>> One quick implementation could be adding a new physical plan rule to the
>> IncrementalExecution
>> <https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala>
>> for the EventTimeWatermark node. That's a first thought; maybe it's too
>> simplistic and hides some pitfalls?
>>
>> Best,
>> Bartosz.
>> --
>> freelance data engineer
>> https://www.waitingforcode.com
>> https://github.com/bartosz25/
>> https://twitter.com/waitingforcode
>>
>>

-- 
Bartosz Konieczny
freelance data engineer
https://www.waitingforcode.com
https://github.com/bartosz25/
https://twitter.com/waitingforcode
