[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656236#comment-16656236 ] Paul Lin commented on FLINK-8098: - +1 for this issue. We get the same problem on Flink 1.5.3 with CDH-5.6.0. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal >Priority: Major > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659) >
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608991#comment-16608991 ] Henrique Barros commented on FLINK-8098: It happens on flink 1.3.2. I tried with versio 1.6.0 also and it is the same. However, the problem could be in hadoop (server side) - I opened a bug about that - https://issues.apache.org/jira/browse/HDFS-13833 Let's wait for the conclusion of that one. Thanks Stefan > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal >Priority: Major > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 8121486
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605446#comment-16605446 ] Stefan Richter commented on FLINK-8098: --- Maybe you could first give us some more details, for example which Flink version you are using and ideally also some stack trace or logs. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal >Priority: Major > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] > at > org.apache.hadoop.h
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579598#comment-16579598 ] Henrique Barros commented on FLINK-8098: Please reopen the issue. We've tested with many many settings in cloudera cluster. The cluster load is perfect, it is sleeping and it still happens very randomly. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal >Priority: Major > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] >
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579595#comment-16579595 ] Henrique Barros commented on FLINK-8098: I've come across this problem with my flink checkpointing pointing to Cloudera's hadoop. It happens very randomly exactly the same problem. It IS NOT user application exception since during 1 day running time we did not send any messages to our streaming application. It happens very randomly only with the checkpoints. If we turn checkpointing paralelism to 1 it does not happen, at least for 2 days straight that we tested. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal >Priority: Major > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.serv
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264222#comment-16264222 ] Shashank Agarwal commented on FLINK-8098: - yes, agree with the scenario. About file cleanup, i have checked manually file was there. So maybe this is ongoing checkpoint issue. We can close this now. If I find again I'll reopen with more info. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates:
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264175#comment-16264175 ] Stefan Richter commented on FLINK-8098: --- Parallel scripts was just another option that I have seen before causing a similar problem. I think I know where the exception comes from: your task is failing from some other error and Flink going through the shutdown procedure. This includes closing the HDFS output streams for ongoing checkpoints to terminate potentially blocking IO calls. This closing also deletes the files in HDFS to avoid lingering checkpoint files. At the same time, it is possible that the async checkpoint was already in the process of finalizing exactly this file and the file already was removed by the shutdown procedure's close. This means two things: 1. the exception is not the cause, but an effect of your job failing from something else, and 2. it is a sideeffect of the fast shutdown procedure closing and cleaning up things. While this logs an exception, it is actually all good: the file was just cleaned up already from the shutdown and the already canceled (!) checkpoint fails because of this. If you agree, can you please close the issue? > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAn
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264148#comment-16264148 ] Shashank Agarwal commented on FLINK-8098: - I got the main issue after another crash. I had another HDFS exception where I was trying to create a path more than 255 character limit. So why the application was crashing. During cancel tasks and restart again I got this logs. But I am sure I am not running any other scripts to read or write that files in checkpointing folder. About parallelism, i thought multiple threads trying to write in the same file during checkpointing. But as I checked different threads and operators are handling different files so that should not be the issue. On Wed, Nov 22, 2017 at 4:49 PM, Stefan Richter (JIRA) > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolE
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262324#comment-16262324 ] Stefan Richter commented on FLINK-8098: --- What do you mean by "parallelism of checkpointing"? Each backend is using a separate file to write its snapshot for a checkpoint. Maybe you could also provide a complete logfile? > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] > at > org.apa
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259659#comment-16259659 ] Shashank Agarwal commented on FLINK-8098: - Actually, we are not running any script on that checkpointing folder. Only that flink job have access to that. Maybe this is some race condition during parallelism of checkpointing. > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] > at >
[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259045#comment-16259045 ] Stefan Richter commented on FLINK-8098: --- >From a first glance, this does not look like a Flink problem, but more like a >HDFS problem or some outside influence. Mappers always have their own output >files, so concurrent writes should not be the reason for this. I have seen a >similar problem like this in a case where a script from the user was deleting >checkpoint files in the background and deleted the wrong files. Can we rule >out such problems? It would be very helpful if you could also report which backend you are using (e.g. RocksDB or FS) and in which mode(s) (e.g. incremental/full, async,...). > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers tries to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > Fix For: 1.4.0 > > > I am running streaming application with parallelism 6. I have enabled > checkpointing(1000). But application gets the crash after 1-2 days. After > analysing logs i found following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$