See this thread:
http://search-hadoop.com/m/Flink/VkLeQm2nZm1Wa7Ny1?subj=Re+Painful+KryoException+java+lang+IndexOutOfBoundsException+on+Flink+Batch+Api+scala

It mentions FLINK-6398
<https://issues.apache.org/jira/browse/FLINK-6398>, which was fixed in
1.2.2 / 1.3.

On Mon, Jun 19, 2017 at 5:53 PM, Philip Doctor <philip.doc...@physiq.com>
wrote:

> Dear Flink Users,
>
> I have a Flink (v1.2.1) process I left running for the last five days.  It
> aggregates a bit of state and exposes it via Queryable State.  It ran
> correctly for the first 3 days.  There were no code changes or data
> changes, but suddenly Queryable State got weird.  The process logs the
> current value of the queryable state, and from the logs I discerned that
> the state was correctly being aggregated.  However, the Queryable State
> that was returned could not be deserialized.  Rather than the list of
> longs I expected, I got 2 bytes (0x57 0x02).  It seemed quite clear
> that the state in the Task Manager was not the state I was getting out of
> Queryable State.
>
>
>
> I next reasoned that my data was being checkpointed and that I could
> possibly restore it.  So I restarted the process to recover from a
> checkpoint.  At this point the process failed with the following error:
>
>
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 28, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>     at java.util.ArrayList.get(ArrayList.java:429)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:231)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
>     ... 6 more
>
>
>
>
>
> This looks to me like Flink serialized the state incorrectly.
>
>
>
> I was running Flink 1.2.1.  After this happened I upgraded to Flink 1.3
> so that I could manually set the Kafka partition offset.  I backed it up
> 5 days to replay all the data, and now everything is working fine again.
>
>
>
> However, I’m more than a little worried.  Was there a serialization bug
> fixed in 1.3?  I don’t believe there’s anything in my code that could be
> causing such an issue, but is there something in my jobs that could make
> something like this happen?  Is this a known bug?  The fact that it not
> only results in bad data in the query but also appears to take down my
> disaster recovery plan makes me a bit nervous here.
>
>
>
> Thanks for your time,
>
> Phil
>
>
>
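
For readers trying to interpret the "Caused by" part of the quoted trace: the
frames show MapReferenceResolver.getReadObject calling ArrayList.get, and the
message reports index 28 against a list of size 0.  One plausible reading is
that the reader treated some bytes as a back-reference to an object it had
never seen.  The sketch below is not Kryo's code; ReferenceLookupSketch and
its methods are hypothetical names used only to illustrate how a lookup of
that shape fails in the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a reference resolver.
// It is NOT Kryo's implementation; it only mimics the lookup shape
// suggested by the stack trace (an id used as an index into a list).
public class ReferenceLookupSketch {
    // Objects seen so far during one read pass.
    private final List<Object> readObjects = new ArrayList<>();

    // Record an object so that later back-references can find it.
    public void remember(Object o) {
        readObjects.add(o);
    }

    // Resolve a back-reference id; throws if the id was never recorded.
    public Object resolve(int id) {
        return readObjects.get(id);
    }

    // Returns true if resolving `id` against an empty table throws
    // IndexOutOfBoundsException.
    public static boolean failsOnEmptyTable(int id) {
        try {
            new ReferenceLookupSketch().resolve(id);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ReferenceLookupSketch resolver = new ReferenceLookupSketch();
        resolver.remember("first");
        System.out.println(resolver.resolve(0));
        System.out.println(failsOnEmptyTable(28));
    }
}
```

The exact exception message depends on the Java version, so the sketch only
checks the exception type.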
