Hi Aljoscha,

Thank you, that clarification helps. I am generating a new watermark in the getCurrentWatermark() method of my assigner, which causes the watermark to actually be updated every auto watermark interval. I had assumed that watermark updates were caused only by the setAutoWatermarkInterval() setting, which was incorrect. Your explanation makes it clear.

Note that I have canned this problem for now, and I'll send a reply to this thread if I need help to solve it properly again. I don't want to waste anyone's time.
Thanks!

On Mon, May 18, 2020 at 7:59 PM Aljoscha Krettek <[email protected]> wrote:

> I think there is some confusion in this thread between the auto
> watermark interval and the interval (length) of an event-time window.
> Maybe clearing that up for everyone helps.
>
> The auto watermark interval is the periodicity (in processing time) at
> which Flink asks the source (or a watermark generator) what the current
> watermark is. The source will keep track of the timestamps so that it can
> "respond" to Flink when it asks. For example, if the auto watermark
> interval is set to 1 sec, Flink will update the watermark information
> every second. This doesn't mean, though, that the watermark advances 1
> sec in that time. If you're reading through some historic data the
> watermark could jump by hours in between those 1 second intervals. You
> can also think of this as the sampling interval for updating the current
> watermark.
>
> The window size is independent of the auto watermark interval; you can
> have an arbitrary size here. The auto watermark interval only controls
> how frequently Flink will check and emit the contents of windows, if their
> end timestamp is below the watermark.
>
> I hope that helps. If we're all clear we can look at the concrete
> problem again.
>
> Best,
> Aljoscha
>
> On 30.04.20 12:46, Manas Kale wrote:
> > Hi Timo and Piotrek,
> > Thank you for the suggestions.
> > I have been trying to set up unit tests at the operator granularity, and
> > the blog post's testHarness examples certainly help a lot in this regard.
> >
> > I understood my problem - an upstream session window operator can only
> > report the end of the session window when the watermark has passed
> > {lastObserverEvent + sessionTimeout}. However, my watermark was being
> > updated periodically without taking this into account. It seems I will have
> > to delay this notification operator's watermark by sessionTimeout.
> > Another complication is that this sessionTimeout is per-key, so I guess I
> > will have to implement a watermark assigner that extracts the delay period
> > from data (similar to DynamicEventTimeWindows).
> >
> > Also, if I do implement such an assigner, would it be helpful to add it to
> > Flink? I am happy to contribute if so. Any other comments/observations are
> > also welcome!
> >
> > Thank you all for the help,
> > Manas
> >
> > On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <[email protected]> wrote:
> >
> >> Hi Manas,
> >>
> >> Adding to the response from Timo, if you don’t have unit tests/integration
> >> tests, I would strongly recommend setting them up, as it makes debugging
> >> and testing easier. You can read how to do it for your functions and
> >> operators here [1] and here [2].
> >>
> >> Piotrek
> >>
> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
> >> [2] https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
> >>
> >> On 28 Apr 2020, at 18:45, Timo Walther <[email protected]> wrote:
> >>
> >> Hi Manas,
> >>
> >> Reg. 1: I would recommend using a debugger in your IDE to check which
> >> watermarks are travelling through your operators.
> >>
> >> Reg. 2: All event-time operations are only performed once the watermark
> >> has arrived from all parallel instances. So roughly speaking, in machine time
> >> you can assume that the window is computed in watermark update intervals.
> >> However, "what is computed" depends on the timestamps of your events and
> >> how those are categorized in windows.
> >>
> >> I hope this helps a bit.
> >>
> >> Regards,
> >> Timo
> >>
> >> On 28.04.20 14:38, Manas Kale wrote:
> >>
> >> Hi David and Piotrek,
> >> Thank you both for your inputs.
> >> I tried an implementation with the algorithm Piotrek suggested and David's
> >> example.
> >> Although notifications are being generated with the watermark,
> >> subsequent transition events are being received after the watermark has
> >> crossed their timestamps. For example:
> >> state1 @ 100
> >> notification state1@ 110
> >> notification state1@ 120
> >> notification state1@ 130 <----- shouldn't have emitted this
> >> state2 @ 125 <----- watermark is > 125 at this stage
> >> I think something might be subtly(?) wrong with how I have structured
> >> upstream operators. The allowed lateness is 0 in the watermarkassigner
> >> upstream, and I generate watermarks every x seconds.
> >> The operator that emits state transitions is constructed using the
> >> TumblingWindow approach I described in the first e-mail (so that I can
> >> compute at every watermark update). Note that I can use this approach for
> >> state-transition-operator because it only wants to emit transitions, and
> >> nothing in between.
> >> So, two questions:
> >> 1. Any idea on what might be causing this incorrect watermark behaviour?
> >> 2. If I want to perform some computation only when the watermark updates,
> >> is using a watermark-aligned EventTimeTumblingWindow (meaning
> >> windowDuration = watermarkUpdateInterval) the correct way to do this?
> >> Regards,
> >> Manas
> >>
> >> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <[email protected]> wrote:
> >> Following up on Piotr's outline, there's an example in the
> >> documentation of how to use a KeyedProcessFunction to implement an
> >> event-time tumbling window [1]. Perhaps that can help you get started.
> >> Regards,
> >> David
> >> [1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
> >>
> >> On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <[email protected]> wrote:
> >> Hi,
> >> I’m not sure, but I don’t think there is an existing window that
> >> would do exactly what you want. I would suggest going back to
> >> the `keyedProcessFunction` (or a custom operator?), and having a
> >> MapState<TimeStamp, StateWithTimeStamp> currentStates field.
> >> Your key would be, for example, a timestamp of the beginning of
> >> your window. The value would be the latest state in this time
> >> window, annotated with the timestamp at which this state was recorded.
> >> On each element:
> >> 1. Determine the window’s begin ts (key of the map).
> >> 2. If it’s the first element, register an event time timer to
> >> publish results for that window’s end TS.
> >> 3. Look into `currentStates` to see if it should be modified (if
> >> your new element is newer or the first value for the given key).
> >> On event time timer firing:
> >> 1. Output the state matching this timer.
> >> 2. Check if there is a (more recent) value for the next window, and
> >> if not:
> >> 3. Copy the value to the next window.
> >> 4. Register a timer for this window to fire.
> >> 5. Clean up `currentStates` and remove the value for the no longer
> >> needed key.
> >> I hope this helps
> >> Piotrek
> >>
> >> On 27 Apr 2020, at 12:01, Manas Kale <[email protected]> wrote:
> >>
> >> Hi,
> >> I have an upstream operator that outputs device state
> >> transition messages with event timestamps. Meaning it only
> >> emits output when a transition takes place.
> >> For example,
> >> state1 @ 1 PM
> >> state2 @ 2 PM
> >> and so on.
> >>
> >> *Using a downstream operator, I want to emit notification
> >> messages as per some configured periodicity.* For example, if
> >> periodicity = 20 min, in the above scenario this operator will
> >> output:
> >> state1 notification @ 1PM
> >> state1 notification @ 1.20PM
> >> state1 notification @ 1.40PM
> >> ...
> >>
> >> *Now the main issue is that I want this to be driven by the
> >> /watermark/ and not by transition events received from
> >> upstream.* Meaning I would like to see notification events as
> >> soon as the watermark crosses their timestamps; /not/ when the
> >> next transition event arrives at the operator (which could be
> >> hours later, as above).
> >>
> >> My first solution, using a keyedProcessFunction and timers, did
> >> not work as expected because the order in which transition
> >> events arrived at this operator was non-deterministic. To
> >> elaborate, assume a setAutoWatermarkInterval of 10 seconds.
> >> If we get transition events:
> >> state1 @ 1 sec
> >> state2 @ 3 sec
> >> state3 @ 5 sec
> >> state1 @ 8 sec
> >> the order in which these events arrived at my
> >> keyedProcessFunction was not fixed. To solve this, these
> >> messages need to be sorted on event time, which led me to my
> >> second solution.
> >>
> >> My second solution, using an EventTimeTumblingWindow with size
> >> = setAutoWatermarkInterval, also does not work. I sorted
> >> accumulated events in the window and applied
> >> notification-generation logic on them in order. However, I
> >> assumed that windows are created even if there are no
> >> elements. Since this is not the case, this solution generates
> >> notifications only when the next state transition message
> >> arrives, which could be hours later.
> >>
> >> Does anyone have any suggestions on how I can implement this?
> >> Thanks!
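
For anyone reading this thread later: the outline Piotrek gives (per-window state plus event-time timers, with the last state carried forward into the next window) can be tried out without a Flink cluster. The following is only a rough plain-Java simulation of that idea, not Flink code. The class name PeriodicNotifier, its onElement/onWatermark methods, and the use of TreeMap/TreeSet in place of MapState and the timer service are all hypothetical stand-ins chosen for illustration. It also assumes that a timer fires once the watermark is greater than or equal to the timer's timestamp, and that notifications are emitted at window ends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java simulation (no Flink dependency) of per-window state plus event-time timers. */
class PeriodicNotifier {
    private final long period; // notification period = window length, in event-time units
    // Stand-ins for keyed MapState: window start -> latest state, and the timestamp of that state.
    private final TreeMap<Long, String> currentStates = new TreeMap<>();
    private final TreeMap<Long, Long> stateTimestamps = new TreeMap<>();
    // Stand-in for the timer service: pending window-end timers, deduplicated.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    PeriodicNotifier(long period) {
        this.period = period;
    }

    /** "On each element": update the state of the window that contains ts. */
    void onElement(long ts, String state) {
        long windowStart = ts - Math.floorMod(ts, period);
        Long previousTs = stateTimestamps.get(windowStart);
        // Keep only the newest state per window, so arrival order does not matter.
        if (previousTs == null || ts >= previousTs) {
            currentStates.put(windowStart, state);
            stateTimestamps.put(windowStart, ts);
        }
        timers.add(windowStart + period); // timer for the window's end
    }

    /** Simulates a watermark advance: fire every timer at or below the watermark. */
    void onWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    /** "On event time timer firing": emit, carry the state forward, re-register, clean up. */
    private void onTimer(long windowEnd) {
        long windowStart = windowEnd - period;
        String state = currentStates.remove(windowStart);
        Long stateTs = stateTimestamps.remove(windowStart);
        output.add(state + " notification @ " + windowEnd);
        // Copy the state to the next window only if that window has no value yet.
        if (!currentStates.containsKey(windowEnd)) {
            currentStates.put(windowEnd, state);
            stateTimestamps.put(windowEnd, stateTs);
        }
        timers.add(windowEnd + period); // keep notifying even if no new element arrives
    }

    List<String> getOutput() {
        return output;
    }
}
```

With a period of 10, feeding state1 @ 100, advancing the watermark to 120, then feeding state2 @ 125 and advancing the watermark to 130 produces notifications for state1 at 110 and 120 and for state2 at 130. The notifications are driven purely by the watermark calls, which is the behaviour asked for in the first mail of the thread. In a real job the watermark must not pass 125 before state2 arrives, which is the upstream problem discussed above.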
