Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
Thanks, David, for your detailed answers.
Mans
On Wednesday, December 11, 2019, 08:12:51 AM EST, David Anderson wrote:

Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread David Anderson
> If we have allowed lateness to be greater than 0 (say 5), then if an event
> which arrives at window end + 3 (within allowed lateness),
>
> (a) is it considered late and included in the window function as a
> late firing?
> An event with a timestamp that falls within the window's boundaries that
> arrives when the current watermark is at window end + 3 will be included as
> a late event that has arrived within the allowed lateness.

Actually, I'm not sure I got this right; on this point I recommend some
experimentation, or a careful reading of the code.

On Wed, Dec 11, 2019 at 2:08 PM David Anderson  wrote:


Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread David Anderson
I'll attempt to answer your questions.

> If we have allowed lateness to be greater than 0 (say 5), then if an event
> which arrives at window end + 3 (within allowed lateness),
> (a) is it considered late and included in the window function as a
> late firing?
>

An event with a timestamp that falls within the window's boundaries that
arrives when the current watermark is at window end + 3 will be included as
a late event that has arrived within the allowed lateness.


> (b) Are the late firings under the control of the trigger?
>

Yes, the trigger is involved in all firings, late or not.


> (c) If there are many events like this, are there multiple window
> function invocations?


With the default event time trigger, each late event causes a late firing.
You could use a custom trigger to implement other behaviors.


> (d) Are these events (still within window end + allowed lateness) also
> emitted via the side output late data?
>

No. The side output for late events is only used to collect events that
fall outside the allowed lateness.
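
To make answers (a), (c) and (d) concrete, here is a small dependency-free Java model of one key's tumbling event-time windows. It is a sketch based on my reading of how Flink 1.9's window operator and default event-time trigger behave, not Flink code: the class `LateFiringModel`, its methods and its log format are hypothetical, and it assumes, like Timo's prototype, that a watermark equal to each element's timestamp follows every element. Given the caveat about (a) elsewhere in this thread, treat it as something to check against a real prototype rather than as a specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified model (not Flink code) of ONE key's tumbling event-time
// windows under a default-style event-time trigger, with allowed lateness.
class LateFiringModel {
    final long size;
    final long lateness;
    long watermark = Long.MIN_VALUE;
    // window start -> number of elements currently held in that window's state
    final TreeMap<Long, Integer> windows = new TreeMap<>();
    final List<String> log = new ArrayList<>();

    LateFiringModel(long size, long lateness) {
        this.size = size;
        this.lateness = lateness;
    }

    void onElement(long ts) {
        long start = ts - Math.floorMod(ts, size);
        long maxTs = start + size - 1;              // last timestamp inside the window
        if (maxTs + lateness <= watermark) {        // window already cleaned up
            log.add("side-output " + ts);
            return;
        }
        int count = windows.merge(start, 1, Integer::sum);
        if (maxTs <= watermark) {                   // late, but within allowed lateness
            log.add("late-firing [" + start + "] count=" + count);
        }
    }

    void onWatermark(long wm) {
        if (wm <= watermark) return;                // watermarks never move backwards
        long previous = watermark;
        watermark = wm;
        for (Long start : new ArrayList<>(windows.keySet())) {
            long maxTs = start + size - 1;
            if (maxTs > previous && maxTs <= wm) {  // watermark just passed the window end
                log.add("main-firing [" + start + "] count=" + windows.get(start));
            }
            if (maxTs + lateness <= wm) {           // allowed lateness has expired
                windows.remove(start);
            }
        }
    }

    // Like the prototype's assigner: every element is followed by Watermark(element).
    List<String> process(long... elements) {
        for (long e : elements) {
            onElement(e);
            onWatermark(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // window size 10, allowed lateness 5; the watermark reaches 13 (window end + 3)
        new LateFiringModel(10, 5).process(1, 4, 13, 3, 6, 15, 2).forEach(System.out::println);
    }
}
```

With window size 10 and allowed lateness 5, elements 3 and 6 arrive while the watermark is at 13 (window end + 3), so in this model each one causes its own late firing of window [0, 10) and neither goes to the side output. Element 2 arrives after the watermark has reached 15, past the window's cleanup time, so it only goes to the side output.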


> 2. If an event arrives after the window end + allowed lateness:
> (a) Is it excluded from the window function but still emitted from the
> side output late data?
>

Yes.


> (b) And if it is emitted, is there any attribute which indicates for
> which window it was a late event?
>

No, the event is emitted without any additional information.
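
Because the side-output element carries only its own timestamp, one possible workaround for tumbling windows is to recompute downstream the window it would have belonged to. This sketch assumes you know the window size and offset used by the assigner and that timestamps are non-negative; `LateWindowLookup` is a hypothetical helper, and its formula mirrors the one I believe Flink's `TimeWindow.getWindowStartWithOffset` uses. It does not work for session windows, and with sliding windows an element belongs to several windows, so a single start is not enough.

```java
// Hypothetical helper, not part of Flink: recomputes the tumbling window of a
// side-output element from its timestamp (assumes non-negative timestamps).
class LateWindowLookup {
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset, to my understanding.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Exclusive end of the tumbling window that contains `timestamp`.
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = 10;   // e.g. a 10 ms tumbling window
        long offset = 0;  // no window offset
        for (long ts : new long[] {2, 17, 30}) {
            System.out.println(ts + " -> [" + windowStart(ts, offset, size) + ", "
                    + windowEnd(ts, offset, size) + ")");
        }
    }
}
```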

> (c) Is there a time limit on how long the late side output remains active
> for a particular window, or are all late events channeled to it?


There is no time limit; the late side output remains operative
indefinitely.

Hope that helps,
David

On Wed, Dec 11, 2019 at 1:40 PM M Singh  wrote:


Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
Thanks, Timo, for your answer. I will try the prototype, but I was wondering
whether there is some theoretical documentation that would give me a sound understanding.
Mans
On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther wrote:

Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread Timo Walther

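Little mistake: the key must be some constant instead of `e`. Keying by `e`
gives every element its own key, so in this example each window would only
ever contain a single element.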


On 11.12.19 11:42, Timo Walther wrote:


Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread Timo Walther

Hi Mans,

I would recommend creating a little prototype to see the answers to most of
your questions in action.


You can simply do:

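import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // not the default in 1.9
DataStream<Long> stream = env.fromElements(1L, 2L, 3L, 4L)
    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
        @Override // each element's value is used as its event timestamp
        public long extractTimestamp(Long e, long previousTimestamp) { return e; }
        @Override // emit a watermark right after every element
        public Watermark checkAndGetNextWatermark(Long e, long ts) { return new Watermark(e); }
    });

stream.keyBy(e -> e).window(...).print();
env.execute();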

This lets you quickly create an event-time stream for testing the
semantics.


I hope this helps. Otherwise, of course, we can help you find the
answers to the remaining questions.


Regards,
Timo



On 10.12.19 20:32, M Singh wrote:

Hi:

I have a few questions about the side output late data.

Here is the API

stream
       .keyBy(...)                    <-  keyed versus non-keyed windows
       .window(...)                   <-  required: "assigner"
      [.trigger(...)]                 <-  optional: "trigger" (else default trigger)
      [.evictor(...)]                 <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]         <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)]      <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply() <-  required: "function"
      [.getSideOutput(...)]           <-  optional: "output tag"




Apache Flink 1.9 Documentation: Windows
<https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>

Here is the documentation:

  Late elements considerations

When specifying an allowed lateness greater than 0, the window along 
with its content is kept after the watermark passes the end of the 
window. In these cases, when a late but not dropped element arrives, it 
could trigger another firing for the window. These firings are called 
"late firings", as they are triggered by late events and in contrast to 
the "main firing" which is the first firing of the window. In case of 
session windows, late firings can further lead to merging of windows, as 
they may “bridge” the gap between two pre-existing, unmerged windows.
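
The "bridging" effect described above can be illustrated with a small, hypothetical Java model of session-window merging (not Flink's implementation). Each element opens a window [t, t + gap), and windows that overlap or touch are merged, which is my understanding of how Flink's `TimeWindow` merging behaves. Timing and triggers are ignored; the model only shows how the set of sessions changes once a late element is added.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of session-window merging; not Flink's implementation.
class SessionBridgeModel {
    static final class Session {
        final long start;
        final long end; // exclusive
        Session(long start, long end) { this.start = start; this.end = end; }
        @Override
        public String toString() { return "[" + start + ", " + end + ")"; }
    }

    // Each element opens [t, t + gap); overlapping or touching windows are merged.
    static List<Session> sessions(long gap, long... timestamps) {
        List<Session> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Session(t, t + gap));
        }
        windows.sort(Comparator.comparingLong((Session s) -> s.start));
        List<Session> merged = new ArrayList<>();
        for (Session w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && w.start <= merged.get(last).end) {
                Session prev = merged.get(last);
                merged.set(last, new Session(prev.start, Math.max(prev.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(sessions(5, 1, 3, 12, 14));    // [[1, 8), [12, 19)]
        System.out.println(sessions(5, 1, 3, 12, 14, 8)); // [[1, 19)]
    }
}
```

With a gap of 5, timestamps 1, 3, 12 and 14 form two sessions, [1, 8) and [12, 19). The late element 8 opens [8, 13), which touches the first session and overlaps the second, so everything merges into [1, 19).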


Attention: You should be aware that the elements emitted by a late firing 
should be treated as updated results of a previous computation, i.e., 
your data stream will contain multiple results for the same computation. 
Depending on your application, you need to take these duplicated results 
into account or deduplicate them.
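
One common way to handle these updated results is to treat them as upserts: downstream, keep only the most recent result per key and window. The sketch below is a hypothetical in-memory illustration (`LatestResultPerWindow` is not a Flink class). It assumes each emitted result carries its key and window start, which a `ProcessWindowFunction` could obtain from its context's `window()`, and that every firing emits the full recomputed result, as happens when the trigger fires without purging.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical downstream deduplication, not a Flink class: every firing (main or
// late) overwrites the previous result for the same key and window.
class LatestResultPerWindow {
    private final Map<String, Long> latest = new LinkedHashMap<>();

    void accept(String key, long windowStart, long result) {
        latest.put(key + "@" + windowStart, result); // upsert by (key, window)
    }

    Map<String, Long> results() {
        return latest;
    }

    public static void main(String[] args) {
        LatestResultPerWindow sink = new LatestResultPerWindow();
        sink.accept("k", 0, 2);   // main firing of window [0, 10)
        sink.accept("k", 0, 3);   // late firing: updated result for the same window
        sink.accept("k", 10, 1);  // main firing of the next window
        System.out.println(sink.results()); // prints {k@0=3, k@10=1}
    }
}
```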



Questions:

1. If we have allowed lateness greater than 0 (say 5), and an event 
arrives at window end + 3 (within allowed lateness):
     (a) is it considered late and included in the window function as a 
late firing?
     (b) Are the late firings under the control of the trigger ?
     (c) If there are many events like this, are there multiple window 
function invocations?
     (d) Are these events (still within window end + allowed lateness) 
also emitted via the side output late data?

2. If an event arrives after the window end + allowed lateness:
     (a) Is it excluded from the window function but still emitted from 
the side output late data?
     (b) And if it is emitted, is there any attribute which indicates for 
which window it was a late event?
     (c) Is there a time limit on how long the late side output remains 
active for a particular window, or are all late events channeled to it?


Thanks

Mans