Hi Yashwant Ganti,

> Caused by: java.io.InvalidClassException: com.****.******.*******; local 
> class incompatible: stream classdesc serialVersionUID = -7317586767482317266, 
> local class serialVersionUID = -8797204481428423223
1. Please try this: find the class com.****.******.******* and add `private
static final long serialVersionUID = -7317586767482317266L;` (the old
serialVersionUID from the error message above; the `L` suffix is required
because the value does not fit in an int). Then repackage your jar and
restart the job from the savepoint with the new jar.
2. Serializable classes must define a serialVersionUID; please see
https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization.
Please add a serialVersionUID to every Serializable class, especially
those that take part in checkpointing/savepointing.
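A minimal sketch of both suggestions follows. The class `ConnectionFactory`, its `url` field, and the placeholder connection object are hypothetical stand-ins for the masked com.****.******.******* factory. The sketch pins the old UID from the error message, marks the live connection `transient` so it is never written into checkpoints/savepoints, and uses `ObjectStreamClass.lookup` to print the UID that Java serialization compares against the one stored in the savepoint.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidExample {

    // Hypothetical stand-in for the masked factory class that creates connections.
    static class ConnectionFactory implements Serializable {
        // Pin the UID the savepoint was written with (the "stream classdesc" value
        // from the error). The L suffix is needed: the value is outside the int range.
        private static final long serialVersionUID = -7317586767482317266L;

        // Hypothetical configuration field; this is serialized with the object.
        private final String url;

        // Live connections are not part of the serialized form; they are recreated
        // lazily after deserialization.
        private transient Object connection;

        ConnectionFactory(String url) {
            this.url = url;
        }

        String getUrl() {
            return url;
        }

        Object getConnection() {
            if (connection == null) {
                connection = new Object(); // placeholder for opening a real connection
            }
            return connection;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // The UID that Java serialization checks against the one stored in the stream.
        long uid = ObjectStreamClass.lookup(ConnectionFactory.class).getSerialVersionUID();
        System.out.println("serialVersionUID = " + uid); // prints serialVersionUID = -7317586767482317266

        // Round-trip through plain Java serialization, which is what Beam's
        // SerializableCoder uses under the hood (see the stack trace).
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ConnectionFactory("jdbc:example://host/db"));
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            ConnectionFactory restored = (ConnectionFactory) in.readObject();
            System.out.println("restored url = " + restored.getUrl());
        }
    }
}
```

Note that pinning the UID only lets deserialization proceed when the remaining differences between the old and new class are compatible under Java's serialization rules (adding a field, for example); an incompatible change, such as changing the declared type of a primitive field, can still fail on restore.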

Best,
JING ZHANG

On Thu, May 27, 2021 at 1:15 AM, Yashwant Ganti <yashwan...@gmail.com> wrote:
>
> Hello,
>
> We are facing an error restarting a job from a savepoint. We believe it is
> because one of the common classes used across all of our jobs was changed, but
> no serialVersionUID had been assigned to the class. The error we are
> facing is:
>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>>     ... 9 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
>>     at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
>>     at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>     ... 11 more
>> Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
>>     at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>>     at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
>>     at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
>>     at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
>>     at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
>>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
>>     at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
>>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
>>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
>>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
>>     at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
>>     ... 17 more
>
>
> The change was to make the failing class implement Serializable. My questions
> are:
>
> 1. What are our options now to get this job to restart? In non-production
> environments we can delete the savepoint, but we really don't want to do
> that in production.
> 2. Are there any best practices to follow to avoid such issues in the future?
> Should we have just defined a serialVersionUID for the class? Should we (or
> can we) write custom serializers/deserializers for this class? The class is
> just a factory that creates connection objects, so in standard Java
> applications we normally don't think twice about this.
>
> Any help is appreciated.
>
> Thanks!
