Hi Manas,

Adding to Timo’s response: if you don’t have unit tests or integration tests 
yet, I would strongly recommend setting them up, as they make debugging and 
testing much easier. You can read how to write them for your functions and 
operators here [1] and here [2].
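As a minimal illustration, the decision logic can live in a plain class with no Flink dependencies and be tested by calling it directly (the approach [2] describes for stateless functions); the stateful operator itself can then be covered with the test harnesses from [1]. `NotificationSchedule` and `notificationTimes` are hypothetical names, and the numbers reuse the example from this thread (state1 @ 100, state2 @ 125, period 10), so the expected result encodes that no notification at 130 should appear.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pure logic with no Flink dependencies, so it can be tested directly.
public class NotificationSchedule {

    // Notification timestamps for a state that starts at `stateStart` and lasts until
    // `nextTransition` (exclusive), with one notification every `period`.
    static List<Long> notificationTimes(long stateStart, long nextTransition, long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        List<Long> times = new ArrayList<>();
        for (long t = stateStart; t < nextTransition; t += period) {
            times.add(t);
        }
        return times;
    }

    // Stand-in for a JUnit test method, so the sketch runs without extra dependencies.
    public static void main(String[] args) {
        List<Long> expected = List.of(100L, 110L, 120L);
        List<Long> actual = notificationTimes(100, 125, 10);
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but got " + actual);
        }
        System.out.println("ok");
    }
}
```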

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
[2] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html

> On 28 Apr 2020, at 18:45, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Manas,
> 
> Reg. 1: I would recommend using a debugger in your IDE to check which 
> watermarks are travelling through your operators.
> 
> Reg. 2: All event-time operations are only performed once the watermark 
> has arrived from all parallel instances. So, roughly speaking, in machine 
> time you can assume that the window is computed at watermark update 
> intervals. However, "what is computed" depends on the timestamps of your 
> events and how those are assigned to windows.
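A simplified model of the point above, in plain Java rather than Flink's actual classes (`CombinedWatermark` is a hypothetical name): an operator's event-time clock is the minimum of the latest watermarks received on its input channels, and an event-time window [start, end) fires once that clock reaches end - 1. Idle inputs and other runtime details are ignored.

```java
import java.util.Arrays;

// Simplified model (not Flink code): an operator with several parallel upstream channels
// only advances its event-time clock to the minimum watermark received across all channels.
public class CombinedWatermark {

    private final long[] channelWatermarks;

    CombinedWatermark(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark received yet
    }

    // Records a watermark from one channel and returns the operator's current watermark.
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return current();
    }

    long current() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    // An event-time window [start, end) fires once the watermark reaches end - 1,
    // i.e. the window's last millisecond.
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        CombinedWatermark op = new CombinedWatermark(2);
        op.onWatermark(0, 130);                       // channel 0 is already at 130
        long wm = op.onWatermark(1, 115);             // channel 1 lags behind
        System.out.println(wm);                       // 115
        System.out.println(windowFires(130, wm));     // false: window [120, 130) still open
        wm = op.onWatermark(1, 135);
        System.out.println(wm);                       // 130
        System.out.println(windowFires(130, wm));     // true
    }
}
```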
> 
> I hope this helps a bit.
> 
> Regards,
> Timo
> 
> On 28.04.20 14:38, Manas Kale wrote:
>> Hi David and Piotrek,
>> Thank you both for your inputs.
>> I tried an implementation using the algorithm Piotrek suggested and David's 
>> example. Although notifications are now being generated as the watermark 
>> advances, subsequent transition events are being received after the 
>> watermark has already crossed their timestamps. For example:
>> state1 @ 100
>> notification state1 @ 110
>> notification state1 @ 120
>> notification state1 @ 130    <----- shouldn't have emitted this
>> state2 @ 125                 <----- watermark is > 125 at this stage
>> I think something might be subtly(?) wrong with how I have structured the 
>> upstream operators. The allowed lateness is 0 in the upstream watermark 
>> assigner, and I generate watermarks every x seconds.
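One way to act on Timo's debugging advice inside the operator itself is to compare each record's timestamp with the watermark that has already arrived; in a `KeyedProcessFunction` these are `ctx.timestamp()` and `ctx.timerService().currentWatermark()`. The plain-Java replay below is a hypothetical simulation of that check, assuming the watermark had reached 130 before state2 @ 125 arrived (as the notification at 130 suggests).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of what a downstream operator receives, in arrival order.
// Watermark(t) promises that no more records with timestamp <= t will follow,
// so any record at or below the current watermark is late.
public class LatenessReplay {

    static final class Arrival {
        final boolean isWatermark;
        final long time;
        final String label;

        Arrival(boolean isWatermark, long time, String label) {
            this.isWatermark = isWatermark;
            this.time = time;
            this.label = label;
        }
    }

    static List<String> lateRecords(List<Arrival> arrivals) {
        long watermark = Long.MIN_VALUE;
        List<String> late = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.isWatermark) {
                watermark = Math.max(watermark, a.time);
            } else if (a.time <= watermark) {
                late.add(a.label + " @ " + a.time + " (watermark " + watermark + ")");
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Assumed arrival order, reconstructed from the example above.
        List<Arrival> arrivals = List.of(
                new Arrival(false, 100, "state1"),
                new Arrival(true, 110, "watermark"),
                new Arrival(true, 120, "watermark"),
                new Arrival(true, 130, "watermark"),
                new Arrival(false, 125, "state2"));
        lateRecords(arrivals).forEach(System.out::println);
        // prints: state2 @ 125 (watermark 130)
    }
}
```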
>> The operator that emits state transitions is built using the 
>> TumblingWindow approach I described in my first e-mail (so that I can 
>> compute at every watermark update). Note that I can use this approach for 
>> the state-transition operator because it only needs to emit transitions 
>> and nothing in between.
>> So, two questions:
>> 1. Any idea on what might be causing this incorrect watermark behaviour?
>> 2. If I want to perform some computation only when the watermark updates, is 
>> using a watermark-aligned EventTimeTumblingWindow (meaning windowDuration = 
>> watermarkUpdateInterval) the correct way to do this?
>> Regards,
>> Manas
>> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <da...@ververica.com> wrote:
>>    Following up on Piotr's outline, there's an example in the
>>    documentation of how to use a KeyedProcessFunction to implement an
>>    event-time tumbling window [1]. Perhaps that can help you get started.
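The bookkeeping behind such a timer-based window is mostly arithmetic. A minimal plain-Java sketch (`TumblingWindowMath` is a hypothetical name): the start formula is the same arithmetic as Flink's `TimeWindow.getWindowStartWithOffset`, and the timer is set to the window's last millisecond, as `TimeWindow.maxTimestamp()` does. Note that these windows are aligned to the epoch (plus an optional offset), not to the moments at which watermarks happen to be emitted.

```java
// Plain-Java sketch of tumbling-window bookkeeping as a KeyedProcessFunction would do it:
// map an event timestamp to its window and to the timer that should close that window.
public class TumblingWindowMath {

    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(timestamp, offset, size)
    // for non-negative timestamps.
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Timer for the window's last millisecond; it fires once the watermark reaches it.
    static long windowTimer(long timestamp, long size) {
        return windowStart(timestamp, 0, size) + size - 1;
    }

    public static void main(String[] args) {
        long size = 10_000; // e.g. a 10 s window, matching a 10 s watermark interval
        System.out.println(windowStart(8_000, 0, size));   // 0
        System.out.println(windowStart(12_500, 0, size));  // 10000
        System.out.println(windowTimer(12_500, size));     // 19999
    }
}
```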
>>    Regards,
>>    David
>>    [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>>    On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>        Hi,
>>        I’m not sure, but I don’t think there is an existing window that
>>        would do exactly what you want. I would suggest going back to
>>        the `keyedProcessFunction` (or a custom operator?) and having a
>>        MapState<TimeStamp, StateWithTimeStamp> currentStates field.
>>        Your key could be, for example, the timestamp of the beginning of
>>        your window. The value would be the latest state in this time
>>        window, annotated with the timestamp at which this state was recorded.
>>        On each element:
>>        1. determine the window’s begin ts (the key of the map)
>>        2. if it’s the first element for that window, register an event-time
>>        timer to publish the results at that window’s end ts
>>        3. check in `currentStates` whether the value should be modified (if
>>        your new element is newer, or is the first value for the given key)
>>        On event-time timer firing:
>>        1. output the state matching this timer
>>        2. check if there is a (more recent) value for the next window, and
>>        if not:
>>        3. copy the value to the next window
>>        4. register a timer for that window to fire
>>        5. clean up `currentStates` and remove the value for the no longer
>>        needed key.
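A single-key, plain-Java simulation of the outline above, to make the control flow concrete. It is a sketch, not Flink code: a `TreeMap` stands in for the MapState, a `TreeSet` stands in for the event-time TimerService, all names are hypothetical, timers fire at the window end, and late records are not handled. Replaying the timestamps from this thread (state1 @ 100, state2 @ 125, windows of 10) with state2 arriving before the watermark passes it gives state1 @ 110, state1 @ 120, state2 @ 130, state2 @ 140.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Single-key simulation of the outline (not Flink code). `currentStates` plays the role of
// MapState<Long, StateWithTimeStamp>; `timers` plays the role of the event-time TimerService.
public class PeriodicStateNotifier {

    static final class StateWithTimeStamp {
        final String state;
        final long timestamp;

        StateWithTimeStamp(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    private final long windowSize;
    // Key: window start timestamp. Value: latest state seen for that window.
    private final TreeMap<Long, StateWithTimeStamp> currentStates = new TreeMap<>();
    private final TreeSet<Long> timers = new TreeSet<>();
    final List<String> output = new ArrayList<>();

    PeriodicStateNotifier(long windowSize) {
        this.windowSize = windowSize;
    }

    // "On each element"; late records are not handled in this sketch.
    void processElement(String state, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSize); // step 1
        StateWithTimeStamp existing = currentStates.get(windowStart);
        if (existing == null) {
            timers.add(windowStart + windowSize); // step 2: timer at the window's end
        }
        if (existing == null || timestamp > existing.timestamp) { // step 3
            currentStates.put(windowStart, new StateWithTimeStamp(state, timestamp));
        }
    }

    // Fires every registered timer <= watermark, in timestamp order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // "On event-time timer firing"
    private void onTimer(long windowEnd) {
        StateWithTimeStamp value = currentStates.remove(windowEnd - windowSize); // steps 1 and 5
        output.add(value.state + " @ " + windowEnd);
        if (!currentStates.containsKey(windowEnd)) { // step 2: no newer value for next window
            currentStates.put(windowEnd, value);     // step 3: carry the state forward
            timers.add(windowEnd + windowSize);      // step 4
        }
    }

    public static void main(String[] args) {
        PeriodicStateNotifier notifier = new PeriodicStateNotifier(10);
        notifier.processElement("state1", 100);
        notifier.advanceWatermark(120);
        notifier.processElement("state2", 125); // arrives before the watermark passes 125
        notifier.advanceWatermark(140);
        System.out.println(notifier.output);
        // prints: [state1 @ 110, state1 @ 120, state2 @ 130, state2 @ 140]
    }
}
```

The carry-forward in steps 3 and 4 is what keeps notifications flowing while no new transitions arrive, which is the gap a plain window assigner leaves.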
>>        I hope this helps
>>        Piotrek
>>>        On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:
>>> 
>>>        Hi,
>>>        I have an upstream operator that outputs device state
>>>        transition messages with event timestamps, meaning it only
>>>        emits output when a transition takes place.
>>>        For example,
>>>        state1 @ 1 PM
>>>        state2 @ 2 PM
>>>        and so on.
>>> 
>>>        *Using a downstream operator, I want to emit notification
>>>        messages according to some configured periodicity.* For example,
>>>        if periodicity = 20 min, in the above scenario this operator will
>>>        output:
>>>        state1 notification @ 1PM
>>>        state1 notification @ 1.20PM
>>>        state1 notification @ 1.40PM
>>>         ...
>>> 
>>>        *Now the main issue is that I want this to be driven by the
>>>        /watermark/ and not by transition events received from
>>>        upstream.* That is, I would like to see notification events as
>>>        soon as the watermark crosses their timestamps, /not/ when the
>>>        next transition event arrives at the operator (which could be
>>>        hours later, as above).
>>> 
>>>        My first solution, using a keyedProcessFunction and timers, did
>>>        not work as expected because the order in which transition
>>>        events arrived at this operator was non-deterministic. To
>>>        elaborate, assume a setAutoWatermarkInterval of 10 seconds.
>>>        If we get transition events :
>>>        state1 @ 1 sec
>>>        state2 @ 3 sec
>>>        state3 @ 5 sec
>>>        state1 @ 8 sec
>>>        the order in which these events arrived at my
>>>        keyedProcessFunction was not fixed. To solve this, these
>>>        messages need to be sorted on event time, which led me to my
>>>        second solution.
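A common way to get that ordering without windows is to buffer records until the watermark passes them and then release them in timestamp order; in Flink the buffer would live in keyed state and be drained from an event-time timer. This is a swapped-in technique, shown as a minimal plain-Java sketch (`EventTimeSorter` is a hypothetical name) that feeds in the four transitions from the example above, out of order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch of watermark-driven event-time sorting: buffer records until the
// watermark passes them, then release them in timestamp order.
public class EventTimeSorter {

    static final class Event {
        final String state;
        final long timestamp;

        Event(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return state + " @ " + timestamp;
        }
    }

    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private long watermark = Long.MIN_VALUE;

    // Returns false for late records (timestamp <= watermark), which this sketch drops.
    boolean add(Event e) {
        if (e.timestamp <= watermark) {
            return false;
        }
        buffer.add(e);
        return true;
    }

    // Advances the watermark and returns every buffered record it has passed, oldest first.
    List<Event> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        // The transitions from the example, arriving out of order (timestamps in seconds).
        sorter.add(new Event("state3", 5));
        sorter.add(new Event("state1", 1));
        sorter.add(new Event("state1", 8));
        sorter.add(new Event("state2", 3));
        System.out.println(sorter.onWatermark(10));
        // prints: [state1 @ 1, state2 @ 3, state3 @ 5, state1 @ 8]
    }
}
```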
>>> 
>>>        My second solution, using an EventTimeTumblingWindow with size
>>>        = setAutoWatermarkInterval, also does not work. I sorted the
>>>        accumulated events in the window and applied the
>>>        notification-generation logic to them in order. However, I
>>>        assumed that windows are created even if there are no
>>>        elements. Since this is not the case, this solution generates
>>>        notifications only when the next state transition message
>>>        arrives, which could be hours later.
>>> 
>>>        Does anyone have any suggestions on how I can implement this?
>>>        Thanks!
