Hi Timo,
Thanks for the help here; wrapping the MapView in a case class indeed
solved the problem.
It was not immediately apparent from the documentation that using a MapView
as a top-level accumulator would cause an issue. It seemed a straightforward,
intuitive way to use it :)
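
The wrapped-accumulator fix described above can be sketched roughly as follows. This is an illustration, not the code from this thread. It assumes String keys instead of the generic T, and it uses the hypothetical names CountDistinctAcc, seen and distinct. It also adds a retract method, on the assumption that the bounded OVER window in the stack trace below needs one.

```scala
import java.lang.{Integer => JInt}

import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Hypothetical accumulator: the MapView is a field of a case class instead of
// being the top-level accumulator. String keys are an assumption; the
// original function is generic in T.
case class CountDistinctAcc(
  var seen: MapView[String, JInt] = new MapView[String, JInt](),
  var distinct: Int = 0
)

class CountDistinctAggFunction extends AggregateFunction[JInt, CountDistinctAcc] {

  override def createAccumulator(): CountDistinctAcc = CountDistinctAcc()

  // Returns the maintained counter instead of iterating over the MapView.
  override def getValue(acc: CountDistinctAcc): JInt = JInt.valueOf(acc.distinct)

  // Called once per input row: increments the occurrence count of `key`.
  def accumulate(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val current = acc.seen.get(key)
      if (current == null) {
        acc.seen.put(key, JInt.valueOf(1))
        acc.distinct += 1
      } else {
        acc.seen.put(key, JInt.valueOf(current.intValue + 1))
      }
    }
  }

  // Assumed necessary for the bounded OVER window: called when a row leaves
  // the range. It decrements the count and drops keys that reach zero.
  def retract(acc: CountDistinctAcc, key: String): Unit = {
    if (key != null) {
      val current = acc.seen.get(key)
      if (current != null) {
        if (current.intValue == 1) {
          acc.seen.remove(key)
          acc.distinct -= 1
        } else {
          acc.seen.put(key, JInt.valueOf(current.intValue - 1))
        }
      }
    }
  }
}
```

The structural change from the original is that MapView becomes a field of the accumulator rather than the accumulator itself. The separate distinct counter keeps getValue from having to scan the map.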

Cheers
Clemens

On Wed, Jul 14, 2021 at 10:19 PM Timo Walther <twal...@apache.org> wrote:

> Hi Clemens,
>
> First of all, can you try to use the MapView within an accumulator POJO
> class? This might solve your exception. I'm not sure if we support the
> views as top-level accumulators.
>
> In any case, this seems to be a bug. I will open an issue once I get your
> feedback. We might simply throw an exception for top-level usage then.
>
> Regards,
> Timo
>
>
>
> On 14.07.21 06:33, Clemens Valiente wrote:
> > Hi,
> >
> > we created a new AggregateFunction with a MapView as its accumulator, as follows:
> >
> >     class CountDistinctAggFunction[T] extends
> >     AggregateFunction[lang.Integer, MapView[T, lang.Integer]] {
> >
> >        override def createAccumulator(): MapView[T, lang.Integer] = {
> >          new MapView[T, lang.Integer]()
> >        }
> >     ...
> >
> > We got NullPointerExceptions in
> >
> >     getValue(accumulator: MapView[T, lang.Integer]): lang.Integer
> >
> > and
> >
> >     def accumulate(accumulator: MapView[T, lang.Integer], key: T): Unit = {
> >
> > so I added null checks there.
> >
> > Unfortunately, the NPEs still happen, right after checkpointing is
> > triggered:
> >
> >     2021-07-14 04:01:22,340 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1626235282013 for job 0cbe21cce72742ec8e5e6786aa6b44ca.
> >     2021-07-14 04:02:52,249 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - OverAggregate(partitionBy=[entityID], orderBy=[eventTime ASC], window=[ RANG BETWEEN 3600000 PRECEDING AND CURRENT ROW], select=[....] app$functions$table$CountDistinctAggFunction$47dfbce463746500de0b303cff5c947b AS w0$o0) (5/8) (39c23e4703862c39e513dcb5fd629fb4) switched from RUNNING to FAILED on 10.195.174.180:6122-ba5b74 (dataPort=45665).
> >     java.lang.NullPointerException: null
> >         at org.apache.flink.table.data.conversion.RawObjectConverter.toExternal(RawObjectConverter.java:49) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at BoundedOverAggregateHelper$946.setAccumulators(Unknown Source) ~[?:?]
> >         at org.apache.flink.table.runtime.operators.over.RowTimeRangeBoundedPrecedingFunction.onTimer(RowTimeRangeBoundedPrecedingFunction.java:224) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:194) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:197) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:206) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[feature_hydra-assembly-master-25903391.jar:master-25903391]
> >         at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_282]
> >
> > Reading the stack trace, it looks like the accumulator is once again null
> > here.
> >
> > I have no idea how the accumulator ends up null; I don't think this
> > should be happening. I didn't find any information about this
> > specific issue on Google. What is the cause, and what can I do to prevent
> > this from happening?
> > We are running Flink 1.12.2 on Scala 2.11.12 and JDK 1.8.0_282 on Kubernetes.
> >
> > Cheers
> > Clemens
>

