[ https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefan Richter closed FLINK-11662.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.9.0

Merged in:
master: b760d55

> Discarded checkpoint can cause Tasks to fail
> --------------------------------------------
>
>                 Key: FLINK-11662
>                 URL: https://issues.apache.org/jira/browse/FLINK-11662
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.0, 1.8.0
>            Reporter: madong
>            Assignee: Yun Tang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>         Attachments: jobmanager.log, taskmanager.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as
> it receives the first decline message. Part of the discard operation is the
> deletion of the checkpoint directory. Depending on the underlying
> {{FileSystem}} implementation, concurrent write and read operations on files
> in the checkpoint directory can then fail (e.g. this is the case with HDFS).
> If a local checkpointing operation (e.g. an {{AsyncCheckpointRunnable}}) is
> still running for some {{Task}} and belongs to the discarded checkpoint,
> that operation can fail. Depending on the configuration of the
> {{CheckpointExceptionHandler}}, this can lead to a task failure and a job
> recovery caused by an already discarded checkpoint.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was not running
>     at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3) (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     ... 5 common frames omitted
> Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9 in order to obtain the stream state handle
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>     ... 7 common frames omitted
> Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path doesn't exist: /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043)
>     at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938)
>     at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:601)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169)
>     at org.apache.hadoop.ipc.Client.call(Client.java:863)
>     at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227)
>     at com.sun.proxy.$Proxy5.complete(Unknown Source)
>     at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>     at com.sun.proxy.$Proxy5.complete(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208)
>     at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693)
>     at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567)
>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>     at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>     at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
>     ... 12 common frames omitted
> {code}
> Especially for larger jobs whose sources recover state and hence take some
> time before they accept a checkpoint request, this can lead to an unstable
> job that gets stuck in a restart loop.
> A workaround for this problem is to disable {{failTaskOnCheckpointError}} in
> the {{ExecutionConfig}} via
> {{ExecutionConfig#setFailTaskOnCheckpointError(false)}}. With this setting,
> checkpoint failures won't fail the owning {{Task}}.
> To solve this problem properly, failing local checkpoint operations that
> belong to a discarded checkpoint should simply be ignored. A good solution
> could be to centralize the handling of checkpoint operation failures in the
> {{CheckpointCoordinator}}. The {{CheckpointCoordinator}} knows which
> checkpoints are still valid and hence can distinguish between valid and
> invalid checkpoint operation failures. This would furthermore make it
> possible to implement more sophisticated failure handling strategies, such
> as accepting {{n}} checkpoint failures before failing a task.
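
The centralized failure handling proposed in the description could look roughly like the sketch below. It is a minimal illustration in plain Java, not the code merged for this issue: the class {{CheckpointFailureTracker}}, its methods, and the {{Decision}} enum are hypothetical names invented here, and the sketch assumes all calls arrive on a single thread. It shows the two ideas from the description: a failure reported for a checkpoint that is no longer pending is ignored, and up to {{n}} failures of still-valid checkpoints are tolerated before the task is failed.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of coordinator-side checkpoint failure handling.
 * Not Flink's actual implementation; all names are illustrative.
 */
class CheckpointFailureTracker {

    /** What the coordinator would do with a reported local checkpoint failure. */
    enum Decision { IGNORE, TOLERATE, FAIL_TASK }

    // Checkpoints that were triggered and are neither completed nor discarded.
    private final Set<Long> pendingCheckpoints = new HashSet<>();
    private final int tolerableFailures;
    private int failureCount;

    CheckpointFailureTracker(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    void onCheckpointTriggered(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    /** Called when a checkpoint is discarded, e.g. after the first decline message. */
    void onCheckpointDiscarded(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }

    /** A completed checkpoint resets the failure counter. */
    void onCheckpointCompleted(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
        failureCount = 0;
    }

    /** Classifies a failure of a local checkpoint operation reported by a task. */
    Decision onLocalCheckpointFailure(long checkpointId) {
        if (!pendingCheckpoints.contains(checkpointId)) {
            // The checkpoint was already discarded or completed, so the
            // failure (e.g. a deleted checkpoint directory) is not meaningful.
            return Decision.IGNORE;
        }
        // A valid failure invalidates the checkpoint; count it only once.
        pendingCheckpoints.remove(checkpointId);
        failureCount++;
        return failureCount > tolerableFailures ? Decision.FAIL_TASK : Decision.TOLERATE;
    }

    public static void main(String[] args) {
        CheckpointFailureTracker tracker = new CheckpointFailureTracker(1);

        // Scenario from the log: the checkpoint is declined and discarded
        // before a late asynchronous failure for it arrives.
        tracker.onCheckpointTriggered(1389046L);
        tracker.onCheckpointDiscarded(1389046L);
        System.out.println(tracker.onLocalCheckpointFailure(1389046L));

        // Failures of checkpoints that are still pending are counted.
        tracker.onCheckpointTriggered(1389047L);
        System.out.println(tracker.onLocalCheckpointFailure(1389047L));
        tracker.onCheckpointTriggered(1389048L);
        System.out.println(tracker.onLocalCheckpointFailure(1389048L));
    }
}
```

With a tolerance of one failure, the late failure for the discarded checkpoint 1389046 is classified as {{IGNORE}}, the first failure of a still-pending checkpoint as {{TOLERATE}}, and the second as {{FAIL_TASK}}. Removing a checkpoint from the pending set on its first valid failure means that several tasks failing for the same checkpoint are counted once.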