[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2018-10-18 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656236#comment-16656236
 ] 

Paul Lin commented on FLINK-8098:
-

+1 for this issue. We are hitting the same problem on Flink 1.5.3 with CDH 5.6.0.

> LeaseExpiredException when using FsStateBackend for checkpointing due to 
> multiple mappers tries to access the same file.
> 
>
> Key: FLINK-8098
> URL: https://issues.apache.org/jira/browse/FLINK-8098
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
>Reporter: Shashank Agarwal
>Priority: Major
>
> I am running a streaming application with parallelism 6 and have enabled 
> checkpointing with a 1000 ms interval. The application crashes after 1-2 days. 
> After analysing the logs, I found the following trace (a minimal setup sketch 
> is shown after the quoted trace below). 
> {code}
> 2017-11-17 11:19:06,696 WARN  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
> properly clean up the async checkpoint runnable.
> java.lang.Exception: Could not properly cancel managed keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
>   at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
>   at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
>   at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>  No lease 
> flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  (inode 812148671): File does not exist. [Lease.  Holder: 
> DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
>   
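For reference, a minimal sketch of the kind of setup described above (the hostnames, 
checkpoint path, source, and job body are placeholders, not taken from the actual 
application): parallelism 6, a 1000 ms checkpoint interval, and an FsStateBackend 
writing its checkpoint files to HDFS.

{code}
// Minimal setup sketch; URI, source and job body are assumptions.
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object CheckpointedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(6)
    env.enableCheckpointing(1000) // checkpoint every 1000 ms
    // Checkpoint data is written to HDFS by the FsStateBackend (placeholder path).
    env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"))

    env
      .socketTextStream("localhost", 9999) // placeholder source
      .keyBy(line => line)                 // keyed state ends up in the checkpoint files
      .map(line => line.toUpperCase)
      .print()

    env.execute("checkpointed-streaming-job")
  }
}
{code}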

[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2018-09-10 Thread Henrique Barros (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608991#comment-16608991
 ] 

Henrique Barros commented on FLINK-8098:


It happens on Flink 1.3.2; I tried version 1.6.0 as well and the behaviour is the 
same.

However, the problem could be on the Hadoop (server) side - I opened a bug about 
that: https://issues.apache.org/jira/browse/HDFS-13833

Let's wait for the conclusion of that one.

Thanks, Stefan


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2018-09-06 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16605446#comment-16605446
 ] 

Stefan Richter commented on FLINK-8098:
---

Maybe you could first give us some more details, for example which Flink 
version you are using, and ideally also a stack trace or some logs.


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2018-08-14 Thread Henrique Barros (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579598#comment-16579598
 ] 

Henrique Barros commented on FLINK-8098:


Please reopen the issue. We've tested with many different settings on our Cloudera 
cluster. The cluster load is fine (the cluster is essentially idle), and the 
problem still happens at random.


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2018-08-14 Thread Henrique Barros (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579595#comment-16579595
 ] 

Henrique Barros commented on FLINK-8098:


I've come across this problem with my Flink checkpointing pointing at Cloudera's 
Hadoop.

Exactly the same problem happens, very randomly. It is NOT a user application 
exception: during one day of running time we did not send any messages to our 
streaming application, and it still happened, randomly and only with the 
checkpoints. If we set the checkpointing parallelism to 1 it does not happen, at 
least for the 2 days straight that we tested.


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-23 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264222#comment-16264222
 ] 

Shashank Agarwal commented on FLINK-8098:
-

Yes, I agree with that scenario.

Regarding the file cleanup, I checked manually and the file was there, so maybe 
this is an ongoing-checkpoint issue. We can close this for now; if I run into it 
again I'll reopen with more info.


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-23 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264175#comment-16264175
 ] 

Stefan Richter commented on FLINK-8098:
---

Parallel scripts were just another possibility that I have seen cause a similar 
problem before.

I think I know where the exception comes from: your task is failing due to some 
other error, and Flink is going through its shutdown procedure. This includes 
closing the HDFS output streams of ongoing checkpoints in order to terminate 
potentially blocking IO calls. That close also deletes the files in HDFS, to 
avoid lingering checkpoint files. At the same time, it is possible that the async 
checkpoint was already in the process of finalizing exactly this file and the 
file had already been removed by the shutdown procedure's close. This means two 
things: 1. the exception is not the cause but an effect of your job failing from 
something else, and 2. it is a side effect of the fast shutdown procedure closing 
and cleaning things up. Although an exception is logged, everything is actually 
fine: the file had simply already been cleaned up by the shutdown, and the 
already cancelled (!) checkpoint fails because of that.

If you agree, can you please close the issue?
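To make the sequence above concrete, here is a toy model of that race (purely 
illustrative, not Flink's actual shutdown code; a local temp file stands in for 
the HDFS checkpoint file): the "canceler" deletes the in-progress file while the 
"async snapshot" is still finalizing it, so the finalize step fails as an effect 
of the cancellation, mirroring the "File does not exist" LeaseExpiredException.

{code}
// Illustrative race sketch only - not Flink internals.
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent.Executors

object ShutdownRaceSketch {
  def main(args: Array[String]): Unit = {
    val checkpointFile = Files.createTempFile("chk-", ".part")
    val pool = Executors.newFixedThreadPool(2)

    // "Async checkpoint": writes state, then finalizes the file to obtain a handle.
    pool.submit(new Runnable {
      def run(): Unit = {
        Files.write(checkpointFile, "keyed-state-bytes".getBytes("UTF-8"))
        Thread.sleep(50) // snapshot still finalizing...
        try {
          val size = Files.size(checkpointFile) // stands in for closeAndGetHandle()
          println(s"checkpoint finalized, $size bytes")
        } catch {
          case _: NoSuchFileException =>
            println("finalize failed: the file was already removed by the canceler")
        }
      }
    })

    // "Task canceler": the task failed elsewhere; it closes streams and deletes
    // in-progress checkpoint files so nothing lingers.
    pool.submit(new Runnable {
      def run(): Unit = {
        Files.deleteIfExists(checkpointFile)
        println("canceler removed the in-progress checkpoint file")
      }
    })

    pool.shutdown()
  }
}
{code}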


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-23 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264148#comment-16264148
 ] 

Shashank Agarwal commented on FLINK-8098:
-

I found the main issue after another crash: I had another HDFS exception where I 
was trying to create a path exceeding the 255-character limit, which is why the 
application was crashing. While the tasks were being cancelled and restarted, I 
got these logs.

I am sure I am not running any other scripts that read or write the files in the 
checkpointing folder. Regarding parallelism, I thought multiple threads might be 
trying to write to the same file during checkpointing, but as far as I checked, 
different threads and operators handle different files, so that should not be 
the issue.
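As a side note on the 255-character limit mentioned above: HDFS enforces a 
per-path-component length limit (dfs.namenode.fs-limits.max-component-length, 
255 by default). A hypothetical helper like the one below (not part of Flink or 
the HDFS client) can flag overlong components before a job is submitted.

{code}
// Hypothetical sanity check (assumption, not a Flink/HDFS API): flag path
// components longer than HDFS's default 255-byte component limit.
object PathComponentCheck {
  val MaxComponentBytes = 255 // default dfs.namenode.fs-limits.max-component-length

  def overlongComponents(path: String): Seq[String] =
    path.stripPrefix("hdfs://")
      .split('/')
      .filter(_.getBytes("UTF-8").length > MaxComponentBytes)

  def main(args: Array[String]): Unit = {
    val candidate = "hdfs://namenode:8020/flink/checkpoints/" + ("x" * 300)
    val bad = overlongComponents(candidate)
    if (bad.isEmpty) println("all path components are within the limit")
    else println(s"${bad.size} component(s) exceed $MaxComponentBytes bytes")
  }
}
{code}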





[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-22 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262324#comment-16262324
 ] 

Stefan Richter commented on FLINK-8098:
---

What do you mean by "parallelism of checkpointing"? Each backend is using a 
separate file to write its snapshot for a checkpoint. Maybe you could also 
provide a complete logfile?


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-20 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259659#comment-16259659
 ] 

Shashank Agarwal commented on FLINK-8098:
-

Actually, we are not running any script against that checkpointing folder; only 
that Flink job has access to it. Maybe this is some race condition arising from 
the parallelism of the checkpointing.


[jira] [Commented] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-20 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16259045#comment-16259045
 ] 

Stefan Richter commented on FLINK-8098:
---

From a first glance, this does not look like a Flink problem, but more like an 
HDFS problem or some outside influence. Mappers always have their own output 
files, so concurrent writes should not be the reason for this. I have seen a 
similar problem in a case where a user's script was deleting checkpoint files in 
the background and deleted the wrong files. Can we rule out such problems?

It would be very helpful if you could also report which backend you are using 
(e.g. RocksDB or FS) and in which mode(s) (e.g. incremental/full, async,...).
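For anyone reporting back: the backend and its modes are selected where the job 
sets its state backend. A short sketch of the two common choices (the URI and 
flag values are placeholders, and the RocksDB line assumes the 
flink-statebackend-rocksdb dependency is on the classpath):

{code}
// Sketch of where backend and mode are chosen; values are placeholders.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object BackendChoice {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val checkpointUri = "hdfs://namenode:8020/flink/checkpoints"

    // FS backend: state lives on the heap, snapshots are written to HDFS;
    // the second argument toggles asynchronous snapshots.
    env.setStateBackend(new FsStateBackend(checkpointUri, true))

    // RocksDB backend: the second argument enables incremental checkpoints.
    // env.setStateBackend(new RocksDBStateBackend(checkpointUri, true))
  }
}
{code}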
