Hi, Sunny.
Each watermark the window operator receives comes from one subtask of its
input(s), and the operator keeps track of the latest watermark seen from every
input subtask. The `currentWatermark` in the window operator is the minimum of
these per-subtask watermarks.
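To make that concrete, here is a minimal plain-Java sketch of the idea (not Flink's actual implementation; class and method names are hypothetical): the operator remembers the latest watermark per input channel, and its own watermark is the minimum across all channels, only ever moving forward.

```java
// Hypothetical sketch of per-channel watermark combination.
// Not Flink's real code; it only illustrates the min-across-inputs rule.
public class WatermarkCombiner {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public WatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Record a watermark from one input channel; return the operator-wide watermark. */
    public long onWatermark(int channel, long watermark) {
        // Per-channel watermarks only advance.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator's watermark is the minimum across all channels.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        // The combined watermark never moves backwards.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }
}
```

Until every input subtask has emitted a watermark, the combined watermark stays at its initial minimum, which is why a single idle input can hold the whole window back.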

--

    Best!
    Xuyang




At 2024-03-07 23:03:39, "Sunny S" <sunny8...@outlook.in> wrote:

Thanks for the response! It is sad that a side output for late data is not
supported in the Table API and SQL. I will start the discussions regarding this.


In the meanwhile, I am trying to use the built-in function
CURRENT_WATERMARK(rowtime) to collect late data. The scenario I have is: I am
creating a table with the Kafka connector and defining the watermark in that
table. The table definition can be found in the mail below. Next, I apply a
tumbling window SQL query on this table, and I want to collect the late data
for this window operation. It is not clear to me how the CURRENT_WATERMARK
function would help me get the late data for the window operator.


Also, I am a bit confused about how we determine whether an event is late for a
window operator. From the WindowOperator code:


    protected boolean isElementLate(StreamRecord<IN> element) {
        return (windowAssigner.isEventTime())
                && (element.getTimestamp() + allowedLateness
                        <= internalTimerService.currentWatermark());
    }


it seems the operator maintains a currentWatermark. I am trying to understand
how this currentWatermark changes from the time the operator receives the first
event that belongs to a window until the time that window fires.
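Restating the quoted check as a standalone method (hypothetical names, no Flink dependency) may make the condition easier to reason about: an element is late exactly when its timestamp plus the allowed lateness no longer exceeds the operator's current watermark.

```java
// Standalone restatement of the quoted isElementLate() condition.
// Hypothetical helper, not Flink's actual class.
public class LatenessCheck {
    /**
     * An element is late once its timestamp, extended by the allowed
     * lateness, is at or below the operator's current watermark.
     */
    public static boolean isElementLate(
            long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }
}
```

With the default allowedLateness of 0, an element is late as soon as the watermark has caught up to (or passed) its timestamp.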


Please help me understand these.


Thanks 



From: Feng Jin <jinfeng1...@gmail.com>
Sent: 06 March 2024 07:08
To: Sunny S <sunny8...@outlook.in>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Handling late events with Table API / SQL
 


You can use the CURRENT_WATERMARK(rowtime) function for some filtering; please
refer to [1] for details.




https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
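To illustrate the idea behind filtering with CURRENT_WATERMARK, here is a plain-Java sketch (no Flink dependency; the class name is hypothetical and the watermark arithmetic is simplified, e.g. Flink actually subtracts an extra millisecond): with a bounded out-of-orderness of 30 seconds, the watermark trails the highest timestamp seen so far by 30 seconds, and rows that do not advance past the current watermark can be routed to a "late" bucket instead of being silently dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified simulation of splitting a stream into on-time and late rows,
// mirroring what a CURRENT_WATERMARK-based filter does. Hypothetical class.
public class LateSplitter {
    static final long OUT_OF_ORDERNESS_MS = 30_000L;

    /** Returns [onTimeTimestamps, lateTimestamps]. */
    public static List<List<Long>> split(long[] timestamps) {
        long watermark = Long.MIN_VALUE; // no watermark emitted yet
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            // A row is late if a watermark exists and the row does not
            // advance past it.
            if (watermark != Long.MIN_VALUE && ts <= watermark) {
                late.add(ts);
            } else {
                onTime.add(ts);
            }
            // Watermark trails the max timestamp by the out-of-orderness bound.
            watermark = Math.max(watermark, ts - OUT_OF_ORDERNESS_MS);
        }
        return List.of(onTime, late);
    }
}
```

In SQL terms, the "late" branch corresponds to a query that keeps rows where the rowtime is at or below CURRENT_WATERMARK(rowtime), which can then be written to a separate sink.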


Best,
Feng



On Wed, Mar 6, 2024 at 1:56 AM Sunny S <sunny8...@outlook.in> wrote:

Hi,


I am using Flink SQL to create a table something like this :


CREATE TABLE `some-table` ( 
      ...,
      ...,
      ...,
      ...,
      event_timestamp as TO_TIMESTAMP_LTZ(event_time*1000, 3),
      WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
) WITH (
      'connector' = 'kafka',
      'topic' = 'some-topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'value.format' = 'csv'
)


I want to understand how I can deal with late / out-of-order events when using
Flink SQL / Table API. How can I collect the late / out-of-order events to a
side output with Table API / SQL?


Thanks 
