See this thread: http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala
which mentioned FLINK-6398 <https://issues.apache.org/jira/browse/FLINK-6398>, fixed in 1.2.2 / 1.3.

On Mon, Jun 19, 2017 at 5:53 PM, Philip Doctor <philip.doc...@physiq.com> wrote:

> Dear Flink Users,
>
> I have a Flink (v1.2.1) process I left running for the last five days. It aggregates a bit of state and exposes it via Queryable State. It ran correctly for the first 3 days. There were no code changes or data changes, but suddenly Queryable State got weird. The process logs the current value of the queryable state, and from the logs I discerned that the state was correctly being aggregated. However, the Queryable State that was returned could not be deserialized. Rather than the list of longs I expect, I get 2 bytes (0x 57 02). It seemed quite clear that the state in the Task Manager was not the state I was getting out of Queryable State.
>
> I next reasoned that my data was being checkpointed and possibly I could restore. So I restarted the process to recover from a checkpoint. At this point the process fails with the following error:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
>     ... 6 more
>
> This looks to me like Flink has serialized out state incorrectly.
>
> I was running Flink 1.2.1. I upgraded to Flink 1.3 after this happened so I could manually set the Kafka partition offset, backed it up 5 days to replay all the data, and now everything is working fine again.
>
> However I'm more than a little worried. Was there a serialization bug fixed in 1.3? I don't believe there's anything in my code that could be causing such an issue, but is there something in my jobs that could make something like this happen? Is this a known bug?
> The fact that it not only results in bad data in the query but appears to take down my disaster recovery plan makes me a bit nervous here.
>
> Thanks for your time,
>
> Phil
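As a general mitigation, independent of the fix in FLINK-6398: these deserialization problems tend to show up where keyed state silently falls back to Kryo, so it can help to declare the state with an explicit TypeInformation so Flink's own serializer handles the list of longs. A rough sketch of what I mean (the state name and query name below are placeholders, not taken from your job):

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

    // Declare the queryable list-of-longs state with an explicit TypeInformation,
    // so values are (de)serialized by Flink's LongSerializer instead of Kryo.
    ListStateDescriptor<Long> descriptor =
            new ListStateDescriptor<>("aggregatedValues", BasicTypeInfo.LONG_TYPE_INFO);
    descriptor.setQueryable("aggregated-values");  // placeholder query name

    // inside open() of a keyed RichFunction:
    ListState<Long> state = getRuntimeContext().getListState(descriptor);

If you want to be sure nothing else in the job falls back to Kryo, env.getConfig().disableGenericTypes() should make the job fail at submission time instead of at runtime.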