[ 
https://issues.apache.org/jira/browse/FLINK-11662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-11662.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.9.0

Merged in:
master: b760d55

> Discarded checkpoint can cause Tasks to fail
> --------------------------------------------
>
>                 Key: FLINK-11662
>                 URL: https://issues.apache.org/jira/browse/FLINK-11662
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.0, 1.8.0
>            Reporter: madong
>            Assignee: Yun Tang
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.9.0
>
>         Attachments: jobmanager.log, taskmanager.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Flink's {{CheckpointCoordinator}} discards an ongoing checkpoint as soon as 
> it receives the first decline message. Part of the discard operation is the 
> deletion of the checkpoint directory. Depending on the underlying 
> {{FileSystem}} implementation, concurrent read and write operations on files 
> in that directory can then fail (this is the case with HDFS, for example). 
> If a local checkpointing operation that belongs to the discarded checkpoint 
> (e.g. an {{AsyncCheckpointRunnable}}) is still running for some {{Task}}, 
> that operation can fail. Depending on the configuration of the 
> {{CheckpointExceptionHandler}}, this can lead to a task failure and a job 
> recovery caused by a checkpoint that has already been discarded.
> {code:java}
> 2019-02-16 11:26:29.378 [Checkpoint Timer] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 1389046 @ 1550287589373 for job 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
> checkpoint 1389046 by task 7239e5d29203c4c720ed2db6f5db33fc of job 
> 599a6ac3c371874d12ebf024978cadbc.
> 2019-02-16 11:26:29.630 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 1389046 of job 599a6ac3c371874d12ebf024978cadbc.
> org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException:
>  Task Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (3/3) was 
> not running
> at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1166)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 2019-02-16 11:26:29.697 [flink-akka.actor.default-dispatcher-68] INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource 
> -> mapOperate -> Timestamps/Watermarks (1/3) 
> (a5657b784d235731cd468164e85d0b50) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: 
> java.lang.Exception: Could not materialize checkpoint 1389046 for operator 
> Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 1389046 for 
> operator Source: KafkaSource -> mapOperate -> Timestamps/Watermarks (1/3).
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> ... 5 common frames omitted
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
>  in order to obtain the stream state handle
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:326)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:767)
> at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackend.java:696)
> at 
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 common frames omitted
> Caused by: org.apache.hadoop.ipc.RemoteException: java.io.IOException: Path 
> doesn't exist: 
> /.../flink/checkpoints/599a6ac3c371874d12ebf024978cadbc/chk-1389046/84631771-01e2-41bc-950d-c9e39eac26f9
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkFilePath(FSNamesystem.java:3063)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3089)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3043)
> at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:938)
> at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:743)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1175)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1171)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1169)
> at org.apache.hadoop.ipc.Client.call(Client.java:863)
> at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:227)
> at com.sun.proxy.$Proxy5.complete(Unknown Source)
> at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> at com.sun.proxy.$Proxy5.complete(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.closeFile(DFSClient.java:1208)
> at org.apache.hadoop.hdfs.DFSClient.access$5300(DFSClient.java:151)
> at 
> org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:6693)
> at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:6567)
> at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at 
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:312)
> ... 12 common frames omitted
> {code}
> Especially for larger jobs whose sources recover state and therefore take 
> some time before they accept a checkpoint request, this can make the job 
> unstable and leave it stuck in a restart loop. 
> A workaround for this problem is to disable {{failTaskOnCheckpointError}} in 
> the {{ExecutionConfig}} via 
> {{ExecutionConfig#setFailTaskOnCheckpointError(false)}}. With this setting, 
> checkpoint failures no longer fail the owning {{Task}}.
> To solve this problem properly, failures of local checkpoint operations that 
> belong to a discarded checkpoint should simply be ignored. A good solution 
> could be to centralize the handling of checkpoint operation failures in the 
> {{CheckpointCoordinator}}. The {{CheckpointCoordinator}} knows which 
> checkpoints are still valid and can therefore distinguish between valid and 
> invalid checkpoint operation failures. Centralizing would also make it 
> possible to implement more sophisticated failure handling strategies, such as 
> accepting {{n}} checkpoint failures before failing a task.
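
The centralized failure handling proposed in the description can be sketched roughly as follows. This is only an illustration of the idea, not the change merged for this issue: {{CheckpointFailureTracker}}, its methods and the {{Decision}} enum are hypothetical names and do not exist in Flink. Resetting the failure count after a completed checkpoint is also an assumption, since the description only asks for accepting {{n}} failures.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch, not a Flink class: it models only the decision of
 * whether a task-side checkpoint failure should count against the job.
 */
final class CheckpointFailureTracker {

    /** Outcome of reporting a task-side checkpoint failure. */
    enum Decision { IGNORE, TOLERATE, FAIL_JOB }

    // The "n" from the description: failures accepted before failing.
    private final int tolerableFailures;
    // Checkpoints that were triggered and are neither completed nor discarded.
    private final Set<Long> pendingCheckpoints = new HashSet<>();
    private int countedFailures;

    CheckpointFailureTracker(int tolerableFailures) {
        if (tolerableFailures < 0) {
            throw new IllegalArgumentException("tolerableFailures must be >= 0");
        }
        this.tolerableFailures = tolerableFailures;
    }

    synchronized void checkpointTriggered(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
    }

    /** Called when the checkpoint is discarded, e.g. after a decline message. */
    synchronized void checkpointDiscarded(long checkpointId) {
        pendingCheckpoints.remove(checkpointId);
    }

    /** Assumption: a completed checkpoint resets the failure count. */
    synchronized void checkpointCompleted(long checkpointId) {
        if (pendingCheckpoints.remove(checkpointId)) {
            countedFailures = 0;
        }
    }

    /** Decides how to treat a failure reported by a task for the given checkpoint. */
    synchronized Decision reportTaskFailure(long checkpointId) {
        // A failure for a checkpoint that is no longer pending is stale
        // (for example, its directory was already deleted), so it is ignored.
        if (!pendingCheckpoints.remove(checkpointId)) {
            return Decision.IGNORE;
        }
        // The first failure for a pending checkpoint fails that checkpoint once.
        countedFailures++;
        return countedFailures > tolerableFailures
                ? Decision.FAIL_JOB
                : Decision.TOLERATE;
    }
}
```

Applied to the log above, checkpoint 1389046 is discarded at 11:26:29.630, so the materialization failure that arrives at 11:26:29.697 would map to {{IGNORE}} in this sketch instead of failing the task.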



