Hi,
So we are running Flink 1.19.1 on AWS EMR, using YARN as the resource manager.
We have a fairly large cluster: a parallelism of 120, with 30 task managers
running 4 task slots each.

Here are some of the important configs:
taskmanager.numberOfTaskSlots: 4
jobmanager.memory.process.size: 12g
taskmanager.memory.process.size: 24g
taskmanager.memory.task.off-heap.size: 1g
taskmanager.memory.managed.fraction: 0.5
taskmanager.memory.network.fraction: 0.05
taskmanager.memory.jvm-overhead.fraction: 0.05
state.backend.type: rocksdb
state.checkpoints.dir: s3://bucket/flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://bucket/changelog
dstl.dfs.compression.enabled: 'true'
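
For reference, this is roughly how we understand the 24g TaskManager budget to break down. It is a back-of-the-envelope sketch, not Flink's own calculation. It assumes the default JVM metaspace (256 MiB) and the default JVM overhead bounds (min 192 MiB, max 1 GiB), none of which we set. Under those assumptions, the 0.05 overhead fraction (about 1.2 GiB) is capped at 1 GiB. Managed memory, which RocksDB uses, is then 0.5 of the remaining total Flink memory and is split across the 4 slots.

```java
public class TmMemoryBudget {
    // Assumed Flink defaults (not set in our config); verify against the docs for your version.
    static final double JVM_OVERHEAD_MIN_MIB = 192;
    static final double JVM_OVERHEAD_MAX_MIB = 1024;
    static final double METASPACE_MIB = 256;

    /** JVM overhead: a fraction of total process memory, clamped to [min, max]. */
    static double jvmOverheadMiB(double processMiB, double fraction) {
        return Math.min(Math.max(processMiB * fraction, JVM_OVERHEAD_MIN_MIB), JVM_OVERHEAD_MAX_MIB);
    }

    /** Total Flink memory: process memory minus JVM metaspace and JVM overhead. */
    static double totalFlinkMiB(double processMiB, double overheadFraction) {
        return processMiB - METASPACE_MIB - jvmOverheadMiB(processMiB, overheadFraction);
    }

    /** Managed memory (used by RocksDB): a fraction of total Flink memory. */
    static double managedMiB(double totalFlinkMiB, double managedFraction) {
        return totalFlinkMiB * managedFraction;
    }

    public static void main(String[] args) {
        double processMiB = 24 * 1024;                  // taskmanager.memory.process.size: 24g
        double overheadFraction = 0.05;                 // taskmanager.memory.jvm-overhead.fraction
        double managedFraction = 0.5;                   // taskmanager.memory.managed.fraction
        int slots = 4;                                  // taskmanager.numberOfTaskSlots

        double overhead = jvmOverheadMiB(processMiB, overheadFraction);
        double flink = totalFlinkMiB(processMiB, overheadFraction);
        double managed = managedMiB(flink, managedFraction);

        System.out.printf("JVM overhead:       %.0f MiB%n", overhead);
        System.out.printf("Total Flink memory: %.0f MiB%n", flink);
        System.out.printf("Managed memory:     %.0f MiB (%.0f MiB per slot)%n", managed, managed / slots);
    }
}
```

With these assumptions it reports 1024 MiB of JVM overhead, 23296 MiB of total Flink memory and 11648 MiB of managed memory, i.e. 2912 MiB per slot.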

My job's checkpoint settings are:

env.enableCheckpointing(900000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L);
env.getCheckpointConfig().setCheckpointTimeout(900000L);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableUnalignedCheckpoints();
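
Since the interval, minimum pause and timeout are all 15 minutes, it is the minimum pause that effectively spaces checkpoints out. The sketch below is a simplified model based on our reading of these settings, not Flink's scheduler code: the next checkpoint is triggered no earlier than the later of "previous trigger + interval" and "previous completion + min pause". Under that model, checkpoints that complete within the 15-minute timeout start between 15 and just under 30 minutes apart. The checkpoint durations used in `main` are hypothetical.

```java
import java.util.concurrent.TimeUnit;

public class CheckpointSpacing {
    /**
     * Simplified model: earliest time (ms) the next periodic checkpoint may be triggered,
     * given when the previous checkpoint was triggered and when it completed.
     */
    static long nextTriggerMs(long lastTriggerMs, long lastCompletionMs, long intervalMs, long minPauseMs) {
        return Math.max(lastTriggerMs + intervalMs, lastCompletionMs + minPauseMs);
    }

    public static void main(String[] args) {
        long interval = TimeUnit.MINUTES.toMillis(15); // enableCheckpointing(900000L)
        long minPause = TimeUnit.MINUTES.toMillis(15); // setMinPauseBetweenCheckpoints(900000L)
        // Hypothetical durations: a fast checkpoint, a slow one, and one close to the 15-minute timeout.
        for (long durationMin : new long[] {1, 10, 14}) {
            long next = nextTriggerMs(0, TimeUnit.MINUTES.toMillis(durationMin), interval, minPause);
            System.out.printf("checkpoint takes %d min -> next trigger at %d min%n",
                    durationMin, TimeUnit.MILLISECONDS.toMinutes(next));
        }
    }
}
```

For 1, 10 and 14 minute checkpoints this gives next triggers at 16, 25 and 29 minutes after the previous trigger.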

The full checkpoint size can also be very large, up to 50 GB.
The job ran fine for a long time, but then, seemingly at random, we got the
following exception:

java.io.IOException: Could not perform checkpoint 70 for operator ... (80/120)#0.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
	at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
	at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
	at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
	at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 70 for operator ... (80/120)#0. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
	... 22 more
Caused by: java.io.IOException: The upload for 1439 has already failed previously
	at org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:463)
	at org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:234)
	at org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)
	at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:406)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:258)
	... 33 more
Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out after 1000ms
	at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java:319)
	at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$1(RetryingExecutor.java:314)
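
Reading the innermost cause, `Attempt 3 timed out after 1000ms` suggests each changelog upload attempt was allowed 1000 ms and the upload was given up after the third attempt, after which snapshots were declined with `The upload for 1439 has already failed previously`. We believe these limits correspond to the `dstl.dfs.upload.timeout` and `dstl.dfs.upload.max-attempts` options, which we have not set, but treat those names as an assumption to check against the Flink docs. The sketch below is a generic Java model of a fixed retry policy with a per-attempt timeout, not Flink's `RetryingExecutor`; the stalled upload and the 100 ms timeout are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FixedRetrySketch {
    /**
     * Runs an asynchronous action up to maxAttempts times, giving each attempt at most timeoutMs.
     * Returns the first successful result, or rethrows the last failure once all attempts are used.
     */
    static <T> T runWithRetries(Supplier<CompletableFuture<T>> action, int maxAttempts, long timeoutMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // orTimeout (Java 9+) completes the future exceptionally with a TimeoutException.
                return action.get().orTimeout(timeoutMs, TimeUnit.MILLISECONDS).get();
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                lastFailure = cause instanceof Exception ? (Exception) cause : new Exception(cause);
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Hypothetical upload that never finishes, e.g. a stalled write to S3.
        Supplier<CompletableFuture<String>> stalledUpload = () -> {
            attempts.incrementAndGet();
            return new CompletableFuture<>();
        };
        try {
            runWithRetries(stalledUpload, 3, 100);
        } catch (TimeoutException e) {
            System.out.println("gave up after " + attempts.get() + " attempts");
        } catch (Exception e) {
            System.out.println("unexpected failure: " + e);
        }
    }
}
```

In this model each attempt gets a fresh future and retries start immediately after a timeout, so a persistently stalled upload is abandoned after roughly maxAttempts × timeout.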

After this, we started getting many timeout exceptions: one task manager or
another would randomly become unreachable, and restarts began happening very
frequently. Here are a few of these stack traces:


java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_... timed out.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1550)
	at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
	...

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '...'. This might indicate that the remote task manager was lost.
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
	...

org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id ... is no longer reachable.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1566)
	at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
	...

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '...'. This indicates that the remote task manager was lost.
	at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325)
	...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
