Hi Timo and Piotrek,
Thank you for the suggestions.
I have been trying to set up unit tests at the operator granularity, and
the blog post's testHarness examples certainly help a lot in this regard.
I have figured out my problem: an upstream session window operator can only
report the end of a session window once the watermark has passed
{lastObservedEvent + sessionTimeout}. However, my watermark was being
updated periodically without taking this into account. It seems I will have
to delay this notification operator's watermark by sessionTimeout.
Another complication is that sessionTimeout is per key, so I guess I
will have to implement a watermark assigner that extracts the delay period
from the data (similar to DynamicEventTimeSessionWindows).
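A minimal plain-Java sketch of the delayed watermark described above, under stated assumptions: each event carries its own `sessionTimeout`, `SessionAwareWatermarkTracker` is a hypothetical name (not a Flink class), and the watermark is held back by the largest timeout seen so far. Flink watermarks belong to a stream (per parallel instance), not to a key, so a single delay has to cover the slowest key. In Flink 1.10 this logic could presumably be wrapped in an `AssignerWithPeriodicWatermarks` whose `getCurrentWatermark()` delegates to it.

```java
/**
 * Hypothetical helper (not part of Flink): computes a watermark that is held back
 * by the bounded out-of-orderness plus the largest per-key session timeout seen.
 */
class SessionAwareWatermarkTracker {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;
    private long maxSessionTimeout = 0L;
    private long lastEmitted = Long.MIN_VALUE;

    SessionAwareWatermarkTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Per event: its timestamp and the (assumed) sessionTimeout carried in the data. */
    void onEvent(long timestamp, long sessionTimeout) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        maxSessionTimeout = Math.max(maxSessionTimeout, sessionTimeout);
    }

    /** Called periodically, e.g. once per auto-watermark interval. */
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events seen yet
        }
        long candidate = maxTimestamp - maxOutOfOrderness - maxSessionTimeout;
        // A larger timeout seen later must not move the watermark backwards.
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

Because a larger timeout can show up later, the sketch never lets the watermark regress; the trade-off is that one key with a very long timeout delays event time for every key.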
Also, if I do implement such an assigner, would it be helpful to add it to
Flink? I am happy to contribute if so. Any other comments/observations are
also welcome!
Thank you all for the help,
Manas
On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski wrote:
> Hi Manas,
>
> Adding to Timo's response: if you don’t have unit or integration tests yet,
> I would strongly recommend setting them up, as they make debugging
> and testing easier. You can read how to do it for your functions and
> operators here [1] and here [2].
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> [2]
> https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
>
> On 28 Apr 2020, at 18:45, Timo Walther wrote:
>
> Hi Manas,
>
> Reg. 1: I would recommend using a debugger in your IDE to check which
> watermarks are travelling through your operators.
>
> Reg. 2: All event-time operations are only performed once the watermark
> has arrived from all parallel upstream instances. So, roughly speaking, in
> wall-clock time you can assume that the window is computed at watermark
> update intervals. However, "what is computed" depends on the timestamps of
> your events and how they are assigned to windows.
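A simplified model of the point about parallel instances (an illustration, not Flink's actual implementation): an operator's event-time clock is the minimum of the latest watermarks received on each of its input channels, so a window or timer fires only once every upstream instance has passed its end. `InputWatermarkTracker` is a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical model of an operator combining watermarks from parallel inputs. */
class InputWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    InputWatermarkTracker(int numInputs) {
        channelWatermarks = new long[numInputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Returns the operator's watermark after a watermark arrives on one input channel. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // Event time only advances once the slowest input has advanced.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }
}
```

One consequence: an input channel that never emits a watermark (for example an idle source partition) keeps the operator's watermark at `Long.MIN_VALUE`, so no event-time computation happens downstream.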
>
> I hope this helps a bit.
>
> Regards,
> Timo
>
> On 28.04.20 14:38, Manas Kale wrote:
>
> Hi David and Piotrek,
> Thank you both for your inputs.
> I tried an implementation with the algorithm Piotrek suggested and David's
> example. Although notifications are being generated with the watermark,
> subsequent transition events are being received after the watermark has
> crossed their timestamps. For example:
> state1 @ 100
> notification state1 @ 110
> notification state1 @ 120
> notification state1 @ 130 <----- shouldn't have emitted this
> state2 @ 125 <----- watermark is > 125 at this stage
> I think something might be subtly wrong with how I have structured the
> upstream operators. The allowed lateness is 0 in the upstream watermark
> assigner, and I generate watermarks every x seconds.
> The operator that emits state transitions is built with the
> tumbling window approach I described in the first e-mail (so that I can
> compute at every watermark update). Note that I can use this approach for
> the state-transition operator because it only needs to emit transitions and
> nothing in between.
> So, two questions:
> 1. Any idea what might be causing this incorrect watermark behaviour?
> 2. If I want to perform some computation only when the watermark updates,
> is using a watermark-aligned event-time tumbling window (meaning
> windowDuration = watermarkUpdateInterval) the correct way to do this?
> Regards,
> Manas
> On Tue, Apr 28, 2020 at 2:16 AM David Anderson wrote:
> Following up on Piotr's outline, there's an example in the
> documentation of how to use a KeyedProcessFunction to implement an
> event-time tumbling window [1]. Perhaps that can help you get started.
> Regards,
> David
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
> On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski wrote:
> Hi,
> I’m not sure, but I don’t think there is an existing window that
> would do exactly what you want. I would suggest going back to
> the `KeyedProcessFunction` (or a custom operator?) and having a
> MapState<TimeStamp, StateWithTimeStamp> currentStates field.
> Your key would be, for example, the timestamp of the beginning of
> your window. The value would be the latest state in this time
> window, annotated with the timestamp when this state was recorded.
> On each element:
> 1. Determine the window’s begin ts (the key of the map).
> 2. If it’s the first element for that window, register an event-time
> timer to publish results at that window’s end ts.
> 3. Check whether `currentStates` should be modified (if your new
> element is newer, or is the first value for the given key).
> On event-time timer firing:
> 1. Output the state matching this timer.
> 2. Check if there is a (more recent) value for the next window, and
> if not:
> 3. copy the value to the next window, and
> 4. register a timer for that next window to fire.
> 5. Clean up `currentStates` and remove the value for the no longer
> needed key.
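The steps above can be simulated in plain Java for a single key, as a sketch under assumptions: `PeriodicNotifier` and `StateWithTimestamp` are hypothetical names, a `TreeMap` stands in for the `currentStates` MapState, a `TreeSet` stands in for Flink's event-time timer service (a timer fires once the watermark reaches its timestamp), timestamps are non-negative, and each notification is labelled with its window's start.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Single-key simulation of the outlined timer-based algorithm (hypothetical names). */
class PeriodicNotifier {
    /** Latest device state and the event time it was recorded at. */
    static final class StateWithTimestamp {
        final String state;
        final long timestamp;

        StateWithTimestamp(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    private final long windowSize;
    private final TreeMap<Long, StateWithTimestamp> currentStates = new TreeMap<>(); // key: window start
    private final TreeSet<Long> timers = new TreeSet<>(); // pending event-time timers (window ends)
    private final List<String> output = new ArrayList<>();

    PeriodicNotifier(long windowSize) {
        this.windowSize = windowSize;
    }

    /** "On each element", steps 1-3. Assumes ts >= 0. */
    void processElement(String state, long ts) {
        long windowStart = ts - (ts % windowSize);           // 1. window begin ts
        StateWithTimestamp existing = currentStates.get(windowStart);
        if (existing == null) {
            timers.add(windowStart + windowSize);             // 2. timer at window end
        }
        if (existing == null || ts >= existing.timestamp) {  // 3. keep the newest state
            currentStates.put(windowStart, new StateWithTimestamp(state, ts));
        }
    }

    /** Fires, in order, every timer whose timestamp the watermark has reached. */
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** "On event-time timer firing", steps 1-5. */
    private void onTimer(long windowEnd) {
        long windowStart = windowEnd - windowSize;
        StateWithTimestamp s = currentStates.remove(windowStart); // 5. clean up old key
        output.add(s.state + " @ " + windowStart);                 // 1. emit notification
        if (!currentStates.containsKey(windowEnd)) {               // 2. no newer value?
            currentStates.put(windowEnd, s);                       // 3. carry state forward
            timers.add(windowEnd + windowSize);                    // 4. timer for next window
        }
    }

    List<String> output() {
        return output;
    }
}
```

With timestamps in minutes since midnight (780 = 1 PM) and a 20-minute period, a single `state1` event yields the 1 PM and 1:20 PM notifications from watermark progress alone, before any further transition arrives; `state2` at 840 (2 PM) then takes over.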
> I hope this helps
> Piotrek
>
> On 27 Apr 2020, at 12:01, Manas Kale wrote:
>
> Hi,
> I have an upstream operator that outputs device state
> transition messages with event timestamps. That is, it only
> emits output when a transition takes place.
> For example,
> state1 @ 1 PM
> state2 @ 2 PM
> and so on.
>
> *Using a downstream operator, I want to emit notification
> messages at some configured periodicity.* For example, if
> periodicity = 20 min, in the above scenario this operator will
> output:
> state1 notification @ 1 PM
> state1 notification @ 1:20 PM
> state1 notification @ 1:40 PM
> ...
>
> *Now the main issue is that I want this to be driven by the
> /watermark/ and not by transition events received from
> upstream.* That is, I would like to see notification events as
> soon as the watermark crosses their timestamps, /not/ when the
> next transition event arrives at the operator (which could be
> hours later, as above).
>
> My first solution, using a KeyedProcessFunction and timers, did
> not work as expected because the order in which transition
> events arrived at this operator was non-deterministic. To
> elaborate, assume a setAutoWatermarkInterval of 10 seconds.
> If we get the transition events:
> state1 @ 1 sec
> state2 @ 3 sec
> state3 @ 5 sec
> state1 @ 8 sec
> the order in which these events arrived at my
> KeyedProcessFunction was not fixed. To solve this, these
> messages need to be sorted on event time, which led me to my
> second solution.
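One common way to sort on event time is to buffer elements and release them only once the watermark has passed their timestamps. A plain-Java sketch of that idea for a single key, assuming hypothetical names (`EventTimeSorter`, `Event`); in Flink the buffer would presumably live in keyed state with an event-time timer per buffered timestamp.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Single-key model of buffering elements and releasing them in event-time order. */
class EventTimeSorter {
    static final class Event {
        final String state;
        final long timestamp;

        Event(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return state + " @ " + timestamp;
        }
    }

    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));
    private final List<Event> late = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    /** Buffers an element, or records it as late if the watermark has already passed it. */
    void onElement(String state, long timestamp) {
        Event e = new Event(state, timestamp);
        if (timestamp <= watermark) {
            late.add(e);
        } else {
            buffer.add(e);
        }
    }

    /** Advances event time and returns every buffered element it covers, sorted by timestamp. */
    List<Event> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Event> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    List<Event> late() {
        return late;
    }
}
```

Elements that arrive with a timestamp at or below the current watermark can no longer be put in order; the sketch collects them separately as late, which is the same symptom as the late `state2 @ 125` event in the follow-up message.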
>
> My second solution, using an event-time tumbling window with size
> = setAutoWatermarkInterval, also does not work. I sorted the
> accumulated events in the window and applied the
> notification-generation logic to them in order. However, I
> assumed that windows are created even if there are no
> elements. Since this is not the case, this solution generates
> notifications only when the next state transition message
> arrives, which could be hours later.
>
> Does anyone have any suggestions on how I can implement this?
> Thanks!
>
>
>