https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html
 
If heap state backend is used with synchronous snapshotting, the global 
iterator keeps a copy of all keys while iterating because of its specific 
implementation which does not support concurrent modifications. Enabling of 
this feature will increase memory consumption then. Asynchronous snapshotting 
does not have this problem.

I found this passage; it looks similar to what you are doing.
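
To make the quoted passage concrete, here is a minimal plain-Java sketch of the trade-off it describes: when the underlying map's iterator does not tolerate modification, a cleanup pass can first copy all keys and iterate over that copy, at the cost of extra memory proportional to the number of keys. This is not Flink's implementation; the class `KeyCopyCleanup`, the method `removeExpired`, and the timestamp layout are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not taken from Flink's code base.
final class KeyCopyCleanup {

    /**
     * Removes every entry whose timestamp is older than cutoffMillis.
     * The key set is copied first, so removing from the map while looping
     * is safe, but the copy temporarily holds one reference per key.
     */
    static int removeExpired(Map<String, Long> lastSeen, long cutoffMillis) {
        List<String> keysSnapshot = new ArrayList<>(lastSeen.keySet());
        int removed = 0;
        for (String key : keysSnapshot) {
            Long timestamp = lastSeen.get(key);
            if (timestamp != null && timestamp < cutoffMillis) {
                lastSeen.remove(key); // safe: we iterate the copy, not the map
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, Long> lastSeen = new HashMap<>();
        lastSeen.put("a", 100L);
        lastSeen.put("b", 200L);
        lastSeen.put("c", 300L);
        int removed = removeExpired(lastSeen, 250L);
        System.out.println("removed=" + removed + ", remaining=" + lastSeen.keySet());
    }
}
```

The copy is what makes the loop robust, and it is also where the additional memory goes, which matches the warning in the documentation excerpt.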




athlon...@gmail.com
 
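From: 戴嘉诚
Sent: 2019-07-25 18:24
To: user-zh
Subject: Re: Flink checkpoint concurrency problem
Hello, I have never set this parameter. For checkpointing, I only enabled incremental writes for RocksDB; nothing else is configured.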
 
athlon...@gmail.com <athlon...@gmail.com> wrote on Thu, Jul 25, 2019 at 6:20 PM:
 
> Have you ever set the setMaxConcurrentCheckpoints parameter?
>
>
>
> athlon...@gmail.com
>
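> From: 戴嘉诚
> Sent: 2019-07-25 18:07
> To: user-zh
> Subject: Flink checkpoint concurrency problem
> Hi all,
>
>     I have a job that crashes and restarts frequently, at irregular intervals. The exception log is below; roughly, the checkpoint fails during its synchronous phase.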
>
>
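> This job checks whether the volume of data from the last half hour has reached a configured threshold. I use RocksDB as the state backend. A processElement method accumulates the keyed messages in a MapState, and for every incoming message it also iterates over the whole MapState to delete the entries that have expired.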
>
>
>
> Is this exception caused by the checkpoint asynchronously copying this state to RocksDB while I am deleting entries from the MapState inside the processElement method?
>
>
> java.lang.Exception: Could not perform checkpoint 550 for operator
> KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> (16/20).
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>
>          at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>
>          at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>
>          at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>
>          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>          at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Could not complete snapshot 550 for
> operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存 (16/20).
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>
>          at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>
>          ... 8 more
>
> Caused by: java.util.ConcurrentModificationException
>
>          at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
>
>          at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>
>          at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>
>          at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>
>          at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
>
>          at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>
>          at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68)
>
>          at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
>
>          at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
>
>          at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>
>          ... 13 more
>
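
For reference, the innermost frames of the trace above show the ConcurrentModificationException being raised while Kryo's MapSerializer.copy iterates a HashMap, called from PartitionableListState.deepCopy during the operator state snapshot. One possible reading, which nothing in this thread confirms, is that a HashMap carried inside a record is still being modified elsewhere while the snapshot copies it. The plain-Java sketch below only reproduces that general failure mode and one common mitigation (handing off an independent copy). `SnapshotCopyDemo` and its methods are hypothetical names, and the in-loop mutation is a single-threaded stand-in for a second thread.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; it does not use Flink or Kryo.
final class SnapshotCopyDemo {

    /**
     * Copies the map entry by entry, similar in spirit to a serializer's copy().
     * When mutateDuringCopy is true, a key is added to the source between two
     * iteration steps, simulating another thread touching the map mid-copy.
     */
    static Map<String, Long> copyEntries(Map<String, Long> source, boolean mutateDuringCopy) {
        Map<String, Long> copy = new HashMap<>();
        for (Map.Entry<String, Long> entry : source.entrySet()) {
            copy.put(entry.getKey(), entry.getValue());
            if (mutateDuringCopy) {
                source.put("added-during-copy", 0L); // structural modification
            }
        }
        return copy;
    }

    /** Mitigation: give the consumer its own copy so later changes cannot reach it. */
    static Map<String, Long> handOff(Map<String, Long> live) {
        return new HashMap<>(live);
    }

    /** Returns true if copying a map that is mutated mid-iteration fails fast. */
    static boolean failsWhenMutated() {
        Map<String, Long> live = new HashMap<>();
        live.put("a", 1L);
        live.put("b", 2L);
        live.put("c", 3L);
        try {
            copyEntries(live, true);
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("copy fails when source is mutated: " + failsWhenMutated());
        Map<String, Long> live = new HashMap<>();
        live.put("a", 1L);
        Map<String, Long> emitted = handOff(live);
        live.put("b", 2L); // does not affect the emitted copy
        System.out.println("emitted copy: " + emitted);
    }
}
```

If this reading applies, the usual remedy is to avoid mutating a map after it has been emitted downstream, for example by emitting a copy as in `handOff`, rather than to change the checkpoint settings.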
