Hi,

Could you check that your grouping key has stable hashCode and equals
implementations? The NPE is very likely caused by an unstable hashCode,
which can make a record end up on the wrong task manager for its key.
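
For illustration only, a key class with stable semantics could look roughly
like the sketch below. The fields are hypothetical (I don't know what your
real DataKey contains); the point is that equals and hashCode are derived
solely from immutable field values, never from object identity or mutable
state:

import java.io.Serializable;
import java.util.Objects;

public final class DataKey implements Serializable {

    // Hypothetical fields, replace with whatever your real key holds.
    private final String name;
    private final long id;

    public DataKey(String name, long id) {
        this.name = name;
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey other = (DataKey) o;
        return id == other.id && Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        // Deterministic across JVMs: depends only on the field values,
        // so the same logical key is always routed to the same subtask.
        return Objects.hash(name, id);
    }
}

If hashCode falls back to Object's identity hash (or depends on a mutable
field), Flink can route the same logical key to different subtasks, which
would explain why the job only fails once it runs on more than one
TaskManager.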

Best,

Dawid

On 13/04/2021 08:47, Si-li Liu wrote:
> Hi, 
>
> I encountered a weird NPE when trying to do an aggregation on a fixed
> window. If I set a small parallelism so that the whole job uses only one
> TaskManager, this NPE does not happen. But when the job scales to two
> TaskManagers, the TaskManager crashes at the Create stage. The Flink
> version I use is 1.11.1.
>
> The NPE stack trace is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     ... 13 more
> My aggregate code is:
>
> public class AggregateDataEntry implements
>         AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
> Does anyone know anything about this NPE? Thanks!
> -- 
> Best regards
>
> Sili Liu
