Thanks for mentioning me here @Boyuan.
In Beam there is no guarantee that checkpoints work across Beam
releases. Checkpoint compatibility can break for many reasons
(primarily DAG changes and serializer changes). Even though in this case
a pinned serialVersionUID might have preserved compatibility, we make
internal changes to Beam all the time. There is currently no process
that we follow to ensure compatibility.
I do want to note that Flink has a serializer migration strategy which
we currently do not leverage:
https://github.com/apache/beam/blob/d8966d640549932d7551461ff59fa1085730f768/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L182
However, this requires keeping the old serializer around in addition to
the new one. Flink then migrates the state by first reading with the old
serializer and subsequently writing with the new one.
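For illustration, a minimal sketch of that hook (class name and details
are hypothetical, not Beam code):

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class MigratingSnapshot<T> implements TypeSerializerSnapshot<T> {

  private TypeSerializer<T> oldSerializer; // must stay on the classpath

  @Override
  public int getCurrentVersion() {
    return 2;
  }

  @Override
  public void writeSnapshot(DataOutputView out) throws IOException {
    // Write whatever is needed to reconstruct the old serializer on restore.
  }

  @Override
  public void readSnapshot(int readVersion, DataInputView in, ClassLoader loader)
      throws IOException {
    // Reconstruct oldSerializer from the bytes written above.
  }

  @Override
  public TypeSerializer<T> restoreSerializer() {
    // Flink reads the existing state with the old serializer...
    return oldSerializer;
  }

  @Override
  public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
      TypeSerializer<T> newSerializer) {
    // ...and rewrites it with the new serializer once we signal migration.
    return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
  }
}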
-Max
On 07.01.21 09:43, Jan Lukavský wrote:
Hi Antonio,
can you please create one?
Thanks,
Jan
On 1/6/21 10:31 PM, Antonio Si wrote:
Thanks for the information. Do we have a jira to track this issue or
do you want me to create a jira for this?
Thanks.
Antonio.
On 2021/01/06 17:59:47, Kenneth Knowles <[email protected]> wrote:
Agree with Boyuan & Kyle. That PR is the problem, and we probably do not
have adequate testing. We have a cultural understanding of not breaking
encoded data forms, but this is the encoded form of the TypeSerializer,
and actually there are two problems.
1. When you have a serialized object that does not have the
serialVersionUID explicitly set, the UID is generated based on many
details that are irrelevant for binary compatibility. Any Java-serialized
object that is intended for anything other than transient transmission
*must* have a serialVersionUID set and an explicit serialized form;
otherwise it is completely normal for it to break due to irrelevant
changes. The serialVersionUID has no mechanism for upgrade/downgrade, so
you *must* keep it the same forever, and any versioning or compat scheme
exists within the single serialVersionUID (see the first sketch below).
2. In this case there was an actual change to the fields of the object
stored, so you need to explicitly add the serialized form and also the
ability to read from prior serialized forms.
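To make point 1 concrete, here is a minimal sketch (a hypothetical
class, not CoderTypeSerializer itself):

import java.io.Serializable;

public class StableFormat implements Serializable {
  // Pinned forever: unrelated refactorings (new methods, reordered members)
  // can no longer change the UID and break deserialization.
  private static final long serialVersionUID = 1L;

  // ... the fields that make up the explicit serialized form ...
}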
I believe explicitly setting the serialVersionUID to the original value
(and keeping it that way forever) and adding the ability to decode prior
forms will restore the ability to read the snapshot. This also seems like
something that would belong in Flink best-practice documentation, since
naive use of Java serialization often hits this problem.
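A minimal sketch of that combined fix, assuming a single added field
(class and field names are hypothetical, not the actual
CoderTypeSerializer change):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class CompatFormat implements Serializable {
  // Restored to the *original* generated UID and then kept forever
  // (1L is a stand-in for the original value).
  private static final long serialVersionUID = 1L;

  private boolean fasterCopy; // hypothetical field added in a newer release

  private void readObject(ObjectInputStream in)
      throws IOException, ClassNotFoundException {
    ObjectInputStream.GetField fields = in.readFields();
    // Old streams lack "fasterCopy"; default it explicitly rather than
    // relying on implicit behavior.
    this.fasterCopy = fields.get("fasterCopy", false);
  }
}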
Kenn
On Tue, Jan 5, 2021 at 4:30 PM Kyle Weaver <[email protected]> wrote:
This raises a few related questions from me:
1. Do we claim to support resuming Flink checkpoints made with previous
Beam versions?
2. Does 1. require full binary compatibility between different versions
of runner internals like CoderTypeSerializer?
3. Do we have tests for 1.?
On Tue, Jan 5, 2021 at 4:05 PM Boyuan Zhang <[email protected]> wrote:
https://github.com/apache/beam/pull/13240 seems suspicious to me.
+Maximilian Michels <[email protected]> Any insights here?
On Tue, Jan 5, 2021 at 8:48 AM Antonio Si <[email protected]> wrote:
Hi,
I would like to follow up on this question to see if there is a
solution/workaround for this issue.
Thanks.
Antonio.
On 2020/12/19 18:33:48, Antonio Si <[email protected]> wrote:
Hi,
We were using Beam v2.23 and recently we have been testing an upgrade to
Beam v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read
and --fasterCopy=true.
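For reference, a simplified sketch of how we wire up those options (our
actual pipeline setup is omitted):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class Main {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner",
                "--experiments=use_deprecated_read",
                "--fasterCopy=true")
            .as(FlinkPipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline, then run it ...
    pipeline.run();
  }
}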
We run into this exception when we resume our pipeline:
Caused by: java.io.InvalidClassException: org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local class incompatible: stream classdesc serialVersionUID = 5241803328188007316, local class serialVersionUID = 7247319138941746449
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
    at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
    at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
It looks like it is not able to deserialize objects from our existing
checkpoints. Is there any way we could resume our v2.23 checkpoints with
v2.26?
Thanks for any suggestions.
Antonio.