Hi Timo,

thanks for the help here. Wrapping the MapView in a case class indeed solved the problem. It was not immediately apparent from the documentation that using a MapView as a top-level accumulator would cause an issue; it seemed a straightforward, intuitive way to use it :)
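For anyone finding this thread later, here is a minimal sketch of the wrapped-accumulator shape described above. It is an illustration under assumptions, not the code from the actual job: the name CountDistinctAccumulator, its field seen, and the fixed String key type (instead of the generic T) are all hypothetical, chosen to keep the example short.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical wrapper: the MapView is a field of the accumulator
// rather than being the top-level accumulator itself.
case class CountDistinctAccumulator(seen: MapView[String, JInt])

class CountDistinctAggFunction
    extends AggregateFunction[JInt, CountDistinctAccumulator] {

  override def createAccumulator(): CountDistinctAccumulator =
    CountDistinctAccumulator(new MapView[String, JInt]())

  // Record one occurrence of `key`; null keys are ignored.
  def accumulate(acc: CountDistinctAccumulator, key: String): Unit = {
    if (key != null) {
      val current = acc.seen.get(key)
      val next = if (current == null) 1 else current.intValue + 1
      acc.seen.put(key, Int.box(next))
    }
  }

  // The distinct count is the number of keys currently in the view.
  override def getValue(acc: CountDistinctAccumulator): JInt = {
    var count = 0
    val it = acc.seen.keys().iterator()
    while (it.hasNext) {
      it.next()
      count += 1
    }
    Int.box(count)
  }
}
```

Keeping the key type generic would probably need explicit type hints so that Flink can derive the accumulator's data type; that part is left out of this sketch.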
Cheers
Clemens

On Wed, Jul 14, 2021 at 10:19 PM Timo Walther <twal...@apache.org> wrote:

> Hi Clemens,
>
> first of all can you try to use the MapView within an accumulator POJO
> class. This might solve your exception. I'm not sure if we support the
> views as top-level accumulators.
>
> In any case this seems to be a bug. I will open an issue once I get your
> feedback. We might simply throw an exception for top-level usage then.
>
> Regards,
> Timo
>
> On 14.07.21 06:33, Clemens Valiente wrote:
> > Hi,
> >
> > we created a new AggregateFunction with a MapView as the accumulator, as follows
> >
> > class CountDistinctAggFunction[T] extends
> > AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {
> >
> >   override def createAccumulator(): MapView[T, lang.Integer] = {
> >     new MapView[T, lang.Integer]()
> >   }
> > ...
> >
> > We had NullPointerExceptions happening on
> >
> > getValue(accumulator: MapView[T, lang.Integer]): lang.Integer
> >
> > and
> >
> > def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {
> >
> > so I added null checks there.
> >
> > Unfortunately the NPEs are still happening, right after triggering
> > checkpointing
> >
> > 2021-07-14 04:01:22,340 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job 0cbe21cce72742ec8e5e6786aa6b44ca.
> > 2021-07-14 04:02:52,249 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[ RANG BETWEEN 3600000 PRECEDING AND CURRENT ROW], select=[....] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
> > java.lang.NullPointerException: null
> >     at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
> >     at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
> >
> > Reading the stack trace, it looks like the accumulator is once again
> > null here.
> >
> > I have no idea how the accumulator ends up null, I don't think this
> > should be happening.
> > I didn't find any information regarding this
> > specific issue on Google. What is the cause? What can I do to prevent
> > this from happening?
> > Running Flink 1.12.2 on Scala 2.11.12 and JDK 1.8.0_282 on Kubernetes
> >
> > Cheers
> > Clemens