So you're saying that when you previously ran into the UnsupportedOperationException, you were using `allowedLateness`/`sideOutputLateData` as described in [1], but without `LateEventFilter`/`LateEventSideOutput` in your pipeline?
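For comparison, the filter predicate quoted further down (`ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()`) appears to keep exactly the elements that the window operator itself would not treat as late. As far as I can tell from the Flink source, `WindowOperator` considers an event-time element late when `timestamp + allowedLateness <= currentWatermark`. Below is a minimal plain-Java model of the filter/side-output split with no Flink dependency. `LateEventSplit`, `Event`, `isOnTime` and `split` are hypothetical names for illustration. They are not the `LateEventFilter`/`LateEventSideOutput` classes from this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of a late-event filter + side output; no Flink dependency.
// Assumption: "on time" means timestamp + allowedLateness > currentWatermark,
// the complement of the window operator's element-lateness check.
final class LateEventSplit {

    // Hypothetical stand-in for KeyedTimedValue: a key plus an event timestamp (epoch ms).
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Same predicate as the LateEventFilter snippet in the thread.
    static boolean isOnTime(long timestamp, long allowedLatenessMs, long currentWatermark) {
        return timestamp + allowedLatenessMs > currentWatermark;
    }

    // Sends on-time events to the main output and everything else to the "late" output,
    // like a ProcessFunction that collects on-time data and side-outputs the rest.
    static void split(List<Event> events, long allowedLatenessMs, long currentWatermark,
                      List<Event> onTime, List<Event> late) {
        for (Event e : events) {
            if (isOnTime(e.timestamp, allowedLatenessMs, currentWatermark)) {
                onTime.add(e);
            } else {
                late.add(e);
            }
        }
    }

    public static void main(String[] args) {
        long oneDayMs = 24L * 60 * 60 * 1000;   // allowed lateness of one day
        long watermark = 1_618_966_593_999L;    // watermark from the Apr 21 exception
        List<Event> events = Arrays.asList(
                new Event("a", watermark - oneDayMs + 1),  // 1 ms inside the bound -> on time
                new Event("b", watermark - oneDayMs));     // exactly on the bound -> late
        List<Event> onTime = new ArrayList<>();
        List<Event> late = new ArrayList<>();
        split(events, oneDayMs, watermark, onTime, late);
        System.out.println("on time: " + onTime.size() + ", late: " + late.size());
    }
}
```

Running `main` prints `on time: 1, late: 1`. An element that is exactly `allowedLateness` behind the watermark is already treated as late, so the filter's strict `>` matches the operator's `<=` check.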
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> As in, this is essentially doing what lateness *should* have done, and I
> think that is a bug. My code now is below. Please look at the allowedLateness
> on the session window.
>
> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>         .name("late_filter").uid("late_filter");
> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>         .name("late_data").uid("late_data");
> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>         .keyBy(value -> value.getKey())
>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>         .allowedLateness(Time.minutes(lateNessInMinutes))
>         .sideOutputLateData(lateOutputTag)
>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>         .aggregate(new SortAggregate<KEY, VALUE>(),
>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>         .name("session_aggregate").uid("session_aggregate");
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I can do that, but I am not certain this is the right filter. Can you
>> please validate it? That aside, I already have lateness configured for the
>> session window (the normal allowedLateness()), and this looks like a session
>> window was not collected and is still alive for some reason (a Flink bug?).
>>
>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>     out.collect(value);
>> }
>>
>>
>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Vishal,
>>> based on the error message and the behavior you described, introducing a
>>> filter for late events is the way to go, just as described in the SO
>>> thread you mentioned. Usually, you would collect late events in some kind
>>> of side output [1].
>>>
>>> I hope that helps.
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> I saw
>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>> and it seems to suggest a straight-up filter, but I am not sure how that
>>>> filter works, i.e., would it factor in the lateness when filtering?
>>>>
>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Well, it was not a solution after all. We now have a session window
>>>>> that is stuck with the same issue, albeit after the additional lateness.
>>>>> I had increased the lateness to 2 days, and that masked the issue, which
>>>>> reared its head again once the 2-day lateness was over (instead of after
>>>>> 1 day, as before). This is very disconcerting.
>>>>>
>>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp
>>>>> of an event-time window cannot become earlier than the current
>>>>> watermark by merging. Current watermark: 1619053742129 window:
>>>>> TimeWindow{start=1618877773663, end=1618879580402}
>>>>>
>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hey folks,
>>>>>> I had a pipeline with sessionization that restarts and then
>>>>>> fails after retries with this exception.
>>>>>> The only thing I had done was to increase the lateness by 12 hours
>>>>>> (to a day) in this pipeline and restart from a savepoint (SP), and it
>>>>>> ran for 12+ hours without issue. I cannot imagine that increasing the
>>>>>> lateness caused this, and the way I solved it was to increase the
>>>>>> lateness further. Could this happen if there are TaskManagers (TMs) in
>>>>>> the cluster whose clocks are off (as in not synchronized)?
>>>>>>
>>>>>> 2021-04-21 11:27:58
>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>>> event-time window cannot become earlier than the current watermark by
>>>>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>> --
>>> Matthias Pohl | Engineer
>>> Ververica <https://www.ververica.com/>
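The timestamps in both exceptions can be checked against the condition that raises this error. This is a sketch under one assumption: that the check in `WindowOperator`'s merge callback (the `WindowOperator$2.merge` frame in the trace) is `mergeResult.maxTimestamp() + allowedLateness <= currentWatermark`, with `TimeWindow.maxTimestamp()` being `end - 1`. That is how I read the Flink source. `MergeLatenessCheck` and its methods are hypothetical helpers in plain Java, with no Flink dependency.

```java
// Plain-Java check of the merge-time lateness condition, assuming Flink's WindowOperator
// throws when mergeResult.maxTimestamp() + allowedLateness <= currentWatermark.
final class MergeLatenessCheck {

    static final long DAY_MS = 24L * 60 * 60 * 1000;

    // A TimeWindow's end is exclusive, so its max timestamp is end - 1.
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // True when the merged window is already past its allowed lateness.
    static boolean mergeWouldThrow(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        return maxTimestamp(windowEnd) + allowedLatenessMs <= currentWatermark;
    }

    // Milliseconds by which the watermark has passed maxTimestamp + lateness
    // (negative means the window is still within its allowed lateness).
    static long overshootMs(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        return currentWatermark - (maxTimestamp(windowEnd) + allowedLatenessMs);
    }

    public static void main(String[] args) {
        // Apr 21 failure, with lateness raised to 1 day.
        long wm1 = 1_618_966_593_999L;
        long end1 = 1_618_880_140_466L;
        System.out.println("Apr 21, 1 day:  throws=" + mergeWouldThrow(end1, DAY_MS, wm1)
                + ", overshoot=" + overshootMs(end1, DAY_MS, wm1) + " ms");
        System.out.println("Apr 21, 2 days: throws=" + mergeWouldThrow(end1, 2 * DAY_MS, wm1));

        // Apr 22 failure, with lateness raised to 2 days.
        long wm2 = 1_619_053_742_129L;
        long end2 = 1_618_879_580_402L;
        System.out.println("Apr 22, 2 days: throws=" + mergeWouldThrow(end2, 2 * DAY_MS, wm2)
                + ", overshoot=" + overshootMs(end2, 2 * DAY_MS, wm2) + " ms");
    }
}
```

With the reported numbers, the Apr 21 window is 53,534 ms (about 54 s) past its 1-day bound, and it would still be within a 2-day bound. The Apr 22 window is 1,361,728 ms (about 23 min) past its 2-day bound. This is consistent with the observation in the thread that raising the lateness only postponed the failure.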