So you're saying that, when you previously ran into the
UnsupportedOperationException, you were using `allowedLateness`/`sideOutputLateData`
as described in [1], but without `LateEventFilter`/`LateEventSideOutput` in
your pipeline?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> That is, this filter essentially does what allowedLateness *should* have
> done, and I think that is a bug. My code now is as follows; please note the
> allowedLateness on the session window.
>
> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>     .name("late_filter").uid("late_filter");
> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>     .name("late_data").uid("late_data");
> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>     .keyBy(value -> value.getKey())
>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>     .allowedLateness(Time.minutes(lateNessInMinutes))
>     .sideOutputLateData(lateOutputTag)
>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>     .aggregate(new SortAggregate<KEY, VALUE>(),
>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>     .name("session_aggregate").uid("session_aggregate");
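Since the exception is raised while session windows are being merged, a simplified model of how event-time session windows form may help when reading the pipeline above. This is a framework-free sketch, not Flink code: the class `SessionMergeSketch`, its `Window` type and the 30-minute gap are hypothetical. It assumes each event at time t opens a window [t, t + gap) and that intersecting windows are merged into one session, roughly as `EventTimeSessionWindows` does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * Simplified, hypothetical model of event-time session windows (not Flink code):
 * each event at time t gets the window [t, t + gap), and intersecting windows are
 * merged into one session.
 */
public class SessionMergeSketch {

    /** Half-open interval [start, end), printed like Flink's TimeWindow. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "TimeWindow{start=" + start + ", end=" + end + "}";
        }
    }

    /** Assigns one window per event, then merges intersecting windows into sessions. */
    static List<Window> sessions(List<Long> timestamps, long gapMillis) {
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Window(t, t + gapMillis));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w)); // extend the current session
            } else {
                merged.add(w); // start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long gap = 30 * minute; // hypothetical gapInMinutes = 30
        // Events at 0, 20 and 100 minutes: the first two end up in the same session.
        for (Window w : sessions(List.of(0L, 20 * minute, 100 * minute), gap)) {
            System.out.println(w);
        }
    }
}
```

In this model a merged session's end only moves forward, so a merge result can only be "earlier than the watermark" if one of the windows taking part was already behind it.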
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I can do that, but I am not certain this is the right filter. Can you
>> please validate it? That aside, I already have lateness configured for the
>> session window (the normal allowedLateness()), and this looks like a session
>> window was not collected and is still alive for some reason (a Flink bug?).
>>
>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>     out.collect(value);
>> }
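To check whether this filter "factors in" the lateness, here is a framework-free sketch. `isKept` is the same condition as the snippet above; `windowWouldBeLate` is a simplified, assumed model of the window operator's own lateness test for a session window holding a single event, taking the window as [t, t + gap), its maxTimestamp as end - 1, and "late" as maxTimestamp + allowedLateness <= watermark. The class name, method names and the 30-minute gap are hypothetical.

```java
/** Hypothetical, framework-free model of the late-event filter (not Flink code). */
public class LateFilterSketch {

    // Same condition as the quoted filter: keep the event while it is within allowedLateness.
    static boolean isKept(long eventTs, long allowedLateness, long watermark) {
        return eventTs + allowedLateness > watermark;
    }

    // Assumed operator check for a session window holding one event: [t, t + gap),
    // maxTimestamp = t + gap - 1, late if maxTimestamp + allowedLateness <= watermark.
    static boolean windowWouldBeLate(long eventTs, long gap, long allowedLateness, long watermark) {
        long maxTimestamp = eventTs + gap - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long lateness = 24 * 60 * minute;   // allowedLateness of one day
        long gap = 30 * minute;             // hypothetical 30-minute session gap
        long watermark = 100 * 60 * minute;
        long[] events = {
            watermark - 23 * 60 * minute,        // well within the lateness
            watermark - (24 * 60 + 10) * minute, // just past it, but within one gap
            watermark - 25 * 60 * minute         // far past it
        };
        for (long ts : events) {
            System.out.println("ts=" + ts
                + " kept=" + isKept(ts, lateness, watermark)
                + " windowLate=" + windowWouldBeLate(ts, gap, lateness, watermark));
        }
    }
}
```

In this model the gap is at least 1 ms, so every event the filter keeps also lands in a window the operator would not treat as late: the filter does take the lateness into account and is, if anything, stricter than allowedLateness by up to one session gap (the middle event is dropped by the filter although its own window would still be accepted).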
>>
>>
>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Vishal,
>>> based on the error message and the behavior you described, introducing a
>>> filter for late events is the way to go - just as described in the SO
>>> thread you mentioned. Usually, you would collect late events in some kind
>>> of side output [1].
>>>
>>> I hope that helps.
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> I saw
>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>> which seems to suggest a straight-up filter, but I am not sure how that
>>>> filter works, i.e. would it factor in the lateness when filtering?
>>>>
>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Well, it was not a solution after all. We now have a session window
>>>>> that is stuck with the same issue, albeit after the additional lateness. I
>>>>> had increased the lateness to 2 days, and that masked the issue, which
>>>>> reared its head again once the 2-day lateness was over (instead of after
>>>>> 1 day, as before). This is very disconcerting.
>>>>>
>>>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp
>>>>> of an event-time window cannot become earlier than the current
>>>>> watermark by merging. Current watermark: 1619053742129 window:
>>>>> TimeWindow{start=1618877773663, end=1618879580402}
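Worked arithmetic on the exception above, using only the two numbers in the log: how far the window end trails the watermark, compared with the 2-day lateness mentioned in this message. The class name `WatermarkLagSketch` is hypothetical.

```java
import java.time.Duration;

/** Worked arithmetic on the values printed in the exception (copied from the log). */
public class WatermarkLagSketch {

    // How far the window end trails the current watermark, in milliseconds.
    static long lagMillis(long watermark, long windowEnd) {
        return watermark - windowEnd;
    }

    public static void main(String[] args) {
        long watermark = 1619053742129L; // "Current watermark" from the exception
        long windowEnd = 1618879580402L; // TimeWindow end from the exception
        Duration lag = Duration.ofMillis(lagMillis(watermark, windowEnd));
        Duration allowedLateness = Duration.ofDays(2); // lateness after the increase described above

        System.out.println("lag = " + lag);                                          // PT48H22M41.727S
        System.out.println("beyond lateness by " + lag.minus(allowedLateness));      // PT22M41.727S
    }
}
```

The window end is about 22.7 minutes further behind the watermark than the 2-day lateness allows, which matches the problem reappearing right after the 2 days ran out.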
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hey folks,
>>>>>> I had a pipeline with sessionization that restarted and then failed after
>>>>>> retries with this exception. The only thing I had changed was to increase
>>>>>> the lateness by 12 hours (to a day) in this pipeline and restart from a
>>>>>> savepoint, and it ran for more than 12 hours without issue. I cannot
>>>>>> imagine that increasing the lateness caused this, and the way I solved it
>>>>>> was to increase the lateness further. Could this happen if there are TMs
>>>>>> in the cluster whose clocks are off (i.e. not synchronized)?
>>>>>>
>>>>>> 2021-04-21 11:27:58
>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
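The stack trace points at the merge step (`WindowOperator$2.merge` called from `MergingWindowSet.addWindow`). A minimal sketch of the kind of guard that would produce this message, assuming it rejects a merged event-time window when maxTimestamp() + allowedLateness <= currentWatermark, with maxTimestamp() = end - 1. The class and method names are hypothetical; only the message text and the numbers come from the log.

```java
/**
 * Hypothetical model of the merge-time guard behind the exception above (not Flink code).
 * Assumption: a merged event-time window is rejected when
 * maxTimestamp() + allowedLateness <= currentWatermark, where maxTimestamp() = end - 1.
 */
public class MergeGuardSketch {

    static void checkMergeResult(long start, long end, long allowedLateness, long watermark) {
        long maxTimestamp = end - 1;
        if (maxTimestamp + allowedLateness <= watermark) {
            throw new UnsupportedOperationException(
                "The end timestamp of an event-time window cannot become earlier than the current "
                    + "watermark by merging. Current watermark: " + watermark
                    + " window: TimeWindow{start=" + start + ", end=" + end + "}");
        }
    }

    public static void main(String[] args) {
        long oneDay = 24L * 60 * 60 * 1000;
        try {
            // Window and watermark from the log, with allowedLateness of one day.
            checkMergeResult(1618878336107L, 1618880140466L, oneDay, 1618966593999L);
            System.out.println("merge accepted");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this assumption the logged window fails the check with a 1-day lateness but would pass it with 2 days, consistent with the larger lateness masking the problem for a while.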
>>>>>>
>>>>>>
>>>>>>
>>>

-- 

Matthias Pohl | Engineer
