Hi Aljoscha,
Thank you, that clarification helps. I am generating a new watermark in the
getCurrentWatermark() method of my assigner, which causes the watermark to
actually be updated every auto watermark interval. I had assumed that actual
watermark updates were caused only by the setAutoWatermarkInterval() setting,
which was incorrect. Your explanation makes it clear.
Note that I have shelved this problem for now, and I'll reply to this thread
if I need help solving it properly later. I don't want to waste anyone's
time.

Thanks!


On Mon, May 18, 2020 at 7:59 PM Aljoscha Krettek <[email protected]> wrote:

> I think there is some confusion in this thread between the auto
> watermark interval and the interval (length) of an event-time window.
> Maybe clearing that up for everyone helps.
>
> The auto watermark interval is the periodicity (in processing time) at
> which Flink asks the source (or a watermark generator) what the current
> watermark is. The source keeps track of the timestamps it has seen, so it
> can "respond" when Flink asks. For example, if the auto watermark
> interval is set to 1 sec, Flink will update the watermark information
> every second. This doesn't mean, though, that the watermark advances 1
> sec in that time. If you're reading through historic data, the
> watermark could jump by hours between two of those 1-second polls. You
> can also think of this as the sampling interval for updating the current
> watermark.
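A minimal plain-Java model of this sampling behaviour may help. It is not Flink code, and `SampledWatermarkGenerator` and `WatermarkSamplingDemo` are hypothetical names. The model assumes the generator reports the largest timestamp seen so far minus a fixed bound whenever it is polled, which is roughly the role `getCurrentWatermark()` plays in a periodic assigner. It also assumes one poll happens per batch of events read within one auto watermark interval.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model, not Flink code: the generator remembers the largest event
// timestamp it has seen and, when polled, reports that value minus a fixed
// bound. Polling stands in for Flink asking once per auto watermark interval.
final class SampledWatermarkGenerator {
    private final long boundMillis;
    private long maxTimestamp = Long.MIN_VALUE;

    SampledWatermarkGenerator(long boundMillis) {
        this.boundMillis = boundMillis;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called once per auto watermark interval (processing time).
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
    }
}

final class WatermarkSamplingDemo {
    // Each inner array holds the events the source happened to read during one
    // auto watermark interval. Returns the watermark sampled after each batch.
    static List<Long> sample(long[][] batches, long boundMillis) {
        SampledWatermarkGenerator generator = new SampledWatermarkGenerator(boundMillis);
        List<Long> samples = new ArrayList<>();
        for (long[] batch : batches) {
            for (long ts : batch) {
                generator.onEvent(ts);
            }
            samples.add(generator.currentWatermark());
        }
        return samples;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Two polls one second apart, but the historic data read in between spans hours.
        long[][] batches = {{0L, hour}, {2 * hour, 5 * hour}};
        System.out.println(sample(batches, 1_000L)); // [3599000, 17999000]
    }
}
```

Two polls one second apart move the watermark forward by four hours. The interval only decides when the watermark is sampled, not how far it may move.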
>
> The window size is independent of the auto watermark interval; you can
> choose an arbitrary size here. The auto watermark interval only controls
> how frequently Flink checks for, and emits the contents of, windows whose
> end timestamp is below the watermark.
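The sketch below shows that window size and polling interval are independent, because a single watermark update can close many windows at once. It is again a simplified plain-Java model with a hypothetical class name, not Flink code. It assumes tumbling windows aligned to 0 that exist only once an event has fallen into them. It also assumes a window fires when the watermark reaches its last timestamp (end - 1), which is taken here to mirror Flink's built-in event-time trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified model, not Flink code. Windows are tumbling, aligned to 0, and
// only exist if at least one event fell into them. A window
// [start, start + size) fires once the watermark reaches start + size - 1.
final class WindowFiringDemo {
    // For each watermark sample, returns the start timestamps of the windows
    // that fire at that sample.
    static List<List<Long>> fire(long[] eventTimestamps, long windowSize, long[] watermarkSamples) {
        TreeSet<Long> pendingWindowStarts = new TreeSet<>();
        for (long ts : eventTimestamps) {
            pendingWindowStarts.add(ts - Math.floorMod(ts, windowSize));
        }
        List<List<Long>> firedPerSample = new ArrayList<>();
        for (long watermark : watermarkSamples) {
            List<Long> fired = new ArrayList<>();
            // A single watermark update can close any number of windows.
            while (!pendingWindowStarts.isEmpty()
                    && pendingWindowStarts.first() + windowSize - 1 <= watermark) {
                fired.add(pendingWindowStarts.pollFirst());
            }
            firedPerSample.add(fired);
        }
        return firedPerSample;
    }

    public static void main(String[] args) {
        // Timestamps in minutes, 10-minute windows, two watermark samples.
        long[] events = {1, 12, 25, 47};
        long[] samples = {5, 50};
        System.out.println(fire(events, 10, samples)); // [[], [0, 10, 20, 40]]
    }
}
```

The window [30, 40) received no events, so it never fires. This is the same reason an empty event-time window never produces output.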
>
> I hope that helps. If we're all clear we can look at the concrete
> problem again.
>
> Best,
> Aljoscha
>
> On 30.04.20 12:46, Manas Kale wrote:
> > Hi Timo and Piotrek,
> > Thank you for the suggestions.
> > I have been trying to set up unit tests at the operator granularity, and
> > the blog post's testHarness examples certainly help a lot in this regard.
> >
> > I figured out my problem: an upstream session window operator can only
> > report the end of a session window once the watermark has passed
> > {lastObservedEvent + sessionTimeout}. However, my watermark was being
> > updated periodically without taking this into account. It seems I will
> > have to delay this notification operator's watermark by sessionTimeout.
> > Another complication is that this sessionTimeout is per-key, so I guess I
> > will have to implement a watermark assigner that extracts the delay period
> > from data (similar to DynamicEventTimeWindows).
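One possible shape for such an assigner is sketched below in plain Java rather than against Flink's watermark interfaces. The class name is hypothetical. Each event is assumed to carry its key's session timeout, and the generator holds the watermark back by the largest timeout seen so far. Flink tracks watermarks per stream partition rather than per key, so a per-key delay has to collapse into a single conservative value like this one.

```java
// Hypothetical sketch, not an existing Flink class: a periodic watermark
// generator for the notification operator's input that holds the watermark
// back by the largest session timeout seen on any key so far.
final class SessionAwareWatermarkGenerator {
    private long maxTimestamp = Long.MIN_VALUE;
    private long maxTimeout = 0L;
    private long lastWatermark = Long.MIN_VALUE;

    // Each event is assumed to carry the session timeout of its key.
    void onEvent(long timestamp, long sessionTimeout) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        maxTimeout = Math.max(maxTimeout, sessionTimeout);
    }

    // Polled once per auto watermark interval. The result never moves
    // backwards, even when a larger timeout shows up later.
    long currentWatermark() {
        if (maxTimestamp != Long.MIN_VALUE) {
            lastWatermark = Math.max(lastWatermark, maxTimestamp - maxTimeout);
        }
        return lastWatermark;
    }

    public static void main(String[] args) {
        SessionAwareWatermarkGenerator g = new SessionAwareWatermarkGenerator();
        g.onEvent(100, 20);
        System.out.println(g.currentWatermark()); // 80
        g.onEvent(130, 20);
        System.out.println(g.currentWatermark()); // 110
        g.onEvent(135, 50);
        System.out.println(g.currentWatermark()); // 110 (135 - 50 = 85 would go backwards)
    }
}
```

One caveat of this design: the watermark cannot move backwards. If a larger timeout appears after the watermark has already advanced, sessions of that key may still be reported after the watermark has passed them.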
> >
> > Also, if I do implement such an assigner, would it be helpful to add it to
> > Flink? I am happy to contribute if so. Any other comments/observations are
> > also welcome!
> >
> > Thank you all for the help,
> > Manas
> >
> >
> > On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <[email protected]> wrote:
> >
> >> Hi Manas,
> >>
> >> Adding to the response from Timo, if you don’t have unit or integration
> >> tests, I would strongly recommend setting them up, as they make debugging
> >> and testing easier. You can read how to do it for your functions and
> >> operators here [1] and here [2].
> >>
> >> Piotrek
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> >> [2] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
> >>
> >> On 28 Apr 2020, at 18:45, Timo Walther <[email protected]> wrote:
> >>
> >> Hi Manas,
> >>
> >> Reg. 1: I would recommend using a debugger in your IDE to check which
> >> watermarks are travelling through your operators.
> >>
> >> Reg. 2: All event-time operations are only performed once the watermark
> >> has arrived from all parallel instances. So, roughly speaking, in machine
> >> (processing) time you can assume that windows are computed at watermark
> >> update intervals. However, "what is computed" depends on the timestamps
> >> of your events and how those are assigned to windows.
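The "all parallel instances" rule can be sketched as follows. This is a simplified plain-Java model with a hypothetical class name, and it ignores idle-channel handling. The operator remembers the latest watermark from each input channel and advances its own clock to the minimum of those.

```java
import java.util.Arrays;

// Simplified model, not Flink code (idle channels are not handled): an
// operator fed by several parallel upstream instances keeps the latest
// watermark per input channel and advances its own event-time clock to the
// minimum of those, never moving it backwards.
final class CombinedWatermark {
    private final long[] perChannel;
    private long combined = Long.MIN_VALUE;

    CombinedWatermark(int channels) {
        perChannel = new long[channels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    // Returns the operator's watermark after `watermark` arrives on `channel`.
    long onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perChannel) {
            min = Math.min(min, w);
        }
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        CombinedWatermark op = new CombinedWatermark(2);
        System.out.println(op.onWatermark(0, 100)); // Long.MIN_VALUE: channel 1 has sent nothing yet
        System.out.println(op.onWatermark(1, 50));  // 50
        System.out.println(op.onWatermark(1, 200)); // 100
    }
}
```

A single slow or empty upstream instance therefore holds back every event-time computation downstream of it.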
> >>
> >> I hope this helps a bit.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 28.04.20 14:38, Manas Kale wrote:
> >>
> >> Hi David and Piotrek,
> >> Thank you both for your inputs.
> >> I tried an implementation based on the algorithm Piotrek suggested and
> >> David's example. Although notifications are being generated with the
> >> watermark, subsequent transition events are being received after the
> >> watermark has crossed their timestamps. For example:
> >> state1 @ 100
> >> notification state1 @ 110
> >> notification state1 @ 120
> >> notification state1 @ 130   <----- shouldn't have emitted this
> >> state2 @ 125                <----- watermark is > 125 at this stage
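A small plain-Java check makes the symptom in this trace explicit. The names are hypothetical and this is not Flink code. It assumes each notification was fired by an event-time timer, so the watermark had already reached 110, 120 and 130 when those notifications appeared. It then flags any event whose timestamp is not greater than the watermark seen before it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model, not Flink code: one entry of an interleaved trace, either
// a watermark (label == null) or a transition event.
final class TraceEntry {
    final String label;
    final long timestamp;

    private TraceEntry(String label, long timestamp) {
        this.label = label;
        this.timestamp = timestamp;
    }

    static TraceEntry watermark(long timestamp) { return new TraceEntry(null, timestamp); }

    static TraceEntry event(String label, long timestamp) { return new TraceEntry(label, timestamp); }
}

final class LateEventCheck {
    // A watermark t promises that no more events with timestamp <= t will
    // arrive, so any such event seen afterwards is late.
    static List<String> lateEvents(List<TraceEntry> trace) {
        long watermark = Long.MIN_VALUE;
        List<String> late = new ArrayList<>();
        for (TraceEntry entry : trace) {
            if (entry.label == null) {
                watermark = Math.max(watermark, entry.timestamp);
            } else if (entry.timestamp <= watermark) {
                late.add(entry.label + " @ " + entry.timestamp);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Notifications at 110, 120 and 130 are modelled as watermarks at those times.
        List<TraceEntry> trace = Arrays.asList(
                TraceEntry.event("state1", 100),
                TraceEntry.watermark(110),
                TraceEntry.watermark(120),
                TraceEntry.watermark(130),
                TraceEntry.event("state2", 125));
        System.out.println(lateEvents(trace)); // [state2 @ 125]
    }
}
```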
> >> I think something might be subtly(?) wrong with how I have structured the
> >> upstream operators. The allowed lateness is 0 in the upstream watermark
> >> assigner, and I generate watermarks every x seconds.
> >> The operator that emits state transitions is constructed using the
> >> TumblingWindow approach I described in the first e-mail (so that I can
> >> compute at every watermark update). Note that I can use this approach for
> >> the state-transition operator because it only needs to emit transitions
> >> and nothing in between.
> >> So, two questions:
> >> 1. Any idea what might be causing this incorrect watermark behaviour?
> >> 2. If I want to perform some computation only when the watermark updates,
> >> is using a watermark-aligned EventTimeTumblingWindow (meaning
> >> windowDuration = watermarkUpdateInterval) the correct way to do this?
> >> Regards,
> >> Manas
> >> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <[email protected]> wrote:
> >>     Following up on Piotr's outline, there's an example in the
> >>     documentation of how to use a KeyedProcessFunction to implement an
> >>     event-time tumbling window [1]. Perhaps that can help you get started.
> >>     Regards,
> >>     David
> >>     [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
> >>     On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <[email protected]> wrote:
> >>         Hi,
> >>         I’m not sure, but I don’t think there is an existing window that
> >>         would do exactly what you want. I would suggest going back to
> >>         the `KeyedProcessFunction` (or a custom operator?) and keeping a
> >>         MapState<TimeStamp, StateWithTimeStamp> currentStates field.
> >>         The map key could be, for example, the timestamp of the beginning
> >>         of your window. The value would be the latest state in this time
> >>         window, annotated with the timestamp at which this state was
> >>         recorded.
> >>         On each element:
> >>         1. Determine the window’s begin timestamp (the key of the map).
> >>         2. If it’s the first element, register an event-time timer to
> >>         publish results at that window’s end timestamp.
> >>         3. Check `currentStates` to see whether it should be modified (if
> >>         your new element is newer, or is the first value for the given key).
> >>         On event-time timer firing:
> >>         1. Output the state matching this timer.
> >>         2. Check whether there is a (more recent) value for the next
> >>         window, and if not:
> >>         3. Copy the value to the next window.
> >>         4. Register a timer for that window to fire.
> >>         5. Clean up `currentStates` and remove the value for the no longer
> >>         needed key.
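The outline above can be simulated in plain Java to check its behaviour before writing the Flink version. This is a sketch under stated assumptions, not Piotr's code. A `TreeMap` stands in for `MapState`, and a `TreeSet` of timestamps stands in for the event-time timer service of a single key. Notifications are labelled with their window's start, and late elements are not handled. `PeriodicNotifier` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation of the per-key algorithm above, not Flink code.
// TreeMap stands in for MapState and TreeSet for the event-time timers of a
// single key (a TreeSet also deduplicates timers registered for the same time).
final class PeriodicNotifier {
    private static final class StateWithTimestamp {
        final String state;
        final long timestamp;

        StateWithTimestamp(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }
    }

    private final long period;
    // Key: window start timestamp. Value: latest state seen for that window.
    private final TreeMap<Long, StateWithTimestamp> currentStates = new TreeMap<>();
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    PeriodicNotifier(long period) {
        this.period = period;
    }

    void processElement(String state, long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, period); // step 1
        StateWithTimestamp existing = currentStates.get(windowStart);
        if (existing == null) {
            timers.add(windowStart + period); // step 2: timer at the window's end
        }
        if (existing == null || timestamp > existing.timestamp) {
            currentStates.put(windowStart, new StateWithTimestamp(state, timestamp)); // step 3
        }
    }

    // Fires, in order, every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long windowEnd = timers.pollFirst();
            StateWithTimestamp current = currentStates.remove(windowEnd - period); // cleanup
            if (current == null) {
                continue;
            }
            output.add(current.state + " @ " + (windowEnd - period)); // output
            if (!currentStates.containsKey(windowEnd)) {
                // No newer value for the next window: carry this state forward
                // and register that window's timer.
                currentStates.put(windowEnd, current);
                timers.add(windowEnd + period);
            }
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        PeriodicNotifier notifier = new PeriodicNotifier(20); // e.g. minutes
        notifier.processElement("state1", 0);
        notifier.advanceWatermark(59); // emits without any new element arriving
        notifier.processElement("state2", 60);
        notifier.advanceWatermark(85);
        System.out.println(notifier.output());
        // [state1 @ 0, state1 @ 20, state1 @ 40, state2 @ 60]
    }
}
```

The first `advanceWatermark(59)` call emits two notifications without any new element arriving. That is the watermark-driven behaviour the original question asks for. Carrying the state forward is what keeps one timer registered for every period.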
> >>         I hope this helps
> >>         Piotrek
> >>
> >>         On 27 Apr 2020, at 12:01, Manas Kale <[email protected]> wrote:
> >>
> >>         Hi,
> >>         I have an upstream operator that outputs device state
> >>         transition messages with event timestamps, meaning it only
> >>         emits output when a transition takes place.
> >>         For example,
> >>         state1 @ 1 PM
> >>         state2 @ 2 PM
> >>         and so on.
> >>
> >>         *Using a downstream operator, I want to emit notification
> >>         messages at some configured periodicity.* For example, if
> >>         periodicity = 20 min, in the above scenario this operator will
> >>         output:
> >>         state1 notification @ 1PM
> >>         state1 notification @ 1.20PM
> >>         state1 notification @ 1.40PM
> >>          ...
> >>
> >>         *Now the main issue is that I want this to be driven by the
> >>         /watermark/ and not by transition events received from
> >>         upstream.* That is, I would like to see notification events as
> >>         soon as the watermark crosses their timestamps, /not/ when the
> >>         next transition event arrives at the operator (which could be
> >>         hours later, as above).
> >>
> >>         My first solution, using a KeyedProcessFunction and timers, did
> >>         not work as expected because the order in which transition
> >>         events arrived at this operator was non-deterministic. To
> >>         elaborate, assume a setAutoWatermarkInterval of 10 seconds.
> >>         If we get the transition events:
> >>         state1 @ 1 sec
> >>         state2 @ 3 sec
> >>         state3 @ 5 sec
> >>         state1 @ 8 sec
> >>         the order in which these events arrived at my
> >>         KeyedProcessFunction was not fixed. To solve this, these
> >>         messages need to be sorted by event time, which led me to my
> >>         second solution.
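One common way to get a deterministic order is to buffer events and release them in timestamp order only once the watermark has passed them. The sketch below shows this in plain Java rather than against Flink's API. In a Flink KeyedProcessFunction the buffer would plausibly live in keyed state, and the release would be driven by event-time timers. That mapping is an assumption, and `EventTimeSorter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model, not Flink code: buffer transition events as they arrive
// and release them in timestamp order once the watermark has passed them.
final class EventTimeSorter {
    private static final class Event {
        final String label;
        final long timestamp;

        Event(String label, long timestamp) {
            this.label = label;
            this.timestamp = timestamp;
        }
    }

    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    void add(String label, long timestamp) {
        buffer.add(new Event(label, timestamp));
    }

    // Releases, oldest first, every buffered event with timestamp <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            Event e = buffer.poll();
            released.add(e.label + " @ " + e.timestamp);
        }
        return released;
    }

    public static void main(String[] args) {
        EventTimeSorter sorter = new EventTimeSorter();
        // Arrival order differs from event-time order.
        sorter.add("state3", 5);
        sorter.add("state1", 1);
        sorter.add("state1", 8);
        sorter.add("state2", 3);
        System.out.println(sorter.onWatermark(6));  // [state1 @ 1, state2 @ 3, state3 @ 5]
        System.out.println(sorter.onWatermark(10)); // [state1 @ 8]
    }
}
```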
> >>
> >>         My second solution, using an EventTimeTumblingWindow with size
> >>         = setAutoWatermarkInterval, also does not work. I sorted the
> >>         accumulated events in the window and applied the
> >>         notification-generation logic to them in order. However, I
> >>         assumed that windows are created even if they contain no
> >>         elements. Since this is not the case, this solution generates
> >>         notifications only when the next state transition message
> >>         arrives, which could be hours later.
> >>
> >>         Does anyone have any suggestions on how I can implement this?
> >>         Thanks!
> >>
> >>
> >>
> >
>
>
