Great, thanks for the update. The upfront filter does work and has for the last 24 hours, and there is no reason why it should not.
Again I have to note that there is no mail group that has been this reactive to issues, so thank you again.

On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl <matth...@ververica.com> wrote:

> After having talked to David about this issue offline, I decided to create
> a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
> mailing list, Vishal. Hopefully, the community has the chance to look into
> it.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22425
>
> On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <matth...@ververica.com> wrote:
>
>> To me, it sounds strange. I would have expected it to work with
>> `allowedLateness` and `sideOutput` being defined. I pull in David to have a
>> look at it. Maybe, he has some more insights. I haven't worked that much
>> with lateness, yet.
>>
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> << Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is not pulled from the simulated sideout ( below )
>>>
>>> >> Added the Filter upfront as below, the pipe has no issues. Also
>>> metrics show that no data is being pushed through the sideoutput and that
>>> data is *now* pulled from the simulated sideout, essentially the
>>> Process Function with a reverse predicate to the Filter Process Function.
>>>
>>>
>>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> And when I added the filter the Exception was not thrown. So the
>>>> sequence of events
>>>>
>>>> * Increased lateness from 12 ( that was what it was initially running
>>>>   with ) to 24 hours
>>>> * the pipe ran as desired before it blew up with the Exception
>>>> * masked the issue by increasing the lateness to 48 hours.
>>>> * It blew up again but now after the added lateness, so essentially the
>>>>   same issue but added lateness let the pipe run for another few hours.
>>>> * Added the Filter upfront as below, the pipe has no issues. Also
>>>>   metrics show that no data is being pushed through the sideoutput and that
>>>>   data is not pulled from the simulated sideout ( below )
>>>>
>>>>
>>>> public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     long allowedLateness;
>>>>
>>>>     public LateEventFilter(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         // Forward only events that are still within the allowed lateness.
>>>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     long allowedLateness;
>>>>
>>>>     public LateEventSideOutput(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         // Reverse predicate: forward only events that are past the allowed lateness.
>>>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>> I am using RocksDB as a backend if that helps.
>>>>
>>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Yes sir. The allowedLateness and side output always existed.
>>>>>
>>>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>>
>>>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>>>>> being added to your pipeline when running into the
>>>>>> UnsupportedOperationException issue previously?
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> As in this is essentially doing what lateness *should* have done.
>>>>>>> And I think that is a bug. My code now is as below. Please look at
>>>>>>> the allowedLateness on the session window.
>>>>>>>
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>>>     .process(new LateEventFilter(this.lateNessInMinutes*60*1000L))
>>>>>>>     .name("late_filter").uid("late_filter");
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>>>     .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000L))
>>>>>>>     .name("late_data").uid("late_data");
>>>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>>>     .keyBy(value -> value.getKey())
>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>>>     .sideOutputLateData(lateOutputTag)
>>>>>>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>>>>>>     .name("session_aggregate").uid("session_aggregate");
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>>>> you please validate? That aside, I already have the lateness configured for
>>>>>>>> the session window ( the normal allowedLateness() ) and this looks like a
>>>>>>>> session window was not collected and still is alive for some reason ( a
>>>>>>>> Flink bug ? )
>>>>>>>>
>>>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>>>     out.collect(value);
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>> based on the error message and the behavior you described,
>>>>>>>>> introducing a filter for late events is the way to go - just as described
>>>>>>>>> in the SO thread you mentioned. Usually, you would collect late events in
>>>>>>>>> some kind of side output [1].
>>>>>>>>>
>>>>>>>>> I hope that helps.
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>>
>>>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I saw
>>>>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>>>>>>>>> and this seems to suggest a straight up filter, but I am not sure how
>>>>>>>>>> that filter works, as in would it factor in the lateness when
>>>>>>>>>> filtering ?
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Well it was not a solution after all. We now have a session
>>>>>>>>>>> window that is stuck with the same issue albeit after the additional
>>>>>>>>>>> lateness. I had increased the lateness to 2 days and that masked the issue,
>>>>>>>>>>> which again reared its head after the 2 days lateness was over ( instead
>>>>>>>>>>> of the 1 day before ). This is very disconcerting.
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>>>>> timestamp of an event-time window cannot become earlier than
>>>>>>>>>>> the current watermark by merging. Current watermark:
>>>>>>>>>>> 1619053742129 window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey folks,
>>>>>>>>>>>> I had a pipe with sessionization restart and then fail after
>>>>>>>>>>>> retries with this exception. The only thing I had done was
>>>>>>>>>>>> to increase the lateness by 12 hours ( to a day ) in this pipe and
>>>>>>>>>>>> restart from SP and it ran for 12 hours plus without issue. I cannot
>>>>>>>>>>>> imagine that increasing the lateness created this and the way I solved this
>>>>>>>>>>>> was to increase the lateness further. Could this be if there are TMs in the
>>>>>>>>>>>> cluster whose time is off ( as in not synchronized ) ?
>>>>>>>>>>>>
>>>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of
>>>>>>>>>>>> an event-time window cannot become earlier than the current watermark
>>>>>>>>>>>> by merging. Current watermark: 1618966593999 window:
>>>>>>>>>>>> TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)