<< Added the filter upfront as below; the pipe has no issues. Also, metrics
show that no data is being pushed through the side output and that data is
not pulled from the simulated side output (below)

>> Added the filter upfront as below; the pipe has no issues. Also, metrics
show that no data is being pushed through the side output and that data is
*now* pulled from the simulated side output, which is essentially a Process
Function with the reverse predicate of the filter Process Function.


On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> And when I added the filter, the exception was not thrown. So the sequence
> of events was:
>
> * Increased lateness from 12 hours (what it was initially running with) to
> 24 hours.
> * The pipe ran as desired, then blew up with the exception.
> * Masked the issue by increasing the lateness to 48 hours.
> * It blew up again, but only after the added lateness; so essentially the
> same issue, but the added lateness let the pipe run for another few hours.
> * Added the filter upfront as below; the pipe has no issues. Also, metrics
> show that no data is being pushed through the side output and that data is
> not pulled from the simulated side output (below).
>
>
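> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
>
> // Forwards only elements that are still within the allowed lateness.
> public class LateEventFilter<KEY, VALUE> extends
>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>     private static final long serialVersionUID = 1L;
>
>     private final long allowedLateness; // in milliseconds
>
>     public LateEventFilter(long allowedLateness) {
>         this.allowedLateness = allowedLateness;
>     }
>
>     @Override
>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>         // ctx.timestamp() is null if no event-time timestamp was assigned upstream.
>         if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
>             out.collect(value);
>         }
>     }
> }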
>
>
> // Forwards only the late elements: the exact complement of LateEventFilter.
> public class LateEventSideOutput<KEY, VALUE> extends
>         ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {
>     private static final long serialVersionUID = 1L;
>
>     private final long allowedLateness; // in milliseconds
>
>     public LateEventSideOutput(long allowedLateness) {
>         this.allowedLateness = allowedLateness;
>     }
>
>     @Override
>     public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
>             Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
>         if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
>             out.collect(value);
>         }
>     }
> }
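The two functions above use exactly complementary predicates, so every element should reach exactly one of the two streams; an element whose timestamp plus the allowed lateness equals the watermark goes to the late stream. Below is a Flink-free sketch of that split in plain Java, assuming timestamps, lateness and watermark are all epoch milliseconds; `LateSplitSketch`, `isOnTime` and `split` are hypothetical names. In an actual job, a single `ProcessFunction` could do the same in one pass by calling `ctx.output(outputTag, value)` for late elements, instead of running two operators that each evaluate one side of the predicate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of LateEventFilter / LateEventSideOutput. */
final class LateSplitSketch {

    /** Same predicate as LateEventFilter: keep if ts + lateness > watermark. */
    static boolean isOnTime(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness > watermark;
    }

    /**
     * Routes each timestamp to exactly one of two lists in a single pass,
     * the way one ProcessFunction writing late records to a side output would.
     * Returns [onTime, late].
     */
    static List<List<Long>> split(List<Long> timestamps, long allowedLateness, long watermark) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isOnTime(ts, allowedLateness, watermark)) {
                onTime.add(ts);
            } else {
                // LateEventSideOutput's predicate: ts + lateness <= watermark
                late.add(ts);
            }
        }
        return List.of(onTime, late);
    }

    public static void main(String[] args) {
        long lateness = 100L;
        long watermark = 1_000L;
        // 900 + 100 == 1000 sits exactly on the boundary and counts as late.
        List<List<Long>> parts = split(List.of(850L, 900L, 901L, 1_200L), lateness, watermark);
        System.out.println("on time: " + parts.get(0) + ", late: " + parts.get(1));
    }
}
```

Running `main` prints `on time: [901, 1200], late: [850, 900]`.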
>
>
>
> I am using RocksDB as the state backend, if that helps.
>
> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Yes, sir. The allowedLateness and the late-data side output were always configured.
>>
>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>> being added to your pipeline when running into the
>>> UnsupportedOperationException issue previously?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> In other words, this is essentially doing what allowed lateness *should*
>>>> have done, and I think that is a bug. My code now is as follows. Please
>>>> look at the allowedLateness on the session window.
>>>>
>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> filteredKeyedValue = keyedValue
>>>>         .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
>>>>         .name("late_filter").uid("late_filter");
>>>> SingleOutputStreamOperator<KeyedTimedValue<KEY, VALUE>> lateKeyedValue = keyedValue
>>>>         .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
>>>>         .name("late_data").uid("late_data");
>>>> SingleOutputStreamOperator<KeyedSessionWithSessionID<KEY, VALUE>> aggregate = filteredKeyedValue
>>>>         .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
>>>>         .keyBy(value -> value.getKey())
>>>>         .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>>>         .allowedLateness(Time.minutes(lateNessInMinutes))
>>>>         .sideOutputLateData(lateOutputTag)
>>>>         .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>>>         .aggregate(new SortAggregate<KEY, VALUE>(),
>>>>                 new SessionIdProcessWindowFunction<KEY, VALUE>(this.gapInMinutes, this.lateNessInMinutes))
>>>>         .name("session_aggregate").uid("session_aggregate");
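The filter operators above receive the lateness in milliseconds (`lateNessInMinutes * 60 * 1000L`), while the window receives `Time.minutes(lateNessInMinutes)`, so both must describe the same span for the filter and the window to agree. A minimal sketch of that conversion using the JDK's `TimeUnit`, checked against the 12, 24 and 48 hour values mentioned in this thread; `LatenessMillisSketch` is a hypothetical name.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper comparing the hand-written and JDK minute-to-millis conversions. */
final class LatenessMillisSketch {

    /** Mirrors lateNessInMinutes * 60 * 1000L from the pipeline code. */
    static long manualMillis(long lateNessInMinutes) {
        return lateNessInMinutes * 60L * 1000L;
    }

    /** Same conversion via the JDK, which is harder to get wrong. */
    static long jdkMillis(long lateNessInMinutes) {
        return TimeUnit.MINUTES.toMillis(lateNessInMinutes);
    }

    public static void main(String[] args) {
        for (long hours : new long[] {12, 24, 48}) {
            long minutes = TimeUnit.HOURS.toMinutes(hours);
            System.out.println(hours + " h = " + minutes + " min = " + jdkMillis(minutes) + " ms");
        }
    }
}
```

`main` prints one line per value, for example `48 h = 2880 min = 172800000 ms`.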
>>>>
>>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> I can do that, but I am not certain this is the right filter. Can you
>>>>> please validate it? That aside, I already have the lateness configured for
>>>>> the session window (the normal allowedLateness()), and this looks like a
>>>>> session window was not collected and is still alive for some reason (a
>>>>> Flink bug?).
>>>>>
>>>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>>>>> currentWatermark()) {
>>>>> out.collect(value);
>>>>> }
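One way to validate this filter is to compare it with the lateness check applied to the session window itself. The sketch below assumes, from my reading of Flink's `WindowOperator` and `EventTimeSessionWindows` rather than anything stated in this thread, that a new element starts out in the session window `[timestamp, timestamp + gap)`, that a window's max timestamp is `end - 1`, and that a window counts as late once `maxTimestamp + allowedLateness <= watermark`. Under those assumptions the filter drops every element whose own window would be late, and also some elements up to one gap earlier, so it is the stricter of the two. Because merging can only move a window's end later, an element that passes the filter also cannot produce a merged window that is already past the lateness, which would be consistent with the exception disappearing once the filter was added. All names are hypothetical, and values are epoch milliseconds.

```java
/**
 * Hypothetical comparison of the element-level filter with an assumed
 * window-level lateness rule for event-time session windows.
 */
final class SessionLatenessSketch {

    /** The predicate from LateEventFilter, inverted: true means the element is dropped. */
    static boolean filterDrops(long ts, long allowedLateness, long watermark) {
        return ts + allowedLateness <= watermark;
    }

    /**
     * Assumed window-level rule: the element's own session window is
     * [ts, ts + gap), its max timestamp is end - 1, and it is late once
     * maxTimestamp + allowedLateness <= watermark.
     */
    static boolean ownWindowLate(long ts, long gap, long allowedLateness, long watermark) {
        long maxTimestamp = ts + gap - 1;
        return maxTimestamp + allowedLateness <= watermark;
    }

    /**
     * Same assumed rule for a merged window with the given end. A merge ends at or
     * after ts + gap, so for an element that passes the filter (and gap >= 1)
     * this returns false.
     */
    static boolean mergedWindowLate(long mergedEnd, long allowedLateness, long watermark) {
        return (mergedEnd - 1) + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        long gap = 30 * 60_000L;            // assumed 30-minute session gap
        long lateness = 60 * 60_000L;       // assumed 1-hour allowed lateness
        long watermark = 10 * 60 * 60_000L;
        long ts = watermark - lateness - gap / 2; // 15 minutes beyond the lateness limit
        System.out.println("filter drops: " + filterDrops(ts, lateness, watermark));
        System.out.println("own window late: " + ownWindowLate(ts, gap, lateness, watermark));
    }
}
```

With the assumed 30-minute gap and 1-hour lateness, `main` prints `filter drops: true` and `own window late: false`.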
>>>>>
>>>>>
>>>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl <matth...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>> based on the error message and the behavior you described,
>>>>>> introducing a filter for late events is the way to go - just as described
>>>>>> in the SO thread you mentioned. Usually, you would collect late events in
>>>>>> some kind of side output [1].
>>>>>>
>>>>>> I hope that helps.
>>>>>> Matthias
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>>>>
>>>>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> I saw
>>>>>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>>>>>> and this seems to suggest a straight-up filter, but I am not sure how
>>>>>>> that filter works; as in, would it factor in the lateness when filtering?
>>>>>>>
>>>>>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Well, it was not a solution after all. We now have a session window
>>>>>>>> that is stuck with the same issue, albeit after the additional
>>>>>>>> lateness. I had increased the lateness to 2 days, and that masked the
>>>>>>>> issue, which reared its head again once the 2 days of lateness were
>>>>>>>> over (instead of after 1 day, as before). This is very disconcerting.
>>>>>>>>
>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The end
>>>>>>>> timestamp of an event-time window cannot become earlier than the
>>>>>>>> current watermark by merging. Current watermark: 1619053742129
>>>>>>>> window: TimeWindow{start=1618877773663, end=1618879580402}
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey folks,
>>>>>>>>> I had a pipe with sessionization that restarts and then fails after
>>>>>>>>> retries with this exception. The only thing I had done was to increase
>>>>>>>>> the lateness by 12 hours (to a day) in this pipe and restart from a
>>>>>>>>> savepoint, and it ran for 12+ hours without issue. I cannot imagine
>>>>>>>>> that increasing the lateness created this, and the way I solved it was
>>>>>>>>> to increase the lateness further. Could this happen if there are TMs in
>>>>>>>>> the cluster whose clocks are off (as in, not synchronized)?
>>>>>>>>>
>>>>>>>>> 2021-04-21 11:27:58
>>>>>>>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>>>>>>>> event-time window cannot become earlier than the current watermark
>>>>>>>>> by merging. Current watermark: 1618966593999 window:
>>>>>>>>> TimeWindow{start=1618878336107, end=1618880140466}
>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
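Both exceptions carry enough numbers to check how far the merged window's end trails the watermark. A small sketch of that subtraction, assuming the values are epoch milliseconds; the timestamps are copied from the two traces, and `WatermarkLagSketch` is a hypothetical name.

```java
import java.util.Locale;

/** Hypothetical helper: how far a window end trails the watermark. */
final class WatermarkLagSketch {

    /** Lag in milliseconds between the current watermark and a window's end. */
    static long lagMillis(long watermark, long windowEnd) {
        return watermark - windowEnd;
    }

    static double toHours(long millis) {
        return millis / 3_600_000.0;
    }

    public static void main(String[] args) {
        // First failure, after the lateness had been raised to 24 hours.
        long lag1 = lagMillis(1618966593999L, 1618880140466L);
        // Later failure, after the lateness had been raised to 48 hours.
        long lag2 = lagMillis(1619053742129L, 1618879580402L);
        System.out.printf(Locale.ROOT, "first: %d ms = %.2f h%n", lag1, toHours(lag1));
        System.out.printf(Locale.ROOT, "later: %d ms = %.2f h%n", lag2, toHours(lag2));
    }
}
```

It prints `first: 86453533 ms = 24.01 h` and `later: 174161727 ms = 48.38 h`. Each window end sits only slightly beyond the lateness configured at the time (24 hours for the first failure, 48 hours for the later one), which fits the observation in this thread that each increase only postponed the failure.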
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>
>>> --
>>>
>>> Matthias Pohl | Engineer
>>>
>>
