Re: Understanding event time wrt watermarking strategy in flink

2024-04-15 Thread Sachin Mittal
Hi Yunfeng,
So, regarding the dropping of records with an out-of-order watermark: let's say
records older than T - B are dropped by the first operator after
watermarking, which is the one reading from the source.
Then these records will never be forwarded to the step where we do
event-time windowing, and hence will never arrive at that step.

Hence records with timestamp T - A - B will never reach my windowing
operator to be collected by the side outputs.

Is this understanding correct?

If this is the case, then shouldn't A be less than B, to at least collect
those records so they get included in a particular window?

Basically, having an allowed lateness A greater than the out-of-order bound B
wouldn't make sense, as records older than T - B would already have been
dropped at the source itself.

Please let me know whether I am understanding this correctly, or whether I am
missing something.

Thanks
Sachin


On Mon, Apr 15, 2024 at 6:56 AM Yunfeng Zhou 
wrote:

> Hi Sachin,
>
> Firstly sorry for my misunderstanding about watermarking in the last
> email. When you configure an out-of-orderness watermark with a
> tolerance of B, the next watermark emitted after a record with
> timestamp T would be T-B instead of T described in my last email.
>
> Then let's go back to your question. When the Flink job receives n
> records with timestamp Tn, it will set the timestamp of the next
> watermark to be max(Tn) - B - 1ms, and that watermark will be emitted
> after the next autoWatermarkInterval is reached. So a record with
> timestamp T will not influence records less than T - B immediately,
> instead it influences the next watermark, and the watermark afterwards
> influences those records.
>
> As for the influence on those late records, Flink operators will drop
> them by default, but you can also gather them for other downstream
> logics. Please refer to
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
> Based on the analysis above, if you configure allowed lateness to A,
> records with timestamps less than T - A - B will be dropped or
> gathered as side outputs.
>
> Best,
> Yunfeng
>
> On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal  wrote:
> >
> > Hi Yunfeng,
> > I have a question around the tolerance for out of order bound
> watermarking,
> >
> > What I understand that when consuming from source with out of order
> bound set as B, lets say it gets a record with timestamp T.
> > After that it will drop all the subsequent records which arrive with the
> timestamp less than T - B.
> >
> > Please let me know if I understood this correctly.
> >
> > If this is correct, then how does allowed lateness when performing event
> time windowing works ?  Say allowed lateness is set as A,
> > does this mean that value of A should be less than that of B because
> records with timestamp less than T - B would have already been dropped at
> the source.
> >
> > If this is not the case, then how does lateness work with out of order
> boundedness ?
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Sachin,
> >>
> >> 1. When your Flink job performs an operation like map or flatmap, the
> >> output records would be automatically assigned with the same timestamp
> >> as the input record. You don't need to manually assign the timestamp
> >> in each step. So the windowing result in your example should be as you
> >> have expected.
> >>
> >> 2. The frequency of watermarks can be configured by
> >> pipeline.auto-watermark-interval in flink-conf.yaml, or
> >> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> >> the event time related to the Watermark is still T, just that the job
> >> will tolerate any records whose timestamp is in range [T-B, T].
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal 
> wrote:
> >> >
> >> > Hello folks,
> >> > I have few questions:
> >> >
> >> > Say I have a source like this:
> >> >
> >> > final DataStream data =
> >> > env.fromSource(
> >> > source,
> >> >
>  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >> > .withTimestampAssigner((event, timestamp) ->
> event.timestamp));
> >> >
> >> >
> >> > My pipeline after this is as followed:
> >> >
> >> > data.flatMap(new MyFlattendData())
> >> > .keyBy(new MyKeySelector())
> >> > .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >> > .reduce(new MyReducer());
> >> >
> >> >
> >> > First question I have is that the timestamp I assign from the source,
> would it get carried to all steps below to my window ?
> >> > Example say I have timestamped data from source as:
> >> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >> >
> >> >  would this get flattened to say:
> >> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >> >
> >> > then keyed to say:
> >> > => [ 

Re: Understanding event time wrt watermarking strategy in flink

2024-04-14 Thread Yunfeng Zhou
Hi Sachin,

Firstly, sorry for my misunderstanding about watermarking in the last
email. When you configure an out-of-orderness watermark with a
tolerance of B, the watermark emitted after a record with
timestamp T would be T - B, instead of T as described in my last email.

Then let's go back to your question. When the Flink job has received n
records with timestamps T1, ..., Tn, it will set the timestamp of the next
watermark to max(Tn) - B - 1ms, and that watermark will be emitted
once the next autoWatermarkInterval is reached. So a record with
timestamp T will not influence records older than T - B immediately;
instead it influences the next watermark, and that watermark afterwards
influences those records.

As for those late records, Flink operators will drop
them by default, but you can also gather them for further downstream
logic. Please refer to
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output

Based on the analysis above, if you configure the allowed lateness as A,
records with timestamps less than T - A - B will be dropped, or can be
gathered as side outputs.
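To make the arithmetic above concrete, here is a minimal plain-Java sketch. These are not Flink's actual classes; B, A, and the 1 ms offset follow the description above, and the names are illustrative. It models the watermark position and the point at which an event-time window stops accepting late records:

```java
// Sketch of bounded-out-of-orderness watermarking plus allowed lateness.
// Not Flink API; a simplified model of the semantics discussed above.
public class WatermarkSketch {
    static final long B = 60_000; // out-of-orderness bound, ms
    static final long A = 10_000; // allowed lateness, ms

    // Initialized so the very first watermark is Long.MIN_VALUE.
    long maxTs = Long.MIN_VALUE + B + 1;

    /** Watermark derived from the highest timestamp seen so far. */
    long currentWatermark() {
        return maxTs - B - 1; // emitted periodically, not per record
    }

    /** An event-time window accepts records until watermark >= windowEnd + A. */
    boolean windowStillOpen(long windowEnd) {
        return currentWatermark() < windowEnd + A;
    }

    public static void main(String[] args) {
        WatermarkSketch s = new WatermarkSketch();
        s.maxTs = 200_000; // latest timestamp seen so far (T)
        // watermark = T - B - 1 = 139999
        System.out.println("watermark = " + s.currentWatermark());
        // window [60000, 120000) is closed: 139999 >= 120000 + 10000
        System.out.println("[60000,120000) open? " + s.windowStillOpen(120_000));
        // window [120000, 180000) still accepts: 139999 < 180000 + 10000
        System.out.println("[120000,180000) open? " + s.windowStillOpen(180_000));
    }
}
```

So a record is only lost once the watermark has passed its window's end plus A, which is where the "T - A - B" figure comes from.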

Best,
Yunfeng

On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal  wrote:
>
> Hi Yunfeng,
> I have a question around the tolerance for out of order bound watermarking,
>
> What I understand that when consuming from source with out of order bound set 
> as B, lets say it gets a record with timestamp T.
> After that it will drop all the subsequent records which arrive with the 
> timestamp less than T - B.
>
> Please let me know if I understood this correctly.
>
> If this is correct, then how does allowed lateness when performing event time 
> windowing works ?  Say allowed lateness is set as A,
> does this mean that value of A should be less than that of B because records 
> with timestamp less than T - B would have already been dropped at the source.
>
> If this is not the case, then how does lateness work with out of order 
> boundedness ?
>
> Thanks
> Sachin
>
>
> On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou  
> wrote:
>>
>> Hi Sachin,
>>
>> 1. When your Flink job performs an operation like map or flatmap, the
>> output records would be automatically assigned with the same timestamp
>> as the input record. You don't need to manually assign the timestamp
>> in each step. So the windowing result in your example should be as you
>> have expected.
>>
>> 2. The frequency of watermarks can be configured by
>> pipeline.auto-watermark-interval in flink-conf.yaml, or
>> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
>> the event time related to the Watermark is still T, just that the job
>> will tolerate any records whose timestamp is in range [T-B, T].
>>
>> Best,
>> Yunfeng
>>
>> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
>> >
>> > Hello folks,
>> > I have few questions:
>> >
>> > Say I have a source like this:
>> >
>> > final DataStream data =
>> > env.fromSource(
>> > source,
>> > 
>> > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
>> > .withTimestampAssigner((event, timestamp) -> event.timestamp));
>> >
>> >
>> > My pipeline after this is as followed:
>> >
>> > data.flatMap(new MyFlattendData())
>> > .keyBy(new MyKeySelector())
>> > .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>> > .reduce(new MyReducer());
>> >
>> >
>> > First question I have is that the timestamp I assign from the source, 
>> > would it get carried to all steps below to my window ?
>> > Example say I have timestamped data from source as:
>> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
>> >
>> >  would this get flattened to say:
>> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>> >
>> > then keyed to say:
>> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1, 
>> > flatdata4]),...]
>> >
>> > windows:
>> > 1st => [ flatdata1, flatdata2 ]
>> > 2nd => [ flatdata4, ... ]
>> >
>> > Would the windows created before the reduce function be applied be like I 
>> > have illustrated or to have it this way, do I need to output a record at 
>> > each step with the timestamp assigned for that record ?
>> >
>> > Basically is the timestamp assigned when reading from the source pushed 
>> > (retained) down to all the steps below when doing event time window 
>> > operation ?
>> >
>> >
>> > Next question is in my watermark strategy: how do I set the period of the 
>> > watermarking.
>> > Basically from An out-of-order bound B means that once an event with 
>> > timestamp T was encountered, no events older than T - B will follow any 
>> > more when the watermarking is done.
>> >
>> > However, how frequently is watermarking done and when say watermarking, 
>> > the last encountered event was with timestamp T , does this mean watermark 
>> > timestamp would be T - B ?
>> >
>> > How can we control the watermarking period ?
>> >
>> > 

Re: Understanding event time wrt watermarking strategy in flink

2024-04-12 Thread Sachin Mittal
Hi Yunfeng,
I have a question around the tolerance for out of order bound watermarking,

What I understand is that, when consuming from a source with the out-of-order
bound set as B, let's say it gets a record with timestamp T.
After that it will drop all subsequent records which arrive with a
timestamp less than T - B.

Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness work when performing
event-time windowing? Say the allowed lateness is set as A;
does this mean that the value of A should be less than that of B, because
records with timestamps less than T - B would already have been dropped at
the source?

If this is not the case, then how does lateness work with out-of-order
boundedness?

Thanks
Sachin


On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou 
wrote:

> Hi Sachin,
>
> 1. When your Flink job performs an operation like map or flatmap, the
> output records would be automatically assigned with the same timestamp
> as the input record. You don't need to manually assign the timestamp
> in each step. So the windowing result in your example should be as you
> have expected.
>
> 2. The frequency of watermarks can be configured by
> pipeline.auto-watermark-interval in flink-conf.yaml, or
> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> the event time related to the Watermark is still T, just that the job
> will tolerate any records whose timestamp is in range [T-B, T].
>
> Best,
> Yunfeng
>
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
> >
> > Hello folks,
> > I have few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream data =
> > env.fromSource(
> > source,
> >
>  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> > .withTimestampAssigner((event, timestamp) ->
> event.timestamp));
> >
> >
> > My pipeline after this is as followed:
> >
> > data.flatMap(new MyFlattendData())
> > .keyBy(new MyKeySelector())
> > .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> > .reduce(new MyReducer());
> >
> >
> > First question I have is that the timestamp I assign from the source,
> would it get carried to all steps below to my window ?
> > Example say I have timestamped data from source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >
> >  would this get flattened to say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >
> > then keyed to say:
> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1,
> flatdata4]),...]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function be applied be like
> I have illustrated or to have it this way, do I need to output a record at
> each step with the timestamp assigned for that record ?
> >
> > Basically is the timestamp assigned when reading from the source pushed
> (retained) down to all the steps below when doing event time window
> operation ?
> >
> >
> > Next question is in my watermark strategy: how do I set the period of
> the watermarking.
> > Basically from An out-of-order bound B means that once an event with
> timestamp T was encountered, no events older than T - B will follow any
> more when the watermarking is done.
> >
> > However, how frequently is watermarking done and when say watermarking,
> the last encountered event was with timestamp T , does this mean watermark
> timestamp would be T - B ?
> >
> > How can we control the watermarking period ?
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> >
> >
> >
>


Re: Understanding event time wrt watermarking strategy in flink

2024-04-12 Thread Yunfeng Zhou
Hi Sachin,

1. When your Flink job performs an operation like map or flatmap, the
output records would be automatically assigned with the same timestamp
as the input record. You don't need to manually assign the timestamp
in each step. So the windowing result in your example should be as you
have expected.
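To illustrate point 1 with the timestamps from your example, here is a plain-Java sketch of how aligned tumbling windows assign records purely by timestamp (this mirrors TumblingEventTimeWindows semantics; the class and method names are illustrative, and the example timestamps are assumed to be seconds converted to milliseconds):

```java
// Sketch of aligned tumbling-window assignment: a record's window
// depends only on its (carried-through) timestamp, not on the operator.
public class WindowAssignSketch {
    static long windowStart(long ts, long size) {
        return ts - (ts % size); // start of the tumbling window containing ts
    }

    public static void main(String[] args) {
        long size = 60_000; // 60-second windows, in ms
        long[] ts = {10_000, 12_000, 61_000}; // data1, data2, data4
        for (long t : ts) {
            long start = windowStart(t, size);
            System.out.println(t + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With these values, flatdata1 and flatdata2 land in [0, 60000) and flatdata4 in [60000, 120000), matching the windows you illustrated.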

2. The frequency of watermarks can be configured by
pipeline.auto-watermark-interval in flink-conf.yaml, or
ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
the event time related to the Watermark is still T, just that the job
will tolerate any records whose timestamp is in range [T-B, T].
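For reference, the flink-conf.yaml form of that setting looks roughly like this (the value shown is an assumption, Flink's documented default of 200 ms):

```yaml
# flink-conf.yaml: how often periodic watermarks are generated
pipeline.auto-watermark-interval: 200 ms
```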

Best,
Yunfeng

On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal  wrote:
>
> Hello folks,
> I have few questions:
>
> Say I have a source like this:
>
> final DataStream data =
> env.fromSource(
> source,
> 
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
> .withTimestampAssigner((event, timestamp) -> event.timestamp));
>
>
> My pipeline after this is as followed:
>
> data.flatMap(new MyFlattendData())
> .keyBy(new MyKeySelector())
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> .reduce(new MyReducer());
>
>
> First question I have is that the timestamp I assign from the source, would 
> it get carried to all steps below to my window ?
> Example say I have timestamped data from source as:
> => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
>
>  would this get flattened to say:
> => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>
> then keyed to say:
> => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1, 
> flatdata4]),...]
>
> windows:
> 1st => [ flatdata1, flatdata2 ]
> 2nd => [ flatdata4, ... ]
>
> Would the windows created before the reduce function be applied be like I 
> have illustrated or to have it this way, do I need to output a record at each 
> step with the timestamp assigned for that record ?
>
> Basically is the timestamp assigned when reading from the source pushed 
> (retained) down to all the steps below when doing event time window operation 
> ?
>
>
> Next question is in my watermark strategy: how do I set the period of the 
> watermarking.
> Basically from An out-of-order bound B means that once an event with 
> timestamp T was encountered, no events older than T - B will follow any more 
> when the watermarking is done.
>
> However, how frequently is watermarking done and when say watermarking, the 
> last encountered event was with timestamp T , does this mean watermark 
> timestamp would be T - B ?
>
> How can we control the watermarking period ?
>
> Thanks
> Sachin
>
>
>
>
>
>
>
>


Understanding event time wrt watermarking strategy in flink

2024-04-11 Thread Sachin Mittal
Hello folks,
I have few questions:

Say I have a source like this:

final DataStream data =
    env.fromSource(
        source,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
            .withTimestampAssigner((event, timestamp) -> event.timestamp));


My pipeline after this is as follows:

data.flatMap(new MyFlattendData())
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .reduce(new MyReducer());


The first question I have is: would the timestamp I assign at the source get
carried through all the steps below, down to my window?
For example, say I have timestamped data from the source as:
=> [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]

 would this get flattened to say:
=> [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]

then keyed to say:
=> [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1,
flatdata4]),...]

windows:
1st => [ flatdata1, flatdata2 ]
2nd => [ flatdata4, ... ]

Would the windows created before the reduce function is applied be like I
have illustrated, or, to have it this way, do I need to output a record at
each step with the timestamp assigned for that record?

Basically, is the timestamp assigned when reading from the source pushed
(retained) down through all the steps below when doing an event-time window
operation?


My next question is about my watermark strategy: how do I set the period of
the watermarking?
Basically, an out-of-order bound B means that once an event with
timestamp T has been encountered, no events older than T - B will follow any
more once the watermarking is done.

However, how frequently is watermarking done? And when watermarking, if the
last encountered event had timestamp T, does this mean the watermark
timestamp would be T - B?

How can we control the watermarking period ?

Thanks
Sachin