Thanks for the information. Is there already a Jira issue tracking this, or would you like me to create one?

Thanks.

Antonio.

On 2021/01/06 17:59:47, Kenneth Knowles <[email protected]> wrote:
> Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
> have adequate testing. We have a cultural understanding of not breaking
> encoded data forms but this is the encoded form of the TypeSerializer, and
> actually there are two problems.
>
> 1. When you have a serialized object that does not have the
> serialVersionUid explicitly set, the UID is generated based on many details
> that are irrelevant for binary compatibility. Any Java-serialized object
> that is intended for anything other than transient transmission *must* have
> a serialVersionUid set and an explicit serialized form. Else it is
> completely normal for it to break due to irrelevant changes. The
> serialVersionUid has no mechanism for upgrade/downgrade so you *must* keep
> it the same forever, and any versioning or compat scheme exists within the
> single serialVersionUid.
> 2. In this case there was an actual change to the fields of the object
> stored, so you need to explicitly add the serialized form and also the
> ability to read from prior serialized forms.
>
> I believe explicitly setting the serialVersionUid to the original (and
> keeping it that way forever) and adding the ability to decode prior forms
> will regain the ability to read the snapshot. But also this seems like
> something that would be part of Flink best practice documentation since
> naive use of Java serialization often hits this problem.
>
> Kenn
>
> On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <[email protected]> wrote:
>
> > This raises a few related questions from me:
> >
> > 1. Do we claim to support resuming Flink checkpoints made with previous
> > Beam versions?
> > 2. Does 1. require full binary compatibility between different versions of
> > runner internals like CoderTypeSerializer?
> > 3. Do we have tests for 1.?
> >
> > Kenn
> >
> > On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <[email protected]> wrote:
> >
> >> https://github.com/apache/beam/pull/13240 seems suspicious to me.
> >>
> >> +Maximilian Michels <[email protected]> Any insights here?
> >>
> >> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <[email protected]> wrote:
> >>
> >>> Hi,
> >>>
> >>> I would like to followup with this question to see if there is a
> >>> solution/workaround for this issue.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/19 18:33:48, Antonio Si <[email protected]> wrote:
> >>> > Hi,
> >>> >
> >>> > We were using Beam v2.23 and recently, we are testing upgrade to Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and --fasterCopy=true.
> >>> >
> >>> > We run into this exception when we resume our pipeline:
> >>> >
> >>> > Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
> >>> >     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> >>> >     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> >>> >     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> >>> >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> >>> >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> >>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> >>> >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> >>> >     at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> >>> >     at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> >>> >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> >>> >     at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> >>> >
> >>> > It looks like it is not able to deserialize objects from our existing checkpoints. Is there any way we could resume our v2.23 checkpoints by v2.26?
> >>> >
> >>> > Thanks for any suggestions.
> >>> >
> >>> > Antonio.
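
For reference, here is a rough sketch of the two points Kenn makes in the quoted reply: pin the serialVersionUID explicitly, and define an explicit serialized form that can still decode an earlier layout. Everything in it is an assumption for illustration only. PinnedSerializer, its fields, the FORMAT_V1/FORMAT_V2 tags and the writeFormat knob are hypothetical names, and this is not the actual CoderTypeSerializer code or a proposed patch. It uses only java.io.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical stand-in for a serializer whose Java-serialized form ends up
// in checkpoints. This is NOT Beam's CoderTypeSerializer.
class PinnedSerializer implements Serializable {
  // Set explicitly and never changed, so unrelated edits to the class
  // cannot alter the UID that old streams were written with.
  private static final long serialVersionUID = 1L;

  static final int FORMAT_V1 = 1; // assumed older layout: coderName only
  static final int FORMAT_V2 = 2; // assumed newer layout: coderName + fasterCopy

  // All fields are transient: the wire form is defined only by
  // writeObject/readObject below, not by the field list.
  private transient String coderName;
  private transient boolean fasterCopy;
  private transient int writeFormat; // demo-only knob so the sketch can emit the old layout

  PinnedSerializer(String coderName, boolean fasterCopy, int writeFormat) {
    this.coderName = coderName;
    this.fasterCopy = fasterCopy;
    this.writeFormat = writeFormat;
  }

  String coderName() { return coderName; }
  boolean fasterCopy() { return fasterCopy; }

  private void writeObject(ObjectOutputStream out) throws IOException {
    out.defaultWriteObject();
    out.writeInt(writeFormat); // the version tag lives inside the single UID
    out.writeUTF(coderName);
    if (writeFormat >= FORMAT_V2) {
      out.writeBoolean(fasterCopy);
    }
  }

  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    in.defaultReadObject();
    int format = in.readInt();
    if (format != FORMAT_V1 && format != FORMAT_V2) {
      throw new IOException("Unknown serialized format: " + format);
    }
    coderName = in.readUTF();
    // Older streams carry no flag, so fall back to false for them.
    fasterCopy = format >= FORMAT_V2 && in.readBoolean();
    writeFormat = FORMAT_V2; // anything re-written from here on uses the current layout
  }

  // Serializes and deserializes in memory; checked exceptions are wrapped.
  static PinnedSerializer roundTrip(PinnedSerializer value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (PinnedSerializer) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // The UID Java serialization will use for this class (the declared one).
  static long declaredUid() {
    return ObjectStreamClass.lookup(PinnedSerializer.class).getSerialVersionUID();
  }
}

class SerialFormDemo {
  public static void main(String[] args) {
    PinnedSerializer old =
        PinnedSerializer.roundTrip(new PinnedSerializer("VarIntCoder", true, PinnedSerializer.FORMAT_V1));
    System.out.println("uid=" + PinnedSerializer.declaredUid());
    System.out.println("old layout decoded: " + old.coderName() + ", fasterCopy=" + old.fasterCopy());
  }
}
```

The point of the sketch is that the version tag written by writeObject carries all of the compatibility logic, so the serialVersionUID itself never has to change. Whether the real fix can take this shape depends on how the existing v2.23 snapshots were actually written, which this example does not model.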
