Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
have adequate testing. We have a cultural understanding of not breaking
encoded data forms, but this is the encoded form of the TypeSerializer
itself, and there are actually two problems.

1. When a serialized class does not explicitly set serialVersionUID, the
UID is generated from many details that are irrelevant to binary
compatibility. Any Java-serialized object that is intended for anything
other than transient transmission *must* have an explicit serialVersionUID
and an explicit serialized form. Otherwise it is completely normal for it
to break due to irrelevant changes. serialVersionUID has no mechanism for
upgrade/downgrade, so you *must* keep it the same forever; any versioning
or compatibility scheme has to live within that single serialVersionUID.
2. In this case there was also an actual change to the fields of the
stored object, so you need to explicitly define the serialized form and
also add the ability to read prior serialized forms.
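A minimal sketch of both points in plain Java serialization, using a hypothetical `ExampleSerializer` class (the class, its fields, and the pinned value `1L` are illustrative assumptions, not the actual `CoderTypeSerializer` code). The UID is declared explicitly, and `readObject` reads fields by name via `ObjectInputStream.readFields()`, so a stream written by an older version that lacks a newer field still deserializes with a chosen default.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class standing in for a serializer stored in a snapshot.
class ExampleSerializer implements Serializable {
  // Pinned forever: must equal the UID that older snapshots were written with.
  private static final long serialVersionUID = 1L;

  private String coderName;  // present since the first version
  private boolean newOption; // hypothetical field added in a later version

  ExampleSerializer(String coderName, boolean newOption) {
    this.coderName = coderName;
    this.newOption = newOption;
  }

  String getCoderName() { return coderName; }
  boolean isNewOption() { return newOption; }

  // Read the stored fields by name. A stream written before `newOption`
  // existed simply lacks that field, so get() returns the supplied default.
  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    ObjectInputStream.GetField fields = in.readFields();
    coderName = (String) fields.get("coderName", null);
    newOption = fields.get("newOption", false);
  }
}

public class SerializationDemo {
  // Java-serializes a value and reads it back, as a snapshot restore would.
  static <T> T roundTrip(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T result = (T) in.readObject();
        return result;
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ExampleSerializer copy = roundTrip(new ExampleSerializer("VarIntCoder", true));
    System.out.println(copy.getCoderName() + " " + copy.isNewOption()); // VarIntCoder true
  }
}
```

As long as the UID matches, default serialization already tolerates an added field; the explicit `readObject` makes the fallback for old streams a deliberate choice and gives one place to translate an older layout that differs more substantially.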

I believe explicitly setting serialVersionUID to the original value (and
keeping it that way forever) and adding the ability to decode prior forms
will restore the ability to read the snapshot. This also seems like
something that belongs in Flink best-practice documentation, since naive
use of Java serialization often hits this problem.
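For reference, the value old snapshots carry is the `stream classdesc serialVersionUID` in the exception quoted below (5241803328188007316), and `java.io.ObjectStreamClass` reports the UID a class currently has (the same value the JDK `serialver` tool prints). The sketch below uses hypothetical nested classes: one relies on the computed UID, the other pins the value from the trace. Whether pinning that value alone is enough for `CoderTypeSerializer` depends on point 2 above.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionCheck {
  // Hypothetical class with no explicit UID: the JVM derives one from the
  // class name, interfaces, fields, methods and other details, so unrelated
  // edits to the class can change it.
  static class Implicit implements Serializable {
    private int count;
  }

  // Hypothetical class pinned to the "stream classdesc" value from the
  // InvalidClassException, i.e. the UID the old checkpoint was written with.
  static class Pinned implements Serializable {
    private static final long serialVersionUID = 5241803328188007316L;
    private int count;
  }

  // Returns the UID Java serialization will use for the class.
  static long uidOf(Class<?> cls) {
    ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
    if (desc == null) {
      throw new IllegalArgumentException(cls + " is not Serializable");
    }
    return desc.getSerialVersionUID();
  }

  public static void main(String[] args) {
    System.out.println("implicit (computed): " + uidOf(Implicit.class));
    System.out.println("pinned (explicit):   " + uidOf(Pinned.class));
  }
}
```

A regression test that asserts the pinned value never changes would be a cheap guard against this class of breakage.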

Kenn

On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <kcwea...@google.com> wrote:

> This raises a few related questions from me:
>
> 1. Do we claim to support resuming Flink checkpoints made with previous
> Beam versions?
> 2. Does 1. require full binary compatibility between different versions of
> runner internals like CoderTypeSerializer?
>
> 3. Do we have tests for 1.?
>

> On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <boyu...@google.com> wrote:
>
>> https://github.com/apache/beam/pull/13240 seems suspicious to me.
>>
>>  +Maximilian Michels <m...@maximilianmichels.com> Any insights here?
>>
>> On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <antonio...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I would like to follow up on this question to see if there is a
>>> solution or workaround for this issue.
>>>
>>> Thanks.
>>>
>>> Antonio.
>>>
>>> On 2020/12/19 18:33:48, Antonio Si <antonio...@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > We were using Beam v2.23 and are now testing an upgrade to Beam
>>> v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and
>>> --fasterCopy=true.
>>> >
>>> > We ran into this exception when we resumed our pipeline:
>>> >
>>> > Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
>>> >       at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>>> >       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
>>> >       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
>>> >       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
>>> >       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>>> >       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>>> >       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>>> >       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>>> >       at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>>> >       at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>>> >       at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>>> >
>>> > It looks like it is not able to deserialize objects from our existing
>>> checkpoints. Is there any way we could resume our v2.23 checkpoints with
>>> v2.26?
>>> >
>>> > Thanks for any suggestions.
>>> >
>>> > Antonio.
>>> >
>>>
>>
