Hi Yan,

Thanks for providing the logs and opening the JIRA issue! Let's continue the discussion there.
Best,
Fabian

2018-06-05 1:26 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>:
> Hi Fabian,
>
> I added some trace logs to ProcTimeBoundedRangeOver and I think it is a
> bug. The log should explain how CLEANUP_TIME_1 bypasses the
> needToCleanupState check and causes the NPE. I created a JIRA ticket [1].
>
> Best
> Yan
>
> [ts:1528149296456] [label:state_ttl_update] register for cleanup at 1528150096456(CLEANUP_TIME_1), because of Row:(orderId:001,userId:U123)
> [ts:1528149296456] [label:register_pt] register for process input at 1528149296457, because of Row:(orderId:001,userId:U123)
> [ts:1528149296458] [label:state_apply] ontimer at 1528149296457, apply Row:(orderId:001,userId:U123) to accumulator
>
> [ts:1528149885813] [label:state_ttl_update] register at 1528150685813(CLEANUP_TIME_2), because of Row:(orderId:002,userId:U123)
> [ts:1528149885813] [label:register_pt] register for process input at 1528149885814, because of Row:(orderId:002,userId:U123)
> [ts:1528149885814] [label:state_apply] ontimer at 1528149885814, apply Row:(orderId:002,userId:U123) to accumulator
>
> [ts:1528150096460] [label:NO_ELEMENTS_IN_STATE] ontimer at 1528150096456(CLEANUP_TIME_1), bypass needToCleanupState check, however rowMapState is {key:1528150096455, value:[]}
>
> [ts:1528150685815] [label:state_timeout] ontimer at 1528150685813(CLEANUP_TIME_2), clean/empty the rowMapState [{key:1528149885813, value:[Row:(orderId:002,userId:U123)]}]
>
> [1]: https://issues.apache.org/jira/browse/FLINK-9524
>
> ------------------------------
> From: Yan Zhou [FDS Science] <yz...@coupang.com>
> Sent: Monday, June 4, 2018 4:05 PM
> To: Fabian Hueske
> Cc: Dawid Wysakowicz; user
> Subject: Re: NPE in flink sql over-window
>
> Hi Fabian,
>
> Yes, the NPE caused the job failure and recovery (instead of being the
> result of a recovery). And during the recovery, the same exception is
> thrown again from the same line.
> > > Best > > Yan > ------------------------------ > *From:* Fabian Hueske <fhue...@gmail.com> > *Sent:* Thursday, May 31, 2018 12:09:03 AM > *To:* Yan Zhou [FDS Science] > *Cc:* Dawid Wysakowicz; user > *Subject:* Re: NPE in flink sql over-window > > Hi Yan, > > Thanks for the details and for digging into the issue. > If I got it right, the NPE caused the job failure and recovery (instead of > being the result of a recovery), correct? > > Best, Fabian > > 2018-05-31 7:00 GMT+02:00 Yan Zhou [FDS Science] <yz...@coupang.com>: > > Thanks for the replay. > > > Yes, it only happen if I config the idle state retention times. The error > occurs the first time before the first recovery. I haven't run with > proctime but rowtime in flink 1.4.x. I am not sure if it will cause > problems with proctime in 1.4.x. > > > I am adding some trace log for ProcTimeBoundedRangeOver. I will update > with my test result and fire a JIRA after that. > > > Best > > Yan > ------------------------------ > *From:* Fabian Hueske <fhue...@gmail.com> > *Sent:* Wednesday, May 30, 2018 1:43:01 AM > *To:* Dawid Wysakowicz > *Cc:* user > *Subject:* Re: NPE in flink sql over-window > > Hi, > > Dawid's analysis is certainly correct, but looking at the code this should > not happen. > > I have a few questions: > - You said this only happens if you configure idle state retention times, > right? > - Does the error occur the first time without a previous recovery? > - Did you run the same query on Flink 1.4.x without any problems? > > Thanks, Fabian > > 2018-05-30 9:25 GMT+02:00 Dawid Wysakowicz <dwysakow...@apache.org>: > > Hi Yan, > > > I think it is a bug in the ProcTimeBoundedRangeOver. It tries to access a > list of elements that was already cleared and does not check against null. > Could you please file a JIRA for that? > > > Best, > > Dawid > > On 30/05/18 08:27, Yan Zhou [FDS Science] wrote: > > I also get warnning that CodeCache is full around that time. 
> It's printed by the JVM and doesn't have a timestamp. But I suspect it's
> because of the many failure recoveries from checkpoints, which cause the
> SQL queries to be dynamically compiled too many times.
>
> Java HotSpot(TM) 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
> Java HotSpot(TM) 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
> CodeCache: size=245760Kb used=244114Kb max_used=244146Kb free=1645Kb
> bounds [0x00007fa4fd000000, 0x00007fa50c000000, 0x00007fa50c000000]
> total_blobs=54308 nmethods=53551 adapters=617
> compilation: disabled (not enough contiguous free space left)
>
> ------------------------------
> From: Yan Zhou [FDS Science] <yz...@coupang.com>
> Sent: Tuesday, May 29, 2018 10:52:18 PM
> To: user@flink.apache.org
> Subject: NPE in flink sql over-window
>
> Hi,
>
> I am using Flink SQL 1.5.0. My application throws an NPE. After it
> automatically recovers from a checkpoint, it immediately throws the NPE
> again from the same line of code.
>
> My application reads messages from Kafka, converts the DataStream into a
> table, issues an over-window aggregation, and writes the result into a
> sink. The NPE is thrown from class ProcTimeBoundedRangeOver. Please see
> the exception log at the bottom.
>
> The exception always happens once the application has been running for
> maxIdleStateRetentionTime. What could be the possible causes?
>
> Best
> Yan
>
> 2018-05-27 11:03:37,656 INFO org.apache.flink.runtime.taskmanager.Task - over: (PARTITION BY: uid, ORDER BY: proctime, RANGEBETWEEN 86400000 PRECEDING AND CURRENT ROW, select: (id, uid, proctime, group_concat($7) AS w0$o0)) -> select: (id, uid, proctime, w0$o0 AS EXPR$3) -> to: Row -> Flat Map -> Filter -> Sink: Unnamed (3/15) (327efe96243bbfdf1f1e40a3372f64aa) switched from RUNNING to FAILED.
> TimerException{java.lang.NullPointerException}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.runtime.aggregate.ProcTimeBoundedRangeOverWithLog.onTimer(ProcTimeBoundedRangeOver.scala:181)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.invokeUserFunction(LegacyKeyedProcessOperator.java:97)
>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.onProcessingTime(LegacyKeyedProcessOperator.java:81)
>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime(HeapInternalTimerService.java:266)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
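The failure mode discussed in this thread can be sketched without Flink. The Java model below is a simplified, hypothetical illustration, not Flink's ProcTimeBoundedRangeOver: StaleCleanupTimerModel, processElement, registerCleanupTimer, advanceTo and replay are invented names, while needToCleanupState, rowMapState and onTimer only borrow names from the thread as simplified stand-ins. The 800000 ms maximum retention is inferred from the trace log (each CLEANUP_TIME is exactly 800000 ms after the row that registered it); the 300000 ms minimum retention is an assumed value. In the model, the second row registers a later cleanup timer and overwrites the remembered cleanup time, but the earlier timer is never deleted. When that stale timer fires at CLEANUP_TIME_1, it no longer matches the remembered time, so it passes the needToCleanupState check, is treated as a regular processing timer, and the lookup of timestamp - 1 in rowMapState returns null. The nullGuard flag shows the kind of null check Dawid describes; see FLINK-9524 for the actual fix.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical, Flink-free model of a proctime bounded OVER function with
// idle-state cleanup. It is NOT Flink's ProcTimeBoundedRangeOver; it only
// mimics the timer/state bookkeeping discussed in the thread.
class StaleCleanupTimerModel {
    // Assumed retention settings: MAX is inferred from the logged timestamps,
    // MIN is an arbitrary value chosen for this sketch.
    static final long MIN_RETENTION = 300_000L;
    static final long MAX_RETENTION = 800_000L;

    // Arrival times of the two rows in the trace log.
    static final long T1 = 1528149296456L;                  // orderId:001
    static final long T2 = 1528149885813L;                  // orderId:002
    static final long CLEANUP_TIME_1 = T1 + MAX_RETENTION;  // 1528150096456
    static final long CLEANUP_TIME_2 = T2 + MAX_RETENTION;  // 1528150685813

    final boolean nullGuard;                                      // apply the null check in onTimer?
    final Map<Long, List<String>> rowMapState = new HashMap<>();  // arrival time -> rows
    final TreeSet<Long> timers = new TreeSet<>();                 // pending timers; superseded ones are not deleted
    final List<String> emitted = new ArrayList<>();               // rows handled by processing timers
    Long cleanupTimeState = null;                                 // only the LATEST cleanup time

    StaleCleanupTimerModel(boolean nullGuard) {
        this.nullGuard = nullGuard;
    }

    void processElement(String row, long now) {
        registerCleanupTimer(now);
        rowMapState.computeIfAbsent(now, k -> new ArrayList<>()).add(row);
        timers.add(now + 1); // process the row 1 ms later, as in the trace log
    }

    void registerCleanupTimer(long now) {
        if (cleanupTimeState == null || now + MIN_RETENTION > cleanupTimeState) {
            long cleanupTime = now + MAX_RETENTION;
            timers.add(cleanupTime);        // any earlier cleanup timer stays registered...
            cleanupTimeState = cleanupTime; // ...but only the newest one is remembered
        }
    }

    boolean needToCleanupState(long timestamp) {
        return cleanupTimeState != null && timestamp == cleanupTimeState;
    }

    void onTimer(long timestamp) {
        if (needToCleanupState(timestamp)) {
            rowMapState.clear();
            cleanupTimeState = null;
            return;
        }
        // A stale cleanup timer reaches this point and looks up a key with no rows.
        List<String> currentElements = rowMapState.get(timestamp - 1);
        if (nullGuard && currentElements == null) {
            return;
        }
        for (String row : currentElements) { // throws NPE when currentElements is null
            emitted.add(row);
        }
    }

    // Fires all pending timers up to and including `until`, in timestamp order.
    void advanceTo(long until) {
        while (!timers.isEmpty() && timers.first() <= until) {
            onTimer(timers.pollFirst());
        }
    }

    // Replays the timeline from the trace log up to `until`.
    static void replay(StaleCleanupTimerModel op, long until) {
        op.processElement("orderId:001", T1);
        op.advanceTo(T2 - 1);
        op.processElement("orderId:002", T2);
        op.advanceTo(until);
    }

    public static void main(String[] args) {
        try {
            replay(new StaleCleanupTimerModel(false), CLEANUP_TIME_1);
            System.out.println("unguarded: no exception");
        } catch (NullPointerException e) {
            System.out.println("unguarded: NPE when the stale timer fires at " + CLEANUP_TIME_1);
        }
        StaleCleanupTimerModel guarded = new StaleCleanupTimerModel(true);
        replay(guarded, CLEANUP_TIME_2);
        System.out.println("guarded: emitted=" + guarded.emitted
                + ", state cleared=" + guarded.rowMapState.isEmpty());
    }
}
```

In this model the unguarded variant fails with an NPE as soon as the stale timer fires at CLEANUP_TIME_1, while the guarded variant skips that timer and still clears the state when CLEANUP_TIME_2 fires. Deleting the superseded timer when a new cleanup time is registered would be an alternative design; the null check is simply the smaller change.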