[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381777#comment-16381777
 ] 

tarun razdan commented on FLINK-7756:
-------------------------------------

[~aljoscha] 
This example is running fine on local and without rocksdb state Backend. But 
when we submit the job with rocksdb state backend on the cluster, the job fails 
with the following exception.
{code:java}
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 
for operator ....
{code}
The job is a word count which fetches the input from kafka. So you can pass any 
words from kafka and it will print the results. To test the restore behaviour 
we have added a CEP in which if you enter the string "exception" in kafka, the 
job fails and tries to restart, and on restart we except it to restore from 
last checkpoint. But it's not starting with rocksdb backend.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-7756
>                 URL: https://issues.apache.org/jira/browse/FLINK-7756
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, State Backends, Checkpointing, Streaming
>    Affects Versions: 1.4.0, 1.3.2
>         Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>            Reporter: Shashank Agarwal
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.5.0, 1.4.1
>
>         Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When i try to use RocksDBStateBackend on my staging cluster (which is using 
> HDFS as file system) it crashes. But When i use FsStateBackend on staging 
> (which is using HDFS as file system) it is working fine.
> On local with local file system it's working fine in both cases.
> Please check attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil              
>               - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>       ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>       at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>       ... 5 more
>       Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>               at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>               ... 5 more
>       Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>               at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>               at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>               at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>               at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>               at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>               ... 7 more
>       Caused by: java.lang.IllegalStateException
>               at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
>               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>               at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>               ... 5 more
>       [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Map (1/2) 
> (a06925261e74b4efdf50a30089e2b778).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to fail task externally Map (1/2) 
> (1747902c96e63fefd977ac4d4a01d2fa).
> 2017-09-29 14:21:34,180 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Map (1/2) (a06925261e74b4efdf50a30089e2b778) switched from 
> RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Map (1/2).}
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Map (1/2).
>       ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>       at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>       ... 5 more
>       Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>               at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>               ... 5 more
>       Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>               at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>               at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>               at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>               at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>               at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>               ... 7 more
>       Caused by: java.lang.IllegalStateException
>               at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
>               at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
>               at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>               at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>               ... 5 more
>       [CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}
> That same printed for around 12-13 tasks. Than following logs printed :
> {code:java}
> 2017-09-29 14:21:35,039 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Ensuring all FileSystem streams are closed for task Source: 
> Custom Source (2/2) (77c896e2a2063e98f399244cae21c260) [CANCELED]
> 2017-09-29 14:21:35,041 WARN  org.apache.hadoop.ipc.Client                    
>               - interrupted waiting to send rpc request to server
> java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1454)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>       at com.sun.proxy.$Proxy12.delete(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy13.delete(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
>       at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
>       at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
>       at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> 2017-09-29 14:21:35,042 WARN  
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory  - Could 
> not delete the checkpoint stream file 
> hdfs://static.175.87.9.5.clients.your-server.de:8020/flink/flink-checkpoints/rocksDB/events/e10dbe09aa2ecccb22737ddce8b4dc9f/chk-2/a28796de-978a-4f1a-8ff5-5f5c654b0ffc.
> java.io.IOException: java.lang.InterruptedException
>       at org.apache.hadoop.ipc.Client.call(Client.java:1460)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>       at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>       at com.sun.proxy.$Proxy12.delete(Unknown Source)
>       at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>       at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>       at com.sun.proxy.$Proxy13.delete(Unknown Source)
>       at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
>       at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>       at 
> org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
>       at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
>       at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
>       at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
>       at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
>       at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
>       at java.util.concurrent.FutureTask.get(FutureTask.java:191)
>       at 
> org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
>       at org.apache.hadoop.ipc.Client.call(Client.java:1454)
>       ... 31 more
> 2017-09-29 14:21:35,054 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Attempting to cancel task KeyedCEPPatternOperator -> Flat Map 
> -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee).
> 2017-09-29 14:21:35,055 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) 
> (1/2) (8c6eff62d47c4a624a7554065bac36ee) switched from RUNNING to CANCELING.
> {code}
> Than same printed for 12-13 tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to