Hi, Sunny.
A watermark always comes from one of the input subtasks of this window operator,
and the operator keeps track of the latest watermark received from each input
subtask. The `currentWatermark` inside the window operator is the minimum of
these per-subtask watermarks.
--
Best!
Xuyang
At 2024-03-07 23:03:39, "Sunny S" <[email protected]> wrote:
Thanks for the response! It is a pity that the side output for late data is not
supported in the Table API and SQL. I will start the discussions regarding this.
In the meantime, I am trying to use the built-in function
CURRENT_WATERMARK(rowtime) to collect late data. The scenario I have is: I am
creating a table with the Kafka connector and defining the watermark in that
table. A reference to this table definition can be found in the mail above.
Next, I apply a tumbling window SQL query on this table. I want to collect the
late data for this window operation. I am not clear how the CURRENT_WATERMARK
function would help me in getting the late data for the window operator.
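For context, the tumbling window query I apply is roughly of this shape (a
sketch only; the window size and the aggregate are illustrative placeholders):

-- Tumbling window aggregation over the watermarked table
SELECT window_start, window_end, COUNT(*) AS event_cnt
FROM TABLE(
    TUMBLE(TABLE `some-table`, DESCRIPTOR(event_timestamp), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end;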
Also, I am a bit confused about how we determine whether an event is late for a
window operator. From the WindowOperator code:
protected boolean isElementLate(StreamRecord<IN> element) {
    return (windowAssigner.isEventTime())
            && (element.getTimestamp() + allowedLateness
                    <= internalTimerService.currentWatermark());
}
it seems the operator maintains a currentWatermark. I am trying to understand
how this currentWatermark changes from the time the operator receives the first
event belonging to a window until the time that window fires.
Please help me understand these.
Thanks
From: Feng Jin <[email protected]>
Sent: 06 March 2024 07:08
To: Sunny S <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: Handling late events with Table API / SQL
You can use the CURRENT_WATERMARK(rowtime) function for some filtering,
please refer to [1] for details.
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
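A minimal sketch of such a filter, assuming the table and watermark definition
from the original mail (the `late_events` sink table is hypothetical):

-- A row is late once its rowtime is at or behind the current watermark.
-- CURRENT_WATERMARK returns NULL before any watermark has been emitted,
-- hence the explicit NULL guard.
INSERT INTO late_events
SELECT *
FROM `some-table`
WHERE CURRENT_WATERMARK(event_timestamp) IS NOT NULL
  AND event_timestamp <= CURRENT_WATERMARK(event_timestamp);

Inverting the predicate (NULL watermark, or rowtime strictly ahead of the
watermark) gives you the on-time branch of the stream.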
Best,
Feng
On Wed, Mar 6, 2024 at 1:56 AM Sunny S <[email protected]> wrote:
Hi,
I am using Flink SQL to create a table, something like this:
CREATE TABLE `some-table` (
...,
...,
...,
...,
event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'some-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'value.format' = 'csv'
)
I want to understand how I can deal with late / out-of-order events when using
Flink SQL / the Table API. How can I collect the late / out-of-order events to
a side output with the Table API / SQL?
Thanks