To me, this sounds strange. I would have expected it to work with
`allowedLateness` and `sideOutputLateData` defined. I'm pulling in David to
have a look at it; maybe he has some more insights. I haven't worked that
much with lateness yet.

Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> << Added the filter upfront as below, and the pipe has no issues. Also,
> metrics show that no data is being pushed through the side output and that
> data is not pulled from the simulated side output (below).
>
> >> Added the filter upfront as below, and the pipe has no issues. Also,
> metrics show that no data is being pushed through the side output and that
> data *is now* pulled from the simulated side output, essentially the
> ProcessFunction with the reverse predicate of the filter ProcessFunction.
>
>
> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> And when I added the filter, the exception was no longer thrown. So the
>> sequence of events was:
>>
>> * Increased the lateness from 12 hours (what it was initially running
>> with) to 24 hours.
>> * The pipe ran as desired before it blew up with the exception.
>> * Masked the issue by increasing the lateness to 48 hours.
>> * It blew up again, but only after the added lateness: essentially the
>> same issue, with the added lateness letting the pipe run for another few hours.
>> * Added the filter upfront as below, and the pipe has no issues. Also,
>> metrics show that no data is being pushed through the side output and that
>> data is not pulled from the simulated side output (below).
>>
>>
>> // KEY and VALUE are assumed to be type parameters of the enclosing class.
>> public class LateEventFilter extends ProcessFunction<KeyedTimedValue<KEY, VALUE>,
>>         KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     // Allowed lateness in milliseconds.
>>     long allowedLateness;
>>
>>     public LateEventFilter(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         // Forward only events whose timestamp plus the allowed lateness is
>>         // still ahead of the current watermark; later events are dropped.
>>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>> public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue<KEY, VALUE>,
>>         KeyedTimedValue<KEY, VALUE>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     // Allowed lateness in milliseconds.
>>     long allowedLateness;
>>
>>     public LateEventSideOutput(long allowedLateness) {
>>         this.allowedLateness = allowedLateness;
>>     }
>>
>>     @Override
>>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>>         // Reverse predicate of LateEventFilter: emit only the events the filter
>>         // drops (timestamp plus allowed lateness at or behind the watermark).
>>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>>             out.collect(value);
>>         }
>>     }
>> }
>>
>>
>>
>> I am using RocksDB as the state backend, if that helps.
>>
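The two ProcessFunctions above are meant to be exact complements, so every event should reach exactly one of them. A minimal plain-Java sketch of that split, with no Flink dependency, assuming timestamps, watermark and lateness are all epoch milliseconds; `LatenessSplit`, `isOnTime` and `split` are hypothetical names used only for illustration. As far as I can tell, the late condition (`timestamp + allowedLateness <= watermark`) is the same comparison Flink's window operator uses to classify an element as late, but treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring LateEventFilter / LateEventSideOutput for a
// fixed watermark: each timestamp ends up in exactly one of the two lists.
public class LatenessSplit {

    // Same test as LateEventFilter: still within the allowed lateness.
    static boolean isOnTime(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness > watermark;
    }

    // Returns [onTime, late].
    static List<List<Long>> split(List<Long> timestamps, long allowedLateness, long watermark) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isOnTime(ts, allowedLateness, watermark)) {
                onTime.add(ts);
            } else {
                // LateEventSideOutput's condition: ts + allowedLateness <= watermark
                late.add(ts);
            }
        }
        List<List<Long>> result = new ArrayList<>();
        result.add(onTime);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        long lateness = 1_000L;
        long watermark = 10_000L;
        // 9_000 sits exactly on the boundary (9_000 + 1_000 == watermark), so it is late.
        List<List<Long>> parts = split(List.of(8_500L, 9_000L, 9_001L, 12_000L), lateness, watermark);
        System.out.println("on time: " + parts.get(0)); // [9001, 12000]
        System.out.println("late:    " + parts.get(1)); // [8500, 9000]
    }
}
```

The boundary case is the one worth checking: because the filter uses `>` and the side output uses `<=`, an event exactly `allowedLateness` behind the watermark goes to the late output, never to both or neither.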
>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Yes, sir. The allowedLateness and the side output were always configured.
>>>
>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com>
>>> wrote:
>>>
>>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>>> being added to your pipeline when running into the
>>>> UnsupportedOperationException issue previously?
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>
>>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> As in, this is essentially doing what lateness *should* have done, and
>>>>> I think that is a bug. My code now is as follows. Please look at the
>>>>> allowedLateness on the session window.
>>>>>
>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>>         .name("late_filter").uid("late_filter");
>>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>>         .name("late_data").uid("late_data");
>>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>>         .keyBy(value -> value.getKey())
>>>>>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>>         .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>>         .sideOutputLateData(lateOutputTag)
>>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>>         .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes,
>>>>>                         this.lateNessInMinutes))
>>>>>         .name("session_aggregate").uid("session_aggregate");
>>>>>
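The pipeline converts `lateNessInMinutes` to milliseconds by hand for the two ProcessFunctions, while the window itself uses `Time.minutes(lateNessInMinutes)`; the filter and the window only agree if both describe the same span. A small sketch, assuming `lateNessInMinutes` is an `int`, that compares the hand-written `* 60 * 1000L` with `java.util.concurrent.TimeUnit`; the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical check that the hand-rolled minutes-to-millis conversion used for
// LateEventFilter / LateEventSideOutput matches a TimeUnit-based one.
public class LatenessMillis {

    // As written in the pipeline: int * int * long, evaluated left to right,
    // so only the last multiplication happens in long arithmetic.
    static long handRolled(int lateNessInMinutes) {
        return lateNessInMinutes * 60 * 1000L;
    }

    // Alternative that makes the unit explicit.
    static long viaTimeUnit(int lateNessInMinutes) {
        return TimeUnit.MINUTES.toMillis(lateNessInMinutes);
    }

    public static void main(String[] args) {
        int oneDay = 24 * 60;   // 1440 minutes, the 24-hour lateness
        int twoDays = 48 * 60;  // 2880 minutes, the 48-hour lateness
        System.out.println(handRolled(oneDay) + " " + viaTimeUnit(oneDay));   // 86400000 86400000
        System.out.println(handRolled(twoDays) + " " + viaTimeUnit(twoDays)); // 172800000 172800000
    }
}
```

For the 1-day and 2-day values discussed here the intermediate `int` product (at most 172,800) is far from overflowing, so both forms agree; `TimeUnit` mainly makes the unit harder to get wrong.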
>>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> I can do that, but I am not certain this is the right filter. Can
>>>>>> you please validate it? That aside, I already have the lateness configured
>>>>>> for the session window (the normal allowedLateness()), and this looks like
>>>>>> a session window was not collected and is still alive for some reason (a
>>>>>> Flink bug?).
>>>>>>
>>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>>>>>>     out.collect(value);
>>>>>> }
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>> based on the error message and the behavior you described,
>>>>>>> introducing a filter for late events is the way to go, just as described
>>>>>>> in the SO thread you mentioned. Usually, you would collect late events in
>>>>>>> some kind of side output [1].
>>>>>>>
>>>>>>> I hope that helps.
>>>>>>> Matthias
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I saw
>>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>>>>>>> It seems to suggest a straight-up filter, but I am not sure how that
>>>>>>>> filter works; as in, would it factor in the lateness when filtering?
>>>>>>>>
>>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Well, it was not a solution after all. We now have a session window
>>>>>>>>> that is stuck with the same issue, albeit after the additional lateness.
>>>>>>>>> I had increased the lateness to 2 days, and that masked the issue, which
>>>>>>>>> reared its head again after the 2-day lateness was over (instead of after
>>>>>>>>> 1 day, as before). This is very disconcerting.
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
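Comparing the two exception messages in this thread, the gap between the current watermark and the merged window's end is just over the configured lateness each time (1 day, then 2 days). A plain-Java sketch of that arithmetic using only the numbers from the two messages; it assumes, without confirming it against the Flink source here, that the failing check is `maxTimestamp + allowedLateness <= watermark` with `maxTimestamp = end - 1`. `WindowLatenessCheck` and its methods are hypothetical names.

```java
// Hypothetical worked example using the watermark/window values from the
// two UnsupportedOperationException messages in this thread.
public class WindowLatenessCheck {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // Assumed late condition for a (merged) window: (end - 1) + allowedLateness <= watermark.
    static boolean mergedWindowIsLate(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    static void report(String label, long windowEnd, long watermark, long allowedLateness) {
        long gap = watermark - windowEnd;
        System.out.printf("%s: watermark - end = %d ms (%.2f h), late = %b%n",
                label, gap, gap / (double) HOUR_MS,
                mergedWindowIsLate(windowEnd, allowedLateness, watermark));
    }

    public static void main(String[] args) {
        // 24-hour lateness: window end 1618880140466, watermark 1618966593999
        report("1 day", 1618880140466L, 1618966593999L, 24 * HOUR_MS);
        // 48-hour lateness: window end 1618879580402, watermark 1619053742129
        report("2 days", 1618879580402L, 1619053742129L, 48 * HOUR_MS);
    }
}
```

This prints gaps of 86453533 ms (about 24.01 h) and 174161727 ms (about 48.38 h), both flagged late, which matches the observation that raising the lateness only postponed the failure.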
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey folks,
>>>>>>>>>> I had a pipe with sessionization restart and then fail after retries
>>>>>>>>>> with this exception. The only thing I had done was to increase the
>>>>>>>>>> lateness by 12 hours (to a day) in this pipe and restart from a
>>>>>>>>>> savepoint (SP), and it ran for 12+ hours without issue. I cannot imagine
>>>>>>>>>> that increasing the lateness caused this, and the way I solved it was to
>>>>>>>>>> increase the lateness further. Could this happen if there are TMs in the
>>>>>>>>>> cluster whose clocks are off (as in, not synchronized)?
>>>>>>>>>>
>>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
