Hi Manas,

Reg. 1: I would recommend using a debugger in your IDE to check which watermarks are travelling through your operators.

Reg. 2: All event-time operations are performed only once the watermark has arrived from all parallel instances. So, roughly speaking, in machine time you can assume that the window is computed at watermark update intervals. However, "what is computed" depends on the timestamps of your events and how those are assigned to windows.
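
To illustrate the point about parallel instances, here is a minimal plain-Java sketch (not Flink code; the class `InputWatermarkTracker` and its methods are made up for illustration). It assumes an operator with several input channels whose own event-time clock is the minimum of the latest watermarks seen on each channel, so it only advances once every channel has moved forward.

```java
import java.util.Arrays;

/** Hypothetical helper: tracks per-channel watermarks and derives the operator's watermark. */
final class InputWatermarkTracker {
    private final long[] channelWatermarks;
    private long operatorWatermark = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Until a channel has sent a watermark, it holds the operator back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; returns true if the operator's watermark advanced. */
    boolean onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > operatorWatermark) {
            operatorWatermark = min;
            return true; // event-time timers and windows would be evaluated now
        }
        return false;
    }

    long currentWatermark() {
        return operatorWatermark;
    }
}
```

With two channels, a watermark of 130 on channel 0 alone does not move the operator's clock; only when channel 1 reports 120 does the operator advance, and then only to 120. Logging these values per operator is one way to see which watermarks are actually travelling through the pipeline.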

I hope this helps a bit.

Regards,
Timo

On 28.04.20 14:38, Manas Kale wrote:
Hi David and Piotrek,
Thank you both for your inputs.
I tried an implementation with the algorithm Piotrek suggested and David's example. Although notifications are being generated with the watermark, subsequent transition events are being received after the watermark has crossed their timestamps. For example:
state1 @ 100
notification state1@ 110
notification state1@ 120
notification state1@ 130    <----- shouldn't have emitted this
state2 @ 125                     <----- watermark is > 125 at this stage

I think something might be subtly(?) wrong with how I have structured the upstream operators. The allowed lateness is 0 in the upstream watermark assigner, and I generate watermarks every x seconds. The operator that emits state transitions is constructed using the TumblingWindow approach I described in the first e-mail (so that I can compute at every watermark update). Note that I can use this approach for the state-transition operator because it only needs to emit transitions, and nothing in between.
So, two questions:
1. Any idea on what might be causing this incorrect watermark behaviour?
2. If I want to perform some computation only when the watermark updates, is using a watermark-aligned EventTimeTumblingWindow (meaning windowDuration = watermarkUpdateInterval) the correct way to do this?


Regards,
Manas


On Tue, Apr 28, 2020 at 2:16 AM David Anderson <da...@ververica.com> wrote:

    Following up on Piotr's outline, there's an example in the
    documentation of how to use a KeyedProcessFunction to implement an
    event-time tumbling window [1]. Perhaps that can help you get started.

    Regards,
    David

    [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example


    On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <pi...@ververica.com> wrote:

        Hi,

        I’m not sure, but I don’t think there is an existing window that
        would do exactly what you want. I would suggest going back to
        the `KeyedProcessFunction` (or a custom operator?) and having a
        MapState<TimeStamp, StateWithTimeStamp> currentStates field.
        The key would be, for example, the timestamp of the beginning of
        your window. The value would be the latest state in this time
        window, annotated with the timestamp at which this state was
        recorded.

        On each element:

        1. you determine the window’s begin ts (key of the map)
        2. If it’s first element, register an event time timer to
        publish results for that window’s end TS
        3. look into the `currentStates` if it should be modified (if
        your new element is newer or first value for the given key)

        On event time timer firing:
        1. output the state matching this timer
        2. Check if there is a (more recent) value for the next window,
        and if not:
        3. copy the value to the next window
        4. Register a timer for this window to fire

        5. Clean up currentStates and remove the value for the no longer
        needed key.
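
The steps above can be sketched roughly as follows. This is a plain-Java simulation for a single key, not Flink code: a `TreeMap` stands in for the `MapState`, a `TreeSet` stands in for the event-time timer service, and the names `PeriodicNotifier`, `Stamped`, `onElement` and `onWatermark` are made up for illustration. It assumes windows aligned to multiples of the window size and does not handle late elements (elements whose window has already fired).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical single-key simulation of the outlined algorithm. */
final class PeriodicNotifier {
    /** A state value annotated with the timestamp at which it was recorded. */
    static final class Stamped {
        final long ts;
        final String state;
        Stamped(long ts, String state) { this.ts = ts; this.state = state; }
    }

    private final long windowSize;
    // window start -> latest state seen in (or carried into) that window
    private final TreeMap<Long, Stamped> currentStates = new TreeMap<>();
    // pending timers, one per window, keyed by window end
    private final TreeSet<Long> timers = new TreeSet<>();

    PeriodicNotifier(long windowSize) { this.windowSize = windowSize; }

    void onElement(long ts, String state) {
        long start = ts - Math.floorMod(ts, windowSize);      // 1. window begin
        Stamped current = currentStates.get(start);
        if (current == null) {
            timers.add(start + windowSize);                    // 2. first element: register timer
        }
        if (current == null || ts >= current.ts) {
            currentStates.put(start, new Stamped(ts, state));  // 3. keep the newest state
        }
    }

    /** Fires all timers up to the watermark; returns notifications as "state@windowEnd". */
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long end = timers.pollFirst();
            Stamped s = currentStates.remove(end - windowSize); // output + cleanup
            out.add(s.state + "@" + end);
            if (!currentStates.containsKey(end)) {              // no newer value in next window
                currentStates.put(end, s);                      // carry the state forward
                timers.add(end + windowSize);                   // and schedule the next window
            }
        }
        return out;
    }
}
```

With a window size of 10, feeding `state1` at 100 and then advancing the watermark to 125 yields `state1@110` and `state1@120` without any further input; a subsequent `state2` at 125 replaces the carried value, so the timer at 130 emits `state2@130`. In a real `KeyedProcessFunction` the carried state never expires on its own, so some termination or TTL rule would be needed.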

        I hope this helps

        Piotrek

        On 27 Apr 2020, at 12:01, Manas Kale <manaskal...@gmail.com> wrote:

        Hi,
        I have an upstream operator that outputs device state
        transition messages with event timestamps. Meaning it only
        emits output when a transition takes place.
        For example,
        state1 @ 1 PM
        state2 @ 2 PM
        and so on.

        *Using a downstream operator, I want to emit notification
        messages as per some configured periodicity.* For example, if
        periodicity = 20 min, in the above scenario this operator will
        output :
        state1 notification @ 1PM
        state1 notification @ 1.20PM
        state1 notification @ 1.40PM
         ...

        *Now the main issue is that I want this to be driven by the
        /watermark/ and not by transition events received from
        upstream.* Meaning I would like to see notification events as
        soon as the watermark crosses their timestamps; /not/ when the
        next transition event arrives at the operator (which could be
        hours later, as above).

        My first solution, using a KeyedProcessFunction and timers, did
        not work as expected because the order in which transition
        events arrived at this operator was non-deterministic. To
        elaborate, assume a setAutoWatermarkInterval of 10 seconds.
        If we get transition events :
        state1 @ 1sec
        state2 @ 3 sec
        state3 @ 5 sec
        state1 @ 8 sec
        the order in which these events arrived at my
        keyedProcessFunction was not fixed. To solve this, these
        messages need to be sorted on event time, which led me to my
        second solution.
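
One common way to get event-time order without a window is to buffer elements and release them, sorted, only once the watermark has passed their timestamps. The following is a minimal plain-Java sketch of that idea, not Flink code; `EventTimeSorter`, `Event`, `onElement` and `onWatermark` are hypothetical names, and in a real job the buffer would live in keyed state with a timer per timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical buffer that emits events in timestamp order as the watermark advances. */
final class EventTimeSorter {
    static final class Event {
        final long ts;
        final String state;
        Event(long ts, String state) { this.ts = ts; this.state = state; }
    }

    // Min-heap ordered by event timestamp.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<Event>((a, b) -> Long.compare(a.ts, b.ts));

    /** Elements may arrive in any order; they are only buffered here. */
    void onElement(long ts, String state) {
        buffer.add(new Event(ts, state));
    }

    /** Releases, in timestamp order, every buffered event with ts <= watermark. */
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= watermark) {
            Event e = buffer.poll();
            out.add(e.state + "@" + e.ts);
        }
        return out;
    }
}
```

Feeding the four transitions above in a shuffled order and then advancing the watermark to 5 releases the events at 1, 3 and 5 in order; the event at 8 stays buffered until the watermark reaches it. Events with equal timestamps come out in no particular order in this sketch.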

        My second solution, using an EventTimeTumblingWindow with size
        = setAutoWatermarkInterval, also does not work. I sorted
        accumulated events in the window and applied
        notification-generation logic on them in order. However, I
        assumed that windows are created even if there are no
        elements. Since this is not the case, this solution generates
        notifications only when the next state transition message
        arrives, which could be hours later.
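
The behaviour described here, windows existing only once an element has been assigned to them, can be illustrated with a small plain-Java sketch. It is not Flink code; `LazyTumblingWindows` and its methods are hypothetical, and it assumes windows `[start, start + size)` aligned to multiples of the size that fire once the watermark reaches `start + size - 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical model of tumbling windows that are created lazily, per element. */
final class LazyTumblingWindows {
    private final long size;
    // window start -> buffered elements; an entry exists only if an element arrived
    private final TreeMap<Long, List<String>> windows = new TreeMap<>();

    LazyTumblingWindows(long size) { this.size = size; }

    void onElement(long ts, String value) {
        long start = ts - Math.floorMod(ts, size);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    /** Returns the start timestamps of the windows that fire at this watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        // A window [start, start + size) is complete once watermark >= start + size - 1.
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= watermark) {
            fired.add(windows.pollFirstEntry().getKey());
        }
        return fired;
    }
}
```

With a size of 10, a single element at 101 followed by a watermark of 500 fires only the window starting at 100. None of the empty windows between 110 and 500 ever exist, so no window function runs for them, which is why periodic notifications cannot be driven by the window alone.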

        Does anyone have any suggestions on how I can implement this?
        Thanks!




