Checkpoint 设置如下:
env.setStateBackend((StateBackend) new
RocksDBStateBackend(checkpointDirectory,true));
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointInterval(30 * 1000);
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

王佩 <wang...@cmcm.com> 于2019年8月24日周六 下午4:49写道:

> Flink 版本 1.8.0
> 采用RocksDBStateBackend
>
> 程序上线后,运行过程中,经常会有如下异常:
> 2019-08-24 00:34:05,192 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-50832': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 02:58:03,572 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51120': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2019-08-24 03:21:00,784 WARN
>  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
> Fail to subsume the old checkpoint.
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
> `s3a://****/********/a086ec215da2088daaa07af5ca8e5586/chk-51166': Directory
> is not empty
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.innerDelete(S3AFileSystem.java:1752)
> at
> org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:1697)
> at
> org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
> at
> org.apache.flink.runtime.state.filesystem.FsCompletedCheckpointStorageLocation.disposeStorageLocation(FsCompletedCheckpointStorageLocation.java:70)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:264)
> at
> org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
> at
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$9(JobMaster.java:676)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 请教下,异常的原因,可能是啥。
>

Reply via email to