Hi Ganti,

If you can ensure that the newer class stays backwards compatible with the
previous class, you can try explicitly setting the serialVersionUID of the
current class to -7317586767482317266.
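A minimal sketch of what pinning the UID could look like. It assumes the masked class (com.****.******.******* in the quoted trace) is called `ConnectionFactory`, which is a hypothetical name. It also assumes the class's remaining fields are still compatible with the version that wrote the savepoint.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical name standing in for the masked class com.****.******.*******.
class ConnectionFactory implements Serializable {

    // Pinned to the "stream classdesc serialVersionUID" reported in the
    // InvalidClassException, so Java serialization accepts the savepoint bytes.
    // This only helps if the remaining fields are still compatible with the
    // version of the class that wrote the savepoint.
    private static final long serialVersionUID = -7317586767482317266L;

    // Existing fields and methods stay unchanged.

    public static void main(String[] args) {
        // Print the UID that Java serialization will actually use for this class.
        long uid = ObjectStreamClass.lookup(ConnectionFactory.class).getSerialVersionUID();
        System.out.println("serialVersionUID = " + uid);
    }
}
```

Running `main` prints `serialVersionUID = -7317586767482317266`, which now matches the value recorded in the savepoint.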

To avoid such issues in the future, set the serialVersionUID explicitly from
the start if you are not using a custom serializer for those classes. An even
better solution is to keep the class backwards compatible through a custom
serializer, or to leverage Apache Avro.
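As a rough illustration of a serializer that stays backwards compatible, here is a stdlib-only sketch of a hand-written, versioned format. It is one possible approach and does not come from [1]. Everything in it is hypothetical: the class names, the `url` and `timeoutMillis` fields, and the 30-second default. The sketch persists only the factory's configuration, never live connections. It writes a version number first, so a newer reader can still decode bytes written in an older layout. In a Beam pipeline, logic like this would typically sit in the `encode`/`decode` methods of a `CustomCoder`. In plain Flink, it would go into a custom `TypeSerializer`. Neither integration is shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical codec for the factory's configuration. Live connections are
// never persisted; the factory can rebuild them from this data on restore.
final class ConnectionFactoryCodec {

    // Layout version written before the payload; bump it whenever fields change.
    static final int CURRENT_VERSION = 2;

    // Hypothetical default for state written before timeoutMillis existed.
    static final int DEFAULT_TIMEOUT_MILLIS = 30_000;

    static void encode(ConnectionFactoryConfig cfg, OutputStream out) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(CURRENT_VERSION);
        data.writeUTF(cfg.url);
        data.writeInt(cfg.timeoutMillis);
        data.flush();
    }

    static ConnectionFactoryConfig decode(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int version = data.readInt();
        if (version < 1 || version > CURRENT_VERSION) {
            throw new IOException("Unknown ConnectionFactoryConfig version: " + version);
        }
        String url = data.readUTF();
        // Version 1 had no timeout field, so fall back to the default.
        int timeoutMillis = version >= 2 ? data.readInt() : DEFAULT_TIMEOUT_MILLIS;
        return new ConnectionFactoryConfig(url, timeoutMillis);
    }

    static byte[] toBytes(ConnectionFactoryConfig cfg) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            encode(cfg, bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static ConnectionFactoryConfig fromBytes(byte[] payload) {
        try {
            return decode(new ByteArrayInputStream(payload));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Bytes in the old (version 1) layout: version, then the URL only.
        byte[] legacy = {0, 0, 0, 1, 0, 2, 'h', 'i'};
        ConnectionFactoryConfig old = fromBytes(legacy);
        System.out.println(old.url + " " + old.timeoutMillis);

        ConnectionFactoryConfig current =
                fromBytes(toBytes(new ConnectionFactoryConfig("db://host", 5000)));
        System.out.println(current.url + " " + current.timeoutMillis);
    }
}

// Hypothetical configuration held by the connection factory.
final class ConnectionFactoryConfig {
    final String url;
    final int timeoutMillis;

    ConnectionFactoryConfig(String url, int timeoutMillis) {
        this.url = url;
        this.timeoutMillis = timeoutMillis;
    }
}
```

Running `main` prints `hi 30000` for the version-1 bytes and `db://host 5000` for the current layout. Old state therefore keeps decoding after a field is added, which is exactly what the default Java serialization path in the trace could not do.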

You could refer to [1] for more details.

[1] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

Best
Yun Tang
________________________________
From: Yashwant Ganti <yashwan...@gmail.com>
Sent: Thursday, May 27, 2021 1:14
To: user@flink.apache.org <user@flink.apache.org>
Subject: Error restarting job from Savepoint

Hello,

We are facing an error restarting a job from a savepoint. We believe it is
because one of the common classes used across all of our jobs was changed, but
no serialVersionUID had been assigned to the class. The error we are facing
is:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for SplittableDoFnOperator_60af72bbf6b3989cb3e849280faa23d8_(2/4) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:115)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:559)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:101)
    at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:181)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 11 more
Caused by: java.io.InvalidClassException: com.****.******.*******; local class incompatible: stream classdesc serialVersionUID = -7317586767482317266, local class serialVersionUID = -8797204481428423223
    at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.base/java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:194)
    at org.apache.beam.sdk.coders.SerializableCoder.decode(SerializableCoder.java:54)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:669)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceRestrictionCoder.decode(Read.java:642)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:118)
    at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:77)
    at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:289)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:323)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:285)
    at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:172)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:112)
    ... 17 more

The change was to make the failing class implement Serializable. My questions
are:

  *   What are our options now to get this job to restart? In non-production
environments we can delete the savepoint, but we really don't want to do that
in production.
  *   Are there any best practices or guidance to follow to avoid such issues in
the future? Should we have just assigned a serialVersionUID to the class?
Should we (or can we) write custom serializers/deserializers for this class?
The class is just a factory that creates connection objects, so we normally
don't think twice about this in standard Java applications.

Any help is appreciated.

Thanks!
