Great, thanks for the update.  The upfront filter does work and has for the
last 24 hours, and there is no reason why it should not.
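For anyone skimming the thread later: the keep/drop decision that the `LateEventFilter` and `LateEventSideOutput` functions quoted below make reduces to a single comparison against the current watermark. Here is a minimal, Flink-free sketch of just that predicate; the class name `LatePredicate` is hypothetical, and the constants are simply the timestamps taken from the exception messages in this thread:

```java
public class LatePredicate {
    // An element is still acceptable if its timestamp plus the allowed
    // lateness is ahead of the current watermark; otherwise it is late
    // and the upfront filter routes it to the side output instead of
    // letting it reach the session window operator.
    static boolean isOnTime(long elementTs, long allowedLateness, long currentWatermark) {
        return elementTs + allowedLateness > currentWatermark;
    }

    public static void main(String[] args) {
        long watermark = 1619053742129L;      // watermark from the exception message
        long lateness = 24L * 60 * 60 * 1000; // 24 hours of allowed lateness
        long windowEnd = 1618879580402L;      // window end from the exception message

        // This element ended more than 24h behind the watermark, so the
        // upfront filter would have dropped it to the side output.
        System.out.println(isOnTime(windowEnd, lateness, watermark)); // prints "false"
        // A fresh element at the watermark passes the filter.
        System.out.println(isOnTime(watermark, lateness, watermark)); // prints "true"
    }
}
```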

Again, I have to note that there is no mailing list that has been this
responsive to issues, so thank you again.



On Fri, Apr 23, 2021 at 4:34 AM Matthias Pohl <matth...@ververica.com>
wrote:

> After having talked to David about this issue offline, I decided to create
> a Jira ticket FLINK-22425 [1] to cover this. Thanks for reporting it on the
> mailing list, Vishal. Hopefully, the community has the chance to look into
> it.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22425
>
> On Fri, Apr 23, 2021 at 8:16 AM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> To me, it sounds strange. I would have expected it to work with
>> `allowedLateness` and `sideOutput` being defined. I'll pull in David to have a
>> look at it. Maybe he has some more insights. I haven't worked that much
>> with lateness yet.
>>
>> Matthias
>>
>> On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> << Added the Filter upfront as below; the pipe has no issues. Also,
>>> metrics show that no data is being pushed through the side output and that
>>> data is not pulled from a simulated side output ( below ).
>>>
>>> >> Correction: Added the Filter upfront as below; the pipe has no issues. Also,
>>> metrics show that no data is being pushed through the side output and that
>>> data is *now* pulled from the simulated side output, which is essentially the
>>> ProcessFunction with the reverse predicate of the filter ProcessFunction.
>>>
>>>
>>> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> And when I added the filter, the Exception was not thrown. So the
>>>> sequence of events:
>>>>
>>>> * Increased lateness from 12 hours ( what it was initially running
>>>> with ) to 24 hours.
>>>> * The pipe ran as desired before it blew up with the Exception.
>>>> * Masked the issue by increasing the lateness to 48 hours.
>>>> * It blew up again, but now after the added lateness; essentially the
>>>> same issue, but the added lateness let the pipe run for another few hours.
>>>> * Added the Filter upfront as below; the pipe has no issues. Also,
>>>> metrics show that no data is being pushed through the side output and that
>>>> data is not pulled from a simulated side output ( below ).
>>>>
>>>>
>>>> public class LateEventFilter extends
>>>>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     private final long allowedLateness;
>>>>
>>>>     public LateEventFilter(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> public class LateEventSideOutput extends
>>>>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     private final long allowedLateness;
>>>>
>>>>     public LateEventSideOutput(long allowedLateness) {
>>>>         this.allowedLateness = allowedLateness;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>>>             out.collect(value);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>  I am using RocksDB as the state backend, if that helps.
>>>>
>>>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Yes sir. The allowedLateness and side output always existed.
>>>>>
>>>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>>>>> being added to your pipeline when running into the
>>>>>> UnsupportedOperationException issue previously?
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> As in, this is essentially doing what lateness *should* have done,
>>>>>>> and I think that is a bug. My code is now as follows. Please look at
>>>>>>> the allowedLateness on the session window.
>>>>>>>
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>>>     .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000l))
>>>>>>>     .name("late_filter").uid("late_filter");
>>>>>>>
>>>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>>>     .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000l))
>>>>>>>     .name("late_data").uid("late_data");
>>>>>>>
>>>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>>>     .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>>>     .keyBy(value -> value.getKey())
>>>>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>>>     .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>>>     .sideOutputLateData(lateOutputTag)
>>>>>>>     .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>>>     .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>>>         new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>>>>>>     .name("session_aggregate").uid("session_aggregate");
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>>>> you please validate? That aside, I already have the lateness configured for
>>>>>>>> the session window ( the normal allowedLateness() ), and this looks like a
>>>>>>>> session window that was not collected and is still alive for some reason ( a
>>>>>>>> Flink bug? )
>>>>>>>>
>>>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>>>     out.collect(value);
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <
>>>>>>>> matth...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>> based on the error message and the behavior you described,
>>>>>>>>> introducing a filter for late events is the way to go - just as 
>>>>>>>>> described
>>>>>>>>> in the SO thread you mentioned. Usually, you would collect late 
>>>>>>>>> events in
>>>>>>>>> some kind of side output [1].
>>>>>>>>>
>>>>>>>>> I hope that helps.
>>>>>>>>> Matthias
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>>>
>>>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I saw
>>>>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current
>>>>>>>>>> and this seems to suggest a straight-up filter, but I am not sure how
>>>>>>>>>> that filter works, as in, would it factor in the lateness when filtering?
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Well, it was not a solution after all. We now have a session
>>>>>>>>>>> window that is stuck with the same issue, albeit after the additional
>>>>>>>>>>> lateness. I had increased the lateness to 2 days and that masked the issue,
>>>>>>>>>>> which again reared its head after the 2 days of lateness were over ( instead
>>>>>>>>>>> of the 1 day before ). This is very disconcerting.
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey folks,
>>>>>>>>>>>>                I had a pipe with sessionization restart and
>>>>>>>>>>>> then fail after retries with this exception. The only thing I had done was
>>>>>>>>>>>> to increase the lateness by 12 hours ( to a day ) in this pipe and
>>>>>>>>>>>> restart from a savepoint, and it ran for 12-plus hours without issue. I
>>>>>>>>>>>> cannot imagine that increasing the lateness created this, and the way I
>>>>>>>>>>>> solved it was to increase the lateness further. Could this happen if there
>>>>>>>>>>>> are TMs in the cluster whose time is off ( as in not synchronized )?
>>>>>>>>>>>>
>>>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of
>>>>>>>>>>>> an event-time window cannot become earlier than the current watermark
>>>>>>>>>>>> by merging. Current watermark: 1618966593999 window: TimeWindow
>>>>>>>>>>>> {start=1618878336107, end=1618880140466}
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
