I don't see anything suspicious in your code. The stack trace also points to a
MapSerializer. Do you have another operator where you put a Map into custom
state?

On Fri, Aug 20, 2021 at 6:43 PM yidan zhao <hinobl...@gmail.com> wrote:

> But I do not know why this leads to the job failing and recovering,
> since I have set the number of tolerable failed checkpoints to
> Integer.MAX_VALUE. Due to the failure, my task managers failed because of
> the task cancellation timeout; about 80% of them went down due to the
> cancellation timeout.
>
> On Sat, Aug 21, 2021 at 12:35 AM yidan zhao <hinobl...@gmail.com> wrote:
> >
> > OK, thanks. I have some findings; could you confirm them? Here is the
> > problematic code.
> >
> > This is the async function's implementation. It does an asynchronous
> > Redis query and fills some data back into the records.
> > In the line [ currentBatch.get(i).getD().put("ipLabel", objects.getResponses().get(i)); ],
> > getD() returns a map attribute of the OriginalUserAccessLog class.
> > The issue occurs when a checkpoint is in progress and the Redis query
> > result returns concurrently, while the async function's input queue is
> > being serialized.
> >
> >
> > @Override
> > public void asyncInvoke0(OriginalUserAccessLog input,
> >                          ResultFuture<OriginalUserAccessLog> resultFuture) {
> >     inputBuffer.add(input);
> >     if (inputBuffer.size() >= 1000) {
> >         List<OriginalUserAccessLog> currentBatch = inputBuffer;
> >         inputBuffer = new ArrayList<>();
> >
> >         RBatch rBatch = redissonClient.createBatch();
> >
> >         for (OriginalUserAccessLog i : currentBatch) {
> >             rBatch.getBucket("ip:" + i.getIp()).getAsync();
> >         }
> >
> >         // This callback may run on a Redisson thread, concurrently with a
> >         // checkpoint that copies the input records still queued in the
> >         // async operator.
> >         rBatch.executeAsync().onComplete((objects, throwable) -> {
> >             if (throwable == null) {
> >                 for (int i = 0; i < currentBatch.size(); i++) {
> >                     currentBatch.get(i).getD().put("ipLabel",
> >                             objects.getResponses().get(i));
> >                 }
> >             }
> >             resultFuture.complete(currentBatch);
> >         });
> >
> >     } else {
> >         resultFuture.complete(Collections.emptyList());
> >     }
> > }
> >
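A minimal sketch of one way to avoid the race, assuming the enrichment can produce new objects instead of mutating the input: in the callback, build a copy of each record with its own map and emit the copies, so the input records that the async operator keeps in its queue (and copies during a checkpoint) are never written to. AccessLog, withLabel and enrich are hypothetical stand-ins for OriginalUserAccessLog and the Redisson callback; CompletableFuture.supplyAsync only simulates the callback running on another thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CopyBeforeEnrich {

    // Hypothetical stand-in for OriginalUserAccessLog, reduced to the fields used here.
    static final class AccessLog {
        final String ip;
        final Map<String, Object> d;

        AccessLog(String ip, Map<String, Object> d) {
            this.ip = ip;
            this.d = d;
        }

        // Returns a new record backed by a new map; this record is left untouched.
        // The map copy is shallow: the values are shared, not cloned.
        AccessLog withLabel(String key, Object value) {
            Map<String, Object> copy = new HashMap<>(d);
            copy.put(key, value);
            return new AccessLog(ip, copy);
        }
    }

    // Simulates the Redis batch callback: runs on another thread and builds
    // enriched copies instead of calling getD().put(...) on the inputs.
    static CompletableFuture<List<AccessLog>> enrich(List<AccessLog> batch, List<Object> labels) {
        return CompletableFuture.supplyAsync(() -> {
            List<AccessLog> out = new ArrayList<>(batch.size());
            for (int i = 0; i < batch.size(); i++) {
                out.add(batch.get(i).withLabel("ipLabel", labels.get(i)));
            }
            return out;
        });
    }

    public static void main(String[] args) {
        AccessLog input = new AccessLog("10.0.0.1", new HashMap<>());
        List<AccessLog> result =
                enrich(Collections.singletonList(input), Collections.<Object>singletonList("datacenter")).join();
        System.out.println(input.d);         // prints {}
        System.out.println(result.get(0).d); // prints {ipLabel=datacenter}
    }
}
```

With this design, a record handed to Flink is only read afterwards, never written. Because the map copy is shallow, the values stored in the map must not be mutated in place either.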
> > On Fri, Aug 20, 2021 at 1:56 AM Chesnay Schepler <ches...@apache.org> wrote:
> > >
> > > Essentially this exception means that the state was modified while a
> > > snapshot was being taken.
> > >
> > > We usually see this when users hold on to some state value beyond a
> > > single call to a user-defined function, particularly from different
> > > threads.
> > >
> > > We may be able to pinpoint the issue if you were to provide us with the
> > > functions.
> > >
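To illustrate why the exception surfaces inside MapSerializer.copy: HashMap's iterators are fail-fast, so a structural modification (such as put with a new key) while something is walking entrySet() throws ConcurrentModificationException on the iterator's next step. The sketch below forces that interleaving on a single thread so it fails every time; copyWithInterleavedWrite is a hypothetical helper that only mimics what a serializer's copy() does. Between real threads, fail-fast detection is best-effort, so the same race is not guaranteed to be caught on every checkpoint.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class FailFastDemo {

    // Copies a map entry by entry through entrySet(), as the stack trace shows
    // MapSerializer.copy doing. After the first entry it runs `writer`, which
    // stands in for another thread modifying the map mid-copy.
    static Map<String, Object> copyWithInterleavedWrite(Map<String, Object> source, Runnable writer) {
        Map<String, Object> copy = new HashMap<>();
        boolean wrote = false;
        for (Map.Entry<String, Object> e : source.entrySet()) {
            copy.put(e.getKey(), e.getValue());
            if (!wrote) {
                writer.run();
                wrote = true;
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> d = new HashMap<>();
        d.put("uid", "u1");
        d.put("ip", "10.0.0.1");
        try {
            // Adding a new key is a structural modification, so the iterator's
            // next step detects it and throws.
            copyWithInterleavedWrite(d, () -> d.put("ipLabel", "datacenter"));
            System.out.println("copied without error");
        } catch (ConcurrentModificationException e) {
            System.out.println("ConcurrentModificationException"); // this branch runs
        }
    }
}
```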
> > > On 19/08/2021 16:59, yidan zhao wrote:
> > > > The Flink web UI shows the following exception.
> > > > The task (ual_transform_UserLogBlackUidJudger ->
> > > > ual_transform_IpLabel) chains two operators: the first is a broadcast
> > > > process function, and the second is an async function. I do not know
> > > > whether the issue is related to them.
> > > >
> > > > The issue did not occur before; it started after I upgraded to
> > > > Flink 1.13.2.
> > > >
> > > > Exception info from the Flink web UI:
> > > > java.io.IOException: Could not perform checkpoint 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0.
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> > > >     at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> > > >     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> > > >     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> > > >     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> > > >     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> > > >     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> > > >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> > > >     at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 58 for operator ual_transform_UserLogBlackUidJudger -> ual_transform_IpLabel (29/60)#0. Failure reason: Checkpoint was declined.
> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
> > > >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
> > > >     at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> > > >     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> > > >     ... 20 more
> > > >
> > > > Caused by: java.util.ConcurrentModificationException
> > > >     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
> > > >     at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
> > > >     at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
> > > >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
> > > >     at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
> > > >     at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> > > >     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:260)
> > > >     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:234)
> > > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
> > > >     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
> > > >     at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
> > > >     at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:64)
> > > >     at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:76)
> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
> > > >     at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
> > > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
> > > >     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:226)
> > > >     ... 30 more
> > >
> > >
>
