Hi, I don’t think this is the root cause. This exception says Flink tried to clean up the failed checkpoint, and that cleanup also failed. Could the increased checkpoint size be the cause of the failure itself? I suggest you:

1. Check the log of the first failed checkpoint after 03/15 to confirm what happened.
2. Check whether there is any rate limit on writing S3 files. If so, you could raise the limit or increase the checkpoint interval (see the sketch below).
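For illustration, here is a minimal, untested sketch of that tuning through the DataStream API, assuming your flink.checkpoint.* properties feed these setters (the class name and the 60 s / 10 min values are examples only, not a recommendation). One thing that stands out in your config is that flink.checkpoint.Timeout (10000 ms) equals the interval, so any checkpoint slower than one interval will expire, which matches the "checkpoint Expired" you reported:

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger checkpoints less often (example: every 60 s instead of 10 s)
        // so slow S3 uploads have more time to finish between triggers.
        env.enableCheckpointing(60_000L);

        CheckpointConfig config = env.getCheckpointConfig();

        // Give each checkpoint more headroom than the interval; with
        // timeout == interval, any checkpoint slower than one interval
        // expires. 10 minutes here is an illustrative value.
        config.setCheckpointTimeout(600_000L);

        // Mirrors your existing minPauseInterval / maxConcurrent settings.
        config.setMinPauseBetweenCheckpoints(500L);
        config.setMaxConcurrentCheckpoints(1);

        // Tolerate a few isolated failures instead of failing the job on the
        // first one; the default is to tolerate none, which matches the
        // "Exceeded checkpoint tolerable failure threshold" in your job exception.
        config.setTolerableCheckpointFailureNumber(3);

        // ... define your Kinesis source and S3 sink here, then:
        // env.execute("your job name");
    }
}

Note this only changes the timing; it will not help if S3 writes are actually being throttled, which is why checking the log of the first failed checkpoint is still step one.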
> On 18 Mar 2022, at 3:48 AM, Vijayendra Yadav <[email protected]> wrote:
>
> Hi Huang,
>
> It says the checkpoint expired, with the following log:
>
> 2022-03-16 03:03:22,641 INFO  org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher - Shutting down the shard consumer threads of subtask 13 ...
> 2022-03-16 03:03:22,641 WARN  org.apache.hadoop.fs.s3a.S3ABlockOutputStream - Interrupted object upload
> java.lang.InterruptedException
>     at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.putObject(S3ABlockOutputStream.java:441)
>     at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:360)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
>     at org.apache.flink.fs.s3hadoop.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:306)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
>     at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
>     at java.lang.Thread.run(Thread.java:750)
> 2022-03-16 03:03:22,641 WARN  org.apache.hadoop.fs.s3a.S3AInstrumentation - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=1107015, bytesUploaded=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
> 2022-03-16 03:03:22,641 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Cannot delete closed and discarded state stream for s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02.
> java.io.InterruptedIOException: getFileStatus on s3://aeg-prod-bigdatadl-meta/flink/checkpoint/ams/5c658fc5f325f40baf063a78a20b1bb2/chk-535996/138d89f7-1514-42eb-b4bd-5e89d87c5a02: com.amazonaws.AbortedException:
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateInterruptedException(S3AUtils.java:340)
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:171)
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2187)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
>     at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.delete(HadoopFileSystem.java:147)
>     at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:150)
>     at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:107)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:311)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateCheckpointWriter.fail(ChannelStateCheckpointWriter.java:225)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest.lambda$abort$4(ChannelStateWriteRequest.java:89)
>     at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.execute(ChannelStateWriteRequest.java:176)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:73)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:52)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:94)
>     at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:74)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: com.amazonaws.AbortedException:
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleInterruptedException(AmazonHttpClient.java:862)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:740)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
>     at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
>     ... 14 more
> Caused by: com.amazonaws.http.timers.client.SdkInterruptedException
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:917)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.checkInterrupted(AmazonHttpClient.java:903)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1097)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
>
> On Thu, Mar 17, 2022 at 9:11 AM yu'an huang <[email protected]> wrote:
> Hi, so the problem is about checkpoints. We need to understand why the checkpoints are failing. Can you provide more logs? We need to check the log for more details about the first failed checkpoint.
>
>> On 17 Mar 2022, at 9:41 AM, Vijayendra Yadav <[email protected]> wrote:
>>
>> Hi Flink Team,
>>
>> I am using Flink 1.11 with the Kinesis consumer and a streaming S3 file sink, with an S3 checkpoint backend. This is a streaming service.
>>
>> Usually a couple of checkpoints fail with no issues, but after a week or so of running, the checkpoint failures become irrecoverable; the application keeps running, but in a bad state, and data flow blocks.
>>
>> Refer to the graph below:
>> <image.png>
>>
>> Flink checkpoint configuration is as follows (time units in milliseconds):
>> flink.checkpoint.interval=10000
>> flink.checkpoint.minPauseInterval=500
>> flink.checkpoint.Timeout=10000
>> flink.checkpoint.maxConcurrent=1
>> flink.checkpoint.preferCheckPoint=true
>>
>> kinesis.shard.getrecords.max=10000
>> kinesis.shard.getrecords.interval=10000
>> kinesis.initial.position=LATEST
>>
>> Exception on the job:
>>
>> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
>>     at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90)
>>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:750)
