Hi, so we are running Flink 1.19.1 on AWS EMR, using YARN as the resource manager. We have a fairly large cluster: a parallelism of 120, with 30 task managers running 4 task slots each.
Here are some of the important configs:

    taskmanager.numberOfTaskSlots: 4
    jobmanager.memory.process.size: 12g
    taskmanager.memory.process.size: 24g
    taskmanager.memory.task.off-heap.size: 1g
    taskmanager.memory.managed.fraction: 0.5
    taskmanager.memory.network.fraction: 0.05
    taskmanager.memory.jvm-overhead.fraction: 0.05
    state.backend.type: rocksdb
    state.checkpoints.dir: s3://bucket/flink-checkpoints
    state.backend.incremental: 'true'
    state.backend.local-recovery: 'true'
    state.backend.changelog.enabled: 'true'
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket/changelog
    dstl.dfs.compression.enabled: 'true'

My job's checkpoint configuration is:

    env.enableCheckpointing(900000L); // every 15 minutes
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(900000L); // 15 minutes
    env.getCheckpointConfig().setCheckpointTimeout(900000L); // 15 minutes
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableUnalignedCheckpoints();

Also, the full checkpoint size can be very large, up to 50 GB.

It had been running fine for a long time, but then, seemingly at random, we got the following exception:

    java.io.IOException: Could not perform checkpoint 70 for operator ... (80/120)#0.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1326)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
        at org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned.barrierReceived(AlternatingWaitingForFirstBarrierUnaligned.java:78)
        at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlternatingAlignedBarrierHandlerState.barrierReceived(AbstractAlternatingAlignedBarrierHandlerState.java:56)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
        at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
        at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:122)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
        at java.base/java.lang.Thread.run(Thread.java:840)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 70 for operator ... (80/120)#0. Failure reason: Checkpoint was declined.
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:281)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:185)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:720)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:352)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$16(StreamTask.java:1369)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1357)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1314)
        ... 22 more
    Caused by: java.io.IOException: The upload for 1439 has already failed previously
        at org.apache.flink.changelog.fs.FsStateChangelogWriter.ensureCanPersist(FsStateChangelogWriter.java:463)
        at org.apache.flink.changelog.fs.FsStateChangelogWriter.persistInternal(FsStateChangelogWriter.java:234)
        at org.apache.flink.changelog.fs.FsStateChangelogWriter.persist(FsStateChangelogWriter.java:217)
        at org.apache.flink.state.changelog.ChangelogKeyedStateBackend.snapshot(ChangelogKeyedStateBackend.java:406)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:258)
        ... 33 more
    Caused by: java.util.concurrent.TimeoutException: Attempt 3 timed out after 1000ms
        at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.fmtError(RetryingExecutor.java:319)
        at org.apache.flink.changelog.fs.RetryingExecutor$RetriableActionAttempt.lambda$scheduleTimeout$1(RetryingExecutor.java:314)

After this, I started getting many timeout exceptions: seemingly at random, one task manager or another becomes unreachable, and restarts happen very frequently. Here are a few of these stack traces:

    java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_... timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1550)
        at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.run(DefaultHeartbeatMonitor.java:158)
        ...

    org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager '...'. This might indicate that the remote task manager was lost.
        at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
        ...

    org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id ... is no longer reachable.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1566)
        at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262)
        at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248)
        ...

    org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager '...'. This indicates that the remote task manager was lost.
        at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:346)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:325)
        ...
    Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
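To reason about the innermost cause of the first error, here is a simplified model of what I understand the changelog uploader to be doing: every upload attempt gets a fixed timeout, and once the last attempt times out the upload is treated as failed, which I assume is why later `persist` calls report "has already failed previously". If I read the configuration docs correctly, the relevant options are `dstl.dfs.upload.max-attempts` (default 3) and `dstl.dfs.upload.timeout` (default 1s), which would match "Attempt 3 timed out after 1000ms"; please treat those defaults as my assumption. The class `FixedRetryUpload` and its parameters are hypothetical, and this is not Flink's actual `RetryingExecutor`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Simplified model of a fixed-delay retry loop with a per-attempt timeout.
 * Hypothetical helper for illustration only; NOT Flink's RetryingExecutor.
 */
public final class FixedRetryUpload {

    private FixedRetryUpload() {}

    /**
     * Runs {@code action} up to {@code maxAttempts} times. An attempt that does not finish
     * within {@code timeoutMs} is cancelled and retried after {@code delayMs}. Failures other
     * than timeouts are not retried in this sketch; they propagate as ExecutionException.
     */
    public static <T> T callWithRetries(Callable<T> action, int maxAttempts, long timeoutMs, long delayMs)
            throws TimeoutException, ExecutionException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        // Cached pool so a timed-out attempt that ignores interruption cannot block the next one.
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            for (int attempt = 1; ; attempt++) {
                Future<T> future = pool.submit(action);
                try {
                    return future.get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    future.cancel(true); // interrupt the slow attempt
                    if (attempt == maxAttempts) {
                        // Same shape as the message in the log above; the whole upload is now failed.
                        throw new TimeoutException(
                                "Attempt " + attempt + " timed out after " + timeoutMs + "ms");
                    }
                    Thread.sleep(delayMs); // fixed delay before the next attempt
                }
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this model, a single upload only fails when every attempt exceeds the per-attempt timeout, so a 1 s limit would be hit whenever an S3 write stays slow for three attempts in a row, for example while a large batch of state changes is being uploaded.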

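For context on the TaskManager sizing, here is a rough breakdown of the 24g process size under Flink's documented memory model. It assumes the defaults for JVM metaspace (256m), framework heap and off-heap (128m each), JVM overhead bounds (192m to 1g) and the network minimum (64m), and it takes the network-memory cap as a parameter because I am not certain of its default in 1.19. `TaskManagerMemoryBudget` is a hypothetical helper; the values Flink itself reports for a TaskManager (logs or web UI) take precedence.

```java
/**
 * Rough TaskManager memory breakdown (all values in MiB).
 * Hypothetical helper; the constants below are ASSUMED Flink defaults, not values from our config.
 */
public final class TaskManagerMemoryBudget {

    static final double METASPACE_MB = 256;          // taskmanager.memory.jvm-metaspace.size
    static final double FRAMEWORK_HEAP_MB = 128;     // taskmanager.memory.framework.heap.size
    static final double FRAMEWORK_OFF_HEAP_MB = 128; // taskmanager.memory.framework.off-heap.size
    static final double OVERHEAD_MIN_MB = 192;       // taskmanager.memory.jvm-overhead.min
    static final double OVERHEAD_MAX_MB = 1024;      // taskmanager.memory.jvm-overhead.max
    static final double NETWORK_MIN_MB = 64;         // taskmanager.memory.network.min

    /** Derived component sizes in MiB. */
    public static final class Breakdown {
        public final double jvmOverheadMb;
        public final double totalFlinkMb;
        public final double networkMb;
        public final double managedMb;
        public final double taskHeapMb;

        Breakdown(double jvmOverheadMb, double totalFlinkMb, double networkMb,
                  double managedMb, double taskHeapMb) {
            this.jvmOverheadMb = jvmOverheadMb;
            this.totalFlinkMb = totalFlinkMb;
            this.networkMb = networkMb;
            this.managedMb = managedMb;
            this.taskHeapMb = taskHeapMb;
        }
    }

    private static double clamp(double v, double lo, double hi) {
        return Math.max(lo, Math.min(hi, v));
    }

    public static Breakdown compute(double processMb, double managedFraction, double networkFraction,
                                    double overheadFraction, double taskOffHeapMb, double networkMaxMb) {
        // JVM overhead is a fraction of the total process size, clamped to [min, max].
        double overhead = clamp(processMb * overheadFraction, OVERHEAD_MIN_MB, OVERHEAD_MAX_MB);
        // Total Flink memory is what remains after JVM overhead and metaspace.
        double totalFlink = processMb - overhead - METASPACE_MB;
        // Network and managed memory are fractions of total Flink memory.
        double network = clamp(totalFlink * networkFraction, NETWORK_MIN_MB, networkMaxMb);
        double managed = totalFlink * managedFraction;
        // Whatever is left over becomes task heap.
        double taskHeap = totalFlink - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB
                - taskOffHeapMb - network - managed;
        if (taskHeap <= 0) {
            throw new IllegalArgumentException("configuration leaves no task heap");
        }
        return new Breakdown(overhead, totalFlink, network, managed, taskHeap);
    }

    public static void main(String[] args) {
        int slots = 4;
        // 24g process size, managed 0.5, network 0.05, overhead 0.05, 1g task off-heap, uncapped network.
        Breakdown b = compute(24 * 1024, 0.5, 0.05, 0.05, 1024, Double.POSITIVE_INFINITY);
        System.out.printf("JVM overhead:       %.1f MiB%n", b.jvmOverheadMb);
        System.out.printf("total Flink memory: %.1f MiB%n", b.totalFlinkMb);
        System.out.printf("network:            %.1f MiB%n", b.networkMb);
        System.out.printf("managed:            %.1f MiB%n", b.managedMb);
        System.out.printf("task heap:          %.1f MiB%n", b.taskHeapMb);
        // Assumes managed memory is split evenly across the slots.
        System.out.printf("managed per slot:   %.1f MiB%n", b.managedMb / slots);
    }
}
```

With an uncapped network pool this works out to 1024 MiB of JVM overhead, 11648 MiB of managed memory (2912 MiB per slot if split evenly across 4 slots) and roughly 9203 MiB of task heap.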