[ https://issues.apache.org/jira/browse/BEAM-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130913#comment-17130913 ]
Ismaël Mejía commented on BEAM-10223:
-------------------------------------

Sorry, which one is the GitHub-related pull request?

> AvroCoder has references to the encoded/decoded class
> -----------------------------------------------------
>
>                 Key: BEAM-10223
>                 URL: https://issues.apache.org/jira/browse/BEAM-10223
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.22.0
>            Reporter: Ivan San Jose
>            Priority: P2
>
> Hi, maybe the JIRA issue title is not very descriptive, but I couldn't find anything better, sorry.
> Let me explain the problem:
> When using Flink as the runner, Beam coders are wrapped into Flink's TypeSerializers and, according to https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95 , those coders are Java-serialized inside the checkpoint.
> The problem is that coders have a reference to the class which is being coded/decoded (Class<T>), so when the coder is serialized, the POJO model's class descriptor is written along with it, using Java serialization.
> Why is this a problem?
> This is a problem if you are working with checkpoints, because checkpoint restoring will be broken as soon as you change anything in the POJO model (even if you are using a coder which supports schema evolution and the change follows its evolution rules):
> {code}
> Caused by: java.io.InvalidClassException: internal.model.dimension.POJODimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
> 	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
> 	at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> 	at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> 	at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> 	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> 	at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> 	... 13 more
> {code}
> So, in order to be able to use existing checkpoints after compatible changes have been made in the Java model, the references to Class<T> in the Beam coder should be removed.
> Note that this JIRA ticket only refers to AvroCoder, which is the only one fixed in the related GitHub pull request.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
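As context for the fix direction: Java serialization of a Class<T> field writes the target class's stream descriptor, including its serialVersionUID, into the stream, and comparing that stored UID against the current class is exactly the check that fails in the trace above. Below is a minimal sketch of the general workaround (a hypothetical ClassNameHolder class, illustrative only, not the actual code from the pull request): serialize only the fully-qualified class name as a String and resolve Class<T> lazily against the current classpath after deserialization.

```java
import java.io.Serializable;

// Hypothetical sketch, not Beam's actual AvroCoder implementation:
// instead of a serializable Class<T> field (whose class descriptor,
// including the POJO's serialVersionUID, would end up inside the
// Java-serialized coder and therefore inside the Flink checkpoint),
// keep only the class name and resolve the Class<T> on demand.
class ClassNameHolder<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String className;   // the only thing actually serialized
  private transient Class<T> type;  // never written into the checkpoint

  ClassNameHolder(Class<T> type) {
    this.className = type.getName();
    this.type = type;
  }

  @SuppressWarnings("unchecked")
  Class<T> getType() throws ClassNotFoundException {
    if (type == null) {
      // Resolved against the *current* classpath, so the POJO class can
      // change its serialVersionUID without invalidating old holders.
      type = (Class<T>) Class.forName(className);
    }
    return type;
  }
}
```

A holder serialized this way round-trips through Java serialization as a plain String plus a null transient field, so restoring it does not trigger the InvalidClassException on the POJO class.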