Hi,

Could you check that your grouping key has stable hashCode() and equals() implementations? The NPE is very likely caused by an unstable hash code: if the same key hashes to a different value on a different JVM, the record is routed to the wrong task manager, which holds no state for that key.
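As a rough illustration of what "stable" means here, below is a minimal sketch. StableDataKey and its two fields are made up for the example, since the real DataKey is not shown in this thread. The point is that hashCode() is computed only from values whose hash is defined by the Java specification (String, int), so it is the same in every JVM. Hash codes that come from the default Object.hashCode(), from enum constants, or from arrays are identity-based and can differ between TaskManagers.

```java
import java.util.Objects;

// Hypothetical key type: the fields of the real DataKey are not known.
final class StableDataKey {
    private final String tenant;  // assumed field, for illustration only
    private final int metricId;   // assumed field, for illustration only

    StableDataKey(String tenant, int metricId) {
        this.tenant = Objects.requireNonNull(tenant, "tenant");
        this.metricId = metricId;
    }

    // Two keys are equal exactly when all of their fields are equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StableDataKey)) {
            return false;
        }
        StableDataKey other = (StableDataKey) o;
        return metricId == other.metricId && tenant.equals(other.tenant);
    }

    // Derived only from a String and an int, so the value does not depend
    // on object identity or on which JVM computes it.
    @Override
    public int hashCode() {
        return 31 * tenant.hashCode() + metricId;
    }
}
```

If the key contains an enum, one option is to hash its name() or ordinal() instead of the enum constant itself.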
Best,

Dawid

On 13/04/2021 08:47, Si-li Liu wrote:
> Hi,
>
> I encountered a weird NPE when trying to aggregate on a fixed window. If
> I set a small parallelism so that the whole job uses only one
> TaskManager, the NPE does not happen. But when the job scales to two
> TaskManagers, the TaskManager crashes at the Create stage. The Flink
> version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
> java.io.IOException: Exception while applying AggregateFunction in aggregating state
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>     ... 13 more
>
> My aggregate code is:
>
> public class AggregateDataEntry implements AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {
>
>     @Override
>     public Map<DataKey, DataIndex> createAccumulator() {
>         return new HashMap<>();
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
>         accumulator.merge(value.f0, value.f1, DataIndex::add);
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
>         return accumulator;
>     }
>
>     @Override
>     public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
>         a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
>         return b;
>     }
> }
>
> Does anyone know something about this NPE? Thanks!
>
> --
> Best regards
>
> Sili Liu