Hi Gordon,

explicitly specifying the serialVersionUID did the job, thank you!

The failing task was

latest_time -> (cassandra-map -> Sink: cassandra-active-sink, map_active_stream, map_history_stream)

like the following:

    val events = keyedstream
      .window(Time.seconds(20))
      .maxBy("field").name("latest-time")

    CassandraSink.addSink(
        events.map(_.toCassandraTuple).name("cassandra-map").javaStream)
      .setQuery(...)
      .setClusterBuilder(...)
      .build().name("cassandra-sink")

where cassandra-map, map_history_stream and map_active_stream are
stateless map functions.

So, I guess the culprit was either the window/maxBy operator or the
Cassandra sink. Most likely the window/maxBy operator, since the exception
is raised while the keyed state is being initialized.

I'm attaching the complete log.

Cheers,
Federico

2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:

> Hi Federico,
>
> It seems like the state cannot be restored because the class of the state
> type (i.e., Event) had been modified since the savepoint, and therefore has
> a conflicting serialVersionUID with whatever it is in the savepoint.
> This can happen if Java serialization is used for some part of your state,
> and the class of the written data was modified while a fixed
> serialVersionUID was not explicitly specified for that class.
>
> To avoid this, you should explicitly set a serialVersionUID for the Event
> class.
> You can actually also do that now without losing state while also
> incorporating the modifications you were trying to do for your updated job.
> Explicitly declare the serialVersionUID of the Event class to what it was
> before your modifications (i.e., 8728793377341765980, according to your
> error log).
>
> One side question: are you experiencing this restore failure for one of
> your custom operator states, or is this failing state part of some Flink
> built-in operator / connector?
> I'm asking just to have an idea of which Flink built-in operators /
> connectors still use Java serialization for user state; ideally we would
> want that to be completely removed in the future.
>
> Cheers,
> Gordon
>
>
> On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi,
>
> I recently had to do a code update of a long running Flink Stream job
> (1.3.2) and on the restart from the savepoint I had to deal with:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>
> Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
> class incompatible: stream classdesc serialVersionUID = 8728793377341765980,
> local class serialVersionUID = -4253404384162522764
>
> because I have changed a method used to convert the Event to a Cassandra
> writable Tuple (in particular, I changed the return type from Tuple10 to
> Tuple11, after adding a field). I reverted those changes since it
> wasn't much of a problem per se.
>
> Now, I understand the root cause of this issue and I wanted to ask if
> there are any "best practices" to prevent this kind of issue without
> losing the state of the job by having to restart it from the very
> beginning.
>
> --
> Federico D'Ambrosio

--
Federico D'Ambrosio
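
For anyone finding this thread later: a minimal sketch of the fix discussed above, assuming Event is a Scala case class. The two fields and the SerialVersionUidCheck helper are made up for illustration; only the UID value comes from the error log, where it is the "stream classdesc" value, i.e. the one stored in the savepoint.

```scala
import java.io.ObjectStreamClass

// Hypothetical stand-in for the real Event class: the fields are assumptions.
// The UID is the "stream classdesc serialVersionUID" from the error log,
// pinned so that later changes to the class no longer alter it.
@SerialVersionUID(8728793377341765980L)
case class Event(id: String, timestamp: Long)

// Hypothetical helper, not part of Flink.
object SerialVersionUidCheck {
  // Returns the UID that Java serialization compares against the value
  // recorded in a serialized stream for this class.
  def declaredUid(cls: Class[_]): Long =
    ObjectStreamClass.lookup(cls).getSerialVersionUID

  def main(args: Array[String]): Unit =
    println(declaredUid(classOf[Event]))
}
```

With the annotation in place, methods can be added or changed without the computed default UID drifting. Whether state written before a field change can still be read back is a separate question that the UID alone does not settle.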
11/28/2017 14:53:28 latest_time -> (cassandra-map -> Sink: cassandra-active-sink, map_active_stream, map_history_stream)(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: lab.vardata.events.AirTrafficEventWithId; local class incompatible: stream classdesc serialVersionUID = 8728793377341765980, local class serialVersionUID = -4253404384162522764
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
    at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1484)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1334)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.read(TupleSerializerConfigSnapshot.java:64)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:510)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:315)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:271)
    at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:198)
    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:130)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:432)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
    ... 6 more