[ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381959#comment-16381959 ]
Aljoscha Krettek commented on FLINK-7756: ----------------------------------------- Try "Without Bundled Hadoop", but "with hadoop 2.7" should also work. > RocksDB state backend Checkpointing (Async and Incremental) is not working > with CEP. > ------------------------------------------------------------------------------------- > > Key: FLINK-7756 > URL: https://issues.apache.org/jira/browse/FLINK-7756 > Project: Flink > Issue Type: Sub-task > Components: CEP, State Backends, Checkpointing, Streaming > Affects Versions: 1.4.0, 1.3.2 > Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend > Reporter: Shashank Agarwal > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > Attachments: jobmanager.log, jobmanager_without_cassandra.log, > taskmanager.log, taskmanager_without_cassandra.log > > > When i try to use RocksDBStateBackend on my staging cluster (which is using > HDFS as file system) it crashes. But When i use FsStateBackend on staging > (which is using HDFS as file system) it is working fine. > On local with local file system it's working fine in both cases. > Please check attached logs. I have around 20-25 tasks in my app. > {code:java} > 2017-09-29 14:21:31,639 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=0). > 2017-09-29 14:21:31,640 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,020 INFO > org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink - No state > to restore for the BucketingSink (taskIdx=1). > 2017-09-29 14:21:32,022 INFO > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - > Initializing RocksDB keyed state backend from snapshot. > 2017-09-29 14:21:32,078 INFO com.datastax.driver.core.NettyUtil > - Found Netty's native epoll transport in the classpath, using > it > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (1/2) > (b879f192c4e8aae6671cdafb3a24c00a). > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (2/2) > (1ea5aef6ccc7031edc6b37da2912d90b). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Co-Flat Map (2/2) > (4bac8e764c67520d418a4c755be23d4d). > 2017-09-29 14:21:34,178 INFO org.apache.flink.runtime.taskmanager.Task > - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched > from RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Co-Flat Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Co-Flat Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 7 more > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException] > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (1/2) > (a06925261e74b4efdf50a30089e2b778). > 2017-09-29 14:21:34,177 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to fail task externally Map (1/2) > (1747902c96e63fefd977ac4d4a01d2fa). > 2017-09-29 14:21:34,180 INFO org.apache.flink.runtime.taskmanager.Task > - Map (1/2) (a06925261e74b4efdf50a30089e2b778) switched from > RUNNING to FAILED. > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 > for operator Map (1/2).} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: Could not materialize checkpoint 2 for > operator Map (1/2). > ... 6 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) > ... 5 more > Caused by: java.util.concurrent.ExecutionException: > java.lang.IllegalStateException > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 7 more > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > ... 5 more > [CIRCULAR REFERENCE:java.lang.IllegalStateException] > {code} > That same printed for around 12-13 tasks. Than following logs printed : > {code:java} > 2017-09-29 14:21:35,039 INFO org.apache.flink.runtime.taskmanager.Task > - Ensuring all FileSystem streams are closed for task Source: > Custom Source (2/2) (77c896e2a2063e98f399244cae21c260) [CANCELED] > 2017-09-29 14:21:35,041 WARN org.apache.hadoop.ipc.Client > - interrupted waiting to send rpc request to server > java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059) > at org.apache.hadoop.ipc.Client.call(Client.java:1454) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy12.delete(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy13.delete(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 2017-09-29 14:21:35,042 WARN > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could > not delete the checkpoint stream file > hdfs://static.175.87.9.5.clients.your-server.de:8020/flink/flink-checkpoints/rocksDB/events/e10dbe09aa2ecccb22737ddce8b4dc9f/chk-2/a28796de-978a-4f1a-8ff5-5f5c654b0ffc. > java.io.IOException: java.lang.InterruptedException > at org.apache.hadoop.ipc.Client.call(Client.java:1460) > at org.apache.hadoop.ipc.Client.call(Client.java:1412) > at > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229) > at com.sun.proxy.$Proxy12.delete(Unknown Source) > at > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) > at com.sun.proxy.$Proxy13.delete(Unknown Source) > at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707) > at > org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714) > at > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106) > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353) > at > org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.InterruptedException > at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404) > at java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059) > at org.apache.hadoop.ipc.Client.call(Client.java:1454) > ... 31 more > 2017-09-29 14:21:35,054 INFO org.apache.flink.runtime.taskmanager.Task > - Attempting to cancel task KeyedCEPPatternOperator -> Flat Map > -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee). > 2017-09-29 14:21:35,055 INFO org.apache.flink.runtime.taskmanager.Task > - KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) > (1/2) (8c6eff62d47c4a624a7554065bac36ee) switched from RUNNING to CANCELING. > {code} > Than same printed for 12-13 tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)