Hi Gordon,

Explicitly specifying the serialVersionUID did the job, thank you! The
failing task was latest_time -> (cassandra-map -> Sink:
cassandra-active-sink, map_active_stream, map_history_stream), which is
built roughly like the following:

val events = keyedstream
  .timeWindow(Time.seconds(20))
  .maxBy("field").name("latest-time")

CassandraSink.addSink(
    events.map(_.toCassandraTuple).name("cassandra-map").javaStream)
  .setQuery(...)
  .setClusterBuilder(...)
  .build()
  .name("cassandra-sink")

where cassandra-map, map_history_stream and map_active_stream are stateless
map functions.
So I guess the culprit was either the window/maxBy operator or the
Cassandra sink. My bet is on the window/maxBy operator, since the stack
trace shows the failure occurring while the keyed state is initialized.
I'm attaching the complete log.
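
For anyone finding this thread later, here is a minimal sketch of the fix.
It assumes the event type is a Scala case class; the fields shown are
placeholders rather than the real ones, and SerialVersionUidCheck is just a
hypothetical helper to confirm which UID Java serialization will see. The
UID value is the "stream classdesc" one from the error log.

```scala
import java.io.ObjectStreamClass

// Hypothetical, simplified stand-in for the real event class; the fields
// are placeholders. The UID is pinned to the value stored in the savepoint.
@SerialVersionUID(8728793377341765980L)
case class Event(id: String, timestamp: Long)

object SerialVersionUidCheck {
  // The serialVersionUID that Java serialization will use for Event.
  def eventUid: Long =
    ObjectStreamClass.lookup(classOf[Event]).getSerialVersionUID

  def main(args: Array[String]): Unit =
    println(s"Event serialVersionUID = $eventUid")
}
```

With the UID pinned like this, later changes to methods of the class should
no longer change the UID that Java serialization computes for it.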

Cheers,
Federico


2017-11-28 15:32 GMT+01:00 Tzu-Li (Gordon) Tai <tzuli...@apache.org>:

> Hi Federico,
>
> It seems like the state cannot be restored because the class of the state
> type (i.e., Event) had been modified since the savepoint, and therefore has
> a conflicting serialVersionUID with whatever it is in the savepoint.
> This can happen if Java serialization is used for some part of your state,
> and the class of the written data was modified while a fixed
> serialVersionUID was not explicitly specified for that class.
>
> To avoid this, you should explicitly set a serialVersionUID for the Event
> class.
> You can actually also do that now without losing state while also
> incorporating the modifications you were trying to do for your updated job.
> Explicitly declare the serialVersionUID of the Event class to what it was
> before your modifications (i.e., 8728793377341765980, according to your
> error log).
>
> One side question: are you experiencing this restore failure for one of
> your custom operator states, or is this failing state part of some Flink
> built-in operator / connector?
> I’m asking just to have an idea of which Flink built-in operators /
> connectors still use Java serialization for user state; ideally we would
> want that to be completely removed in the future.
>
> Cheers,
> Gordon
>
>
> On 28 November 2017 at 10:02:19 PM, Federico D'Ambrosio (
> federico.dambro...@smartlab.ws) wrote:
>
> Hi,
>
> I recently had to do a code update of a long-running Flink streaming job
> (1.3.2), and on the restart from the savepoint I had to deal with:
>
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>
> Caused by: java.io.InvalidClassException: lab.vardata.events.Event; local
> class incompatible: stream classdesc serialVersionUID =
> 8728793377341765980, local class serialVersionUID = -4253404384162522764
>
> because I had changed a method used to convert the Event to a
> Cassandra-writable Tuple (in particular, I changed the return type from
> Tuple10 to Tuple11, after adding a field). I reverted those changes, since
> it wasn't much of a problem per se.
>
> Now, I understand the root cause of this issue and I wanted to ask if
> there are any "best practices" to prevent this kind of issue without
> losing the state of the job by having to restart it from the very
> beginning.
>
> --
> Federico D'Ambrosio
>
>


-- 
Federico D'Ambrosio
11/28/2017 14:53:28     latest_time -> (cassandra-map -> Sink: cassandra-active-sink, map_active_stream, map_history_stream)(1/1) switched to FAILED
java.lang.IllegalStateException: Could not initialize keyed state backend.
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: lab.vardata.events.AirTrafficEventWithId; local class incompatible: stream classdesc serialVersionUID = 8728793377341765980, local class serialVersionUID = -4253404384162522764
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
        at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1484)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1334)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:305)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot.read(TupleSerializerConfigSnapshot.java:64)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:510)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:315)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:271)
        at org.apache.flink.runtime.state.KeyedBackendStateMetaInfoSnapshotReaderWriters$KeyedBackendStateMetaInfoReaderV3.readStateMetaInfo(KeyedBackendStateMetaInfoSnapshotReaderWriters.java:198)
        at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:130)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:432)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
        ... 6 more
