Error restarting job from Savepoint

2021-05-26 Thread Yashwant Ganti
Hello,

We are facing an error restarting a job from a savepoint. We believe it is
because one of the common classes used across all of our jobs was changed,
but no *serialVersionUID* had been assigned to the class (a sketch of the
fix is included after the stack trace). The error we are facing is

java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_ (2/4) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
> ... 9 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
> at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
> at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> ... 11 more
> Caused by: java.io.InvalidClassException: com..**.***; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
> at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
> at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
> at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
> at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
> at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
> at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
> at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
> at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
> at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
> at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupS
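
For reference, a minimal sketch of what pinning the UID on a shared class looks like (the class name here is hypothetical); with an explicitly declared *serialVersionUID*, the JVM does not recompute the UID when the class changes:

import java.io.Serializable;

// Hypothetical shared class. Declaring serialVersionUID explicitly keeps
// Java serialization compatible across code changes that do not alter the
// serialized form; without it, the JVM computes a UID from the class
// structure, and any change produces the InvalidClassException above.
public class SharedJobConfig implements Serializable {
    private static final long serialVersionUID = 1L;

    private String jobName;
}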

Flink and Avro for state serialization

2021-06-10 Thread Yashwant Ganti
Hello all,

We are running some Flink jobs - we wrote the job in Beam but are using the
Flink Runner and are seeing the following error when restarting the job
from a Savepoint

Caused by: java.io.InvalidClassException: com.xxx.xxx; local class incompatible: stream classdesc serialVersionUID = -5544933308624767500, local class serialVersionUID = -7822035950742261370


Here is what happened:

   - The type in question is an Avro type, so we have a `PCollection` of the generated Avro class in the job.
   - We updated the Avro schema, and by default the regenerated class gets a new serialVersionUID (the serialVersionUIDs in the error above line up with the ones in the generated Avro classes).
   - We did not use any custom serializers for this type, so I believe it would have been covered by Flink's POJO serializer (through Beam), and that is breaking because of the serialVersionUID change (a quick way to check the local UID is sketched below).
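
For anyone reproducing this, the local UID can be printed and compared against the "stream classdesc" UID in the exception; MyAvroRecord below is a placeholder for the generated Avro class:

import java.io.ObjectStreamClass;

public class PrintSerialVersionUid {
    public static void main(String[] args) {
        // Prints the serialVersionUID of the local class, whether explicitly
        // declared or computed by the JVM. Compare this value against the
        // "stream classdesc serialVersionUID" in the InvalidClassException.
        ObjectStreamClass desc = ObjectStreamClass.lookup(MyAvroRecord.class);
        System.out.println(desc.getSerialVersionUID());
    }
}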


I am wondering how to work around this without losing my savepoint. We are
going to try the following approach and were wondering if the community had
any suggestions:

   - Add flink-avro into the job jar as mentioned in https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#avro. I am not sure this would work, because the original bytes were written out by the POJO serializer, and that serializer is probably the one that will be used for deserialization. There must be some record of which serializer wrote out the bytes, and I am not sure how to override that (a sketch of registering an Avro coder explicitly is after this list).
   - I also wanted to confirm, for future use cases, that including the avro jar on the classpath will only affect Avro types by default.
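
On the Beam side, here is a rough sketch of explicitly registering an Avro coder for the type, so that newly written state uses Avro's schema-based encoding instead of Java serialization; MyAvroRecord is a placeholder, and, per the question above, I am not sure this helps with state that was already written out:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterAvroCoder {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Tell Beam to encode MyAvroRecord (placeholder for the generated
        // Avro class) with AvroCoder rather than falling back to
        // SerializableCoder, so schema evolution is handled by Avro instead
        // of Java serialization's serialVersionUID check.
        p.getCoderRegistry().registerCoderForClass(
                MyAvroRecord.class, AvroCoder.of(MyAvroRecord.class));

        // ... build the rest of the pipeline here ...

        p.run().waitUntilFinish();
    }
}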

Thanks,
Yash