[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355676#comment-16355676 ]

Shashank Agarwal commented on FLINK-7760:
-----------------------------------------

Sure, I'll check it out by the weekend. I'll be back by then.

On Wed, 7 Feb 2018 at 10:03 PM, Aljoscha Krettek (JIRA)

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> My job failed due to failure of cassandra. I have enabled
> ExternalizedCheckpoints. But when job tried to restore from that checkpoint
> it's failing continuously with following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO org.apache.flink.runtime.taskmanager.Task - KeyedCEPPatternOperator -> Map (1/2) (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have tried to start new job also after failure with parameter
> {code:java}
> -s [checkpoint meta data path]{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355668#comment-16355668 ]

Aljoscha Krettek commented on FLINK-7760:
-----------------------------------------

I believe [~StephanEwen] found the reason for this bug. The issue is that the read code path does not always make sure to read the user-code function fully, which can corrupt the stream. I'll push a fix to both 1.5 and 1.4.1 branches.

[~shashank734] after this is in, could you retry if it actually fixes your problem?

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
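
The failure mode described in the comment above (a reader that does not consume an embedded Java-serialized object completely, so the next read starts at the wrong offset) can be illustrated with a generic length-prefixed framing sketch. This is not the actual Flink patch; the class and method names (FramedSerialization, writeFramed, readFramed, roundTrip) and the sample payloads are hypothetical and only show one common way to keep an outer stream aligned regardless of how many bytes ObjectInputStream itself consumes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical illustration, not Flink code.
public class FramedSerialization {

    /** Serializes obj into its own buffer and writes it as [int length][bytes]. */
    static void writeFramed(DataOutputStream out, Serializable obj) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(obj);
        }
        byte[] bytes = buffer.toByteArray();
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    /**
     * Reads exactly the framed bytes from the outer stream before deserializing,
     * so the outer stream position is correct even if ObjectInputStream would
     * have left part of the object unread.
     */
    static Object readFramed(DataInputStream in) throws IOException, ClassNotFoundException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    /** Writes two framed objects around a plain int and reads all three back. */
    static String roundTrip() throws IOException, ClassNotFoundException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(sink)) {
            writeFramed(out, "condition-A");
            out.writeInt(42);
            writeFramed(out, "condition-B");
        }
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(sink.toByteArray()))) {
            Object first = readFramed(in);
            int marker = in.readInt();
            Object second = readFramed(in);
            return first + "|" + marker + "|" + second;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Because each object is read from its own byte array, a partially consumed object can no longer shift the position of the fields that follow it, which is the kind of misalignment that surfaces as a StreamCorruptedException further down the stream.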
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309634#comment-16309634 ]

Shashank Agarwal commented on FLINK-7760:
-----------------------------------------

Hi [~kkl0u],

I have checked again and am facing the same issue during restore. I am unable to checkpoint with RocksDB, and with FsStateBackend I hit this issue when restoring after a savepoint, so I cannot restore my state in either case. It does not print any extra logs in debug mode either. Please advise; I am using CEP, Yarn, HDFS, and Scala. Otherwise, I will have to use some DB for the state, which I don't want.

Thanks

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0, 1.5.0, 1.4.1
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305614#comment-16305614 ]

Shashank Agarwal commented on FLINK-7760:
-----------------------------------------

No, I haven't changed the version or the code. I took the savepoint in 1.4.0 and also restored it in 1.4.0. I am using the filesystem state backend.

On Thu, 28 Dec 2017 at 8:32 PM, Kostas Kloudas (JIRA)

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0, 1.5.0, 1.4.1
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16305505#comment-16305505 ]

Kostas Kloudas commented on FLINK-7760:
---------------------------------------

This looks like either a backwards compatibility issue, or like you changed your code between runs. Is any of the above true?

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0, 1.5.0, 1.4.1
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202238#comment-16202238 ]

Kostas Kloudas commented on FLINK-7760:
---------------------------------------

No [~shashank734] it shouldn't. I am just saying this as a side note.

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202173#comment-16202173 ]

Shashank Agarwal commented on FLINK-7760:
-----------------------------------------

[~kkl0u] Yes, that is redundant. Actually, I am using this stream in some other places too. Is this a problem? Logically it shouldn't be.

[~aljoscha] The restart strategy also retried automatically and failed. I also tried using the same jar, and it failed again and again. I have tried a different jar too, but that failed as well.

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Sub-task
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202112#comment-16202112 ]

Aljoscha Krettek commented on FLINK-7760:
-----------------------------------------

Are you using exactly the same jar when trying to recover the job?

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
> Issue Type: Bug
> Components: CEP, State Backends, Checkpointing
> Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
> Reporter: Shashank Agarwal
> Priority: Blocker
> Fix For: 1.4.0
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202073#comment-16202073 ]

Kostas Kloudas commented on FLINK-7760:
---------------------------------------

Btw, although this may not be a problem, you {{keyBy}} twice in your code, which is redundant:

{code}
val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0)) // ONCE

val successOrderPatternStream = CEP.pattern(
  stream.keyBy((x) => (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))),
  successOrderPattern) // TWICE
{code}

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
>                 Key: FLINK-7760
>                 URL: https://issues.apache.org/jira/browse/FLINK-7760
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.3.2
>         Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
>            Reporter: Shashank Agarwal
>            Priority: Blocker
>             Fix For: 1.4.0
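One way to act on the remark about the redundant {{keyBy}} is to key the stream a single time, by the composite key that the CEP operator needs. The following is only a sketch under assumptions: `Signal`, `KeyOnceSketch` and `buildPatternStream` are hypothetical names, the field names stand in for the renamed ones in the posted job, and the pattern conditions are left out just as they are there. It is not taken from the reporter's code.

```scala
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

// Hypothetical event type standing in for RawSignal in the posted job.
case class Signal(someKey: Option[Int], someSubKey: Option[Int])

object KeyOnceSketch {

  // Builds the pattern stream from an un-keyed source stream.
  def buildPatternStream(source: DataStream[Signal]): PatternStream[Signal] = {
    // Key exactly once, by the composite key used for pattern matching.
    val keyed = source.keyBy(s => (s.someKey.getOrElse(0), s.someSubKey.getOrElse(0)))

    // Conditions are omitted here, as in the posted job.
    val pattern = Pattern.begin[Signal]("someEvent").followedBy("otherEvent")

    CEP.pattern(keyed, pattern)
  }
}
```

Whether the first {{keyBy}} in the original job is needed for some other downstream operator cannot be judged from the excerpt, so this only shows the shape of the simplification.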
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202018#comment-16202018 ]

Shashank Agarwal commented on FLINK-7760:
-----------------------------------------

Hi [~kkl0u],

Actually, it's quite complicated, with Kafka streams and a custom serializer. The steps above are correct, but I'll still try to post some code. I have changed parameter names and a few other things in the code. If you find any issue, let me know.

{code}
object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val propertiesFile = getClass.getClassLoader.getResource("xyz.properties").getPath
    val parameter = ParameterTool.fromPropertiesFile(propertiesFile)
    env.getConfig.setGlobalJobParameters(parameter)
    env.setStateBackend(new FsStateBackend(parameter.get("hdfsSnapshotPath")))

    // enable fault-tolerance
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // enable restarts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L))

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", parameter.get("kafkaUrl"))
    properties.setProperty("group.id", parameter.get("kafkaGroupId"))

    val kafka10 = new FlinkKafkaConsumer010[RawSignal](parameter.get("kafkaBundleName"), new SignalDeserializationSchema(), properties)
    val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0))

    //Creating a pattern for successful event
    val successOrderPattern = Pattern.begin[RawSignal]("someEvent")
      .followedBy("otherEvent")

    val successOrderPatternStream = CEP.pattern(stream.keyBy((x) => (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))), successOrderPattern)

    val ordersStream: DataStream[TransactionSignal] = successOrderPatternStream.select(new TransactionPatternFlatMap)

    //Put Ip count in the stream with maintaining the state
    val ipStateStream = ordersStream.keyBy((x) => (x._someKey, x._deviceIp))
      .mapWithState((in: OrderSignal, ipState: Option[Int]) => {
        if (!in._deviceIp.equalsIgnoreCase(parameter.get("defaultIp"))) {
          val newCount = ipState.getOrElse(0) + 1
          val output = in.copy(_numOfOrderSameIp = newCount)
          (output, Some(newCount))
        } else {
          (in, Some(0))
        }
      })

    ipStateStream.print

    env.execute("Thirdwatch Mitra")
  }
}
{code}

Here is the Kafka deserializer I am using, SignalDeserializationSchema:

{code}
import RawSignal
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

/**
  * Created by shashank on 13/01/17.
  *
  * Deserialize raw json string from kafka to Raw signal object.
  */
class SignalDeserializationSchema extends AbstractDeserializationSchema[RawSignal] {

  implicit lazy val formats = DefaultFormats

  override def deserialize(message: Array[Byte]): RawSignal = {
    parse(new String(message)).extract[RawSignal]
  }

  override def isEndOfStream(nextElement: RawSignal): Boolean = false
}
{code}

and an example of the RawSignal class:

{code}
case class RawSignal(name: Option[String], email: Option[String], UserId: Option[String])
{code}

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
>                 Key: FLINK-7760
>                 URL: https://issues.apache.org/jira/browse/FLINK-7760
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.3.2
>         Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
>            Reporter: Shashank Agarwal
>            Priority: Blocker
>             Fix For: 1.4.0
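The issue description mentions externalized checkpoints and restoring with `-s [checkpoint meta data path]`, but the posted job only shows `enableCheckpointing`. For readers trying to reproduce the setup, a configuration along the following lines is one possibility on Flink 1.3. It is a sketch, not the reporter's actual settings: `ExternalizedCheckpointSketch`, `configure` and `snapshotPath` are hypothetical names, and the cleanup mode is an assumption. To my understanding, Flink 1.3 also expects the metadata directory to be set through `state.checkpoints.dir` in `flink-conf.yaml`.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ExternalizedCheckpointSketch {

  // Applies checkpoint settings similar to the posted job, plus externalized checkpoints.
  // snapshotPath is a placeholder, e.g. an HDFS URI read from the job parameters.
  def configure(env: StreamExecutionEnvironment, snapshotPath: String): Unit = {
    env.setStateBackend(new FsStateBackend(snapshotPath))

    // Checkpoint every second in exactly-once mode, as in the posted job.
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Assumption: keep the checkpoint metadata when the job is cancelled,
    // so that a new job can be started from it with the -s option.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}
```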
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16201976#comment-16201976 ]

Kostas Kloudas commented on FLINK-7760:
---------------------------------------

Hi [~shashank734],

Could you provide the code for your conditions in the Pattern? The reason is that the problem seems to appear when trying to deserialize them.

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
>                 Key: FLINK-7760
>                 URL: https://issues.apache.org/jira/browse/FLINK-7760
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.3.2
>         Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
>            Reporter: Shashank Agarwal
>            Priority: Blocker
>             Fix For: 1.4.0
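The stack trace in the issue description passes through `NFA$NFASerializer.deserializeCondition` into `ObjectInputStream.readObject`, which suggests the pattern conditions are restored with plain Java serialization. The sketch below uses only the JDK and no Flink classes. It shows a round trip of a condition-like object and how a stream containing an unexpected byte leads to a `StreamCorruptedException`. `MinLengthCondition` and `ConditionSerializationSketch` are hypothetical names, and the corruption is constructed by hand. It illustrates the class of error only and is not a diagnosis of FLINK-7760.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, StreamCorruptedException}
import scala.util.Try

// Hypothetical stand-in for a CEP condition; not a Flink class.
// Scala case classes are java.io.Serializable by default.
final case class MinLengthCondition(minLength: Int) {
  def filter(value: String): Boolean = value.length >= minLength
}

object ConditionSerializationSketch {

  // Writes an object with plain Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Reads a single object back from the given bytes.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = MinLengthCondition(3)
    val bytes = serialize(original)

    // An intact stream restores an equal object.
    println(deserialize[MinLengthCondition](bytes) == original)

    // Keep the 4-byte stream header, then append a zero byte where a
    // type code is expected; reading this stream fails.
    val corrupted = bytes.take(4) :+ 0.toByte
    val failure = Try(deserialize[MinLengthCondition](corrupted)).failed.toOption
    println(failure.exists(_.isInstanceOf[StreamCorruptedException]))
  }
}
```

In a real restore the bytes come from the checkpoint rather than from a hand-built array, so a reader positioned at the wrong offset, or a writer and reader that disagree on the format, would be the kind of thing to look for.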
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198445#comment-16198445 ]

Kostas Kloudas commented on FLINK-7760:
---------------------------------------

I will check the state-related issues all together this week.
This may also be related to https://issues.apache.org/jira/browse/FLINK-7756

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
>                 Key: FLINK-7760
>                 URL: https://issues.apache.org/jira/browse/FLINK-7760
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.3.2
>         Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
>            Reporter: Shashank Agarwal
[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.
[ https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198437#comment-16198437 ]

Aljoscha Krettek commented on FLINK-7760:
-----------------------------------------

[~kkl0u] Is this related to any of the other recent issues with CEP state?

> Restore failing from external checkpointing metadata.
> -----------------------------------------------------
>
>                 Key: FLINK-7760
>                 URL: https://issues.apache.org/jira/browse/FLINK-7760
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing
>    Affects Versions: 1.3.2
>         Environment: Yarn, Flink 1.3.2, HDFS, FsStateBackend
>            Reporter: Shashank Agarwal