Hi,

I am running into a strange NPE when aggregating over a fixed window. If I
set a small parallelism so that the whole job uses only one TaskManager, the
NPE does not happen. But when the job scales to two TaskManagers, the
TaskManager crashes at the Create stage. The Flink version I use is 1.11.1.

The stack trace of the NPE is:

2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task [] - Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction) -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f) switched from RUNNING to FAILED.
java.io.IOException: Exception while applying AggregateFunction in aggregating state
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.1.jar:1.11.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    at org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
    ... 13 more

My aggregate function is:

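import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// DataKey and DataIndex are application classes whose definitions are not shown here.
public class AggregateDataEntry implements
        AggregateFunction<Tuple2<DataKey, DataIndex>, Map<DataKey, DataIndex>, Map<DataKey, DataIndex>> {

    // Start each window with an empty map.
    @Override
    public Map<DataKey, DataIndex> createAccumulator() {
        return new HashMap<>();
    }

    // Combine the incoming value with any existing entry for the same DataKey.
    @Override
    public Map<DataKey, DataIndex> add(Tuple2<DataKey, DataIndex> value, Map<DataKey, DataIndex> accumulator) {
        accumulator.merge(value.f0, value.f1, DataIndex::add);
        return accumulator;
    }

    // The window result is the accumulated map itself.
    @Override
    public Map<DataKey, DataIndex> getResult(Map<DataKey, DataIndex> accumulator) {
        return accumulator;
    }

    // Fold every entry of a into b and return b.
    @Override
    public Map<DataKey, DataIndex> merge(Map<DataKey, DataIndex> a, Map<DataKey, DataIndex> b) {
        a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, DataIndex::add));
        return b;
    }
}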
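
For reference, the map-merging logic of the aggregate function can be exercised outside Flink. The following is only a minimal sketch under assumptions: String and Long stand in for DataKey and DataIndex (whose definitions are not shown), Long::sum stands in for DataIndex::add, and the class name AccumulatorSketch is made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for AggregateDataEntry, using String/Long instead of
// DataKey/DataIndex and Long::sum instead of DataIndex::add.
public class AccumulatorSketch {

    // Mirrors add(): fold one (key, value) pair into the accumulator.
    static Map<String, Long> add(String key, Long value, Map<String, Long> accumulator) {
        accumulator.merge(key, value, Long::sum);
        return accumulator;
    }

    // Mirrors merge(): fold every entry of a into b and return b.
    static Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
        a.forEach((k, v) -> b.merge(k, v, Long::sum));
        return b;
    }

    public static void main(String[] args) {
        Map<String, Long> first = new HashMap<>();
        add("x", 1L, first);
        add("x", 2L, first);
        add("y", 5L, first);

        Map<String, Long> second = new HashMap<>();
        add("x", 10L, second);

        Map<String, Long> merged = merge(first, second);
        // prints "13 5"
        System.out.println(merged.get("x") + " " + merged.get("y"));
    }
}
```

This only exercises the accumulator logic in plain Java; it does not involve Flink's keyed state, windowing, or distribution across TaskManagers.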

Does anyone know what might cause this NPE? Thanks!
-- 
Best regards

Sili Liu
