Agree with Boyuan & Kyle. That PR is the problem, and we probably do not have adequate testing. We have a cultural understanding of not breaking encoded data forms but this is the encoded form of the TypeSerializer, and actually there are two problems.
1. When a serialized class does not set serialVersionUID explicitly, the UID is generated from many details that are irrelevant to binary compatibility. Any Java-serialized object that is intended for anything other than transient transmission *must* have an explicit serialVersionUID and an explicit serialized form. Otherwise it is completely normal for it to break due to irrelevant changes. The serialVersionUID has no mechanism for upgrade/downgrade, so you *must* keep it the same forever, and any versioning or compat scheme has to live within that single serialVersionUID.
2. In this case there was an actual change to the fields of the stored object, so you need to explicitly add the serialized form and also the ability to read prior serialized forms.

I believe explicitly setting the serialVersionUID to the original value (and keeping it that way forever) and adding the ability to decode prior forms will regain the ability to read the snapshot.

This also seems like something that belongs in Flink best practice documentation, since naive use of Java serialization often hits this problem.

Kenn

On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kcwea...@google.com> wrote:

> This raises a few related questions from me:
>
> 1. Do we claim to support resuming Flink checkpoints made with previous
> Beam versions?
> 2. Does 1. require full binary compatibility between different versions of
> runner internals like CoderTypeSerializer?
> 3. Do we have tests for 1.?
> Kenn
> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <boyu...@google.com> wrote:
>
>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>
>> +Maximilian Michels <m...@maximilianmichels.com> Any insights here?
>>
>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <antonio...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I would like to followup with this question to see if there is a
>>> solution/workaround for this issue.
>>>
>>> Thanks.
>>>
>>> Antonio.
>>>
>>> On 2020/12/19 18:33:48, Antonio Si <antonio...@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > We were using Beam v2.23 and recently, we are testing upgrade to Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and --fasterCopy=true.
>>> >
>>> > We run into this exception when we resume our pipeline:
>>> >
>>> > Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
>>> >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>> >     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>> >     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>> >     at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>> >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>> >     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>> >
>>> > It looks like it is not able to deserialize objects from our existing checkpoints. Is there any way we could resume our v2.23 checkpoints by v2.26?
>>> >
>>> > Thanks for any suggestions.
>>> >
>>> > Antonio.
>>> >
>>>
>>
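
For reference, here is a rough sketch of the two points at the top of this message: pin the serialVersionUID explicitly, and keep an explicit serialized form that carries its own version so older forms can still be decoded. This is not the actual CoderTypeSerializer code. The class ExampleSerializer, its fields (coderName, fasterCopy), the form numbers, the UID value 1L, and the writingLegacyFormForDemo() switch are all hypothetical names invented for illustration; only the java.io calls are real.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class PinnedSerialForm {

  /** Hypothetical stand-in for a serializer whose Java-serialized form lands in a snapshot. */
  public static class ExampleSerializer implements Serializable {
    // Pinned explicitly and never changed; without this the JVM derives a UID from class details.
    private static final long serialVersionUID = 1L;

    // Version of the explicit serialized form, carried inside the stream itself.
    private static final int FORM_V1 = 1; // coderName only
    private static final int FORM_V2 = 2; // coderName + fasterCopy

    // All state is transient because the serialized form is written by hand below.
    private transient String coderName;
    private transient boolean fasterCopy;      // field added in form 2
    private transient boolean writeLegacyForm; // demo-only switch that emits form 1

    public ExampleSerializer(String coderName, boolean fasterCopy) {
      this.coderName = coderName;
      this.fasterCopy = fasterCopy;
    }

    /** Demo-only: makes this instance serialize itself in the older form 1. */
    public ExampleSerializer writingLegacyFormForDemo() {
      this.writeLegacyForm = true;
      return this;
    }

    public String coderName() { return coderName; }

    public boolean fasterCopy() { return fasterCopy; }

    private void writeObject(ObjectOutputStream out) throws IOException {
      out.defaultWriteObject();
      out.writeInt(writeLegacyForm ? FORM_V1 : FORM_V2);
      out.writeUTF(coderName);
      if (!writeLegacyForm) {
        out.writeBoolean(fasterCopy);
      }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      int form = in.readInt();
      if (form != FORM_V1 && form != FORM_V2) {
        throw new InvalidObjectException("Unknown serialized form: " + form);
      }
      coderName = in.readUTF();
      // Form 1 predates fasterCopy, so fall back to a default when decoding it.
      fasterCopy = (form == FORM_V2) ? in.readBoolean() : false;
    }
  }

  public static byte[] toBytes(Object value) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
  }

  public static ExampleSerializer fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return (ExampleSerializer) in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExampleSerializer legacy =
        fromBytes(toBytes(new ExampleSerializer("VarIntCoder", true).writingLegacyFormForDemo()));
    ExampleSerializer current = fromBytes(toBytes(new ExampleSerializer("VarIntCoder", true)));
    System.out.println("form 1 -> " + legacy.coderName() + ", fasterCopy=" + legacy.fasterCopy());
    System.out.println("form 2 -> " + current.coderName() + ", fasterCopy=" + current.fasterCopy());
    System.out.println(
        "UID = " + ObjectStreamClass.lookup(ExampleSerializer.class).getSerialVersionUID());
  }
}
```

The version lives in the stream rather than in the serialVersionUID, so adding a form 3 later would only mean another branch in readObject. For a class that already shipped without an explicit UID, the value to pin would presumably be the one recorded in existing snapshots, not a fresh constant like the 1L used here.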