[ 
https://issues.apache.org/jira/browse/BEAM-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17130913#comment-17130913
 ] 

Ismaël Mejía commented on BEAM-10223:
-------------------------------------

Sorry, which one is the GitHub-related pull request?

> AvroCoder has references to the encoded/decoded class
> -----------------------------------------------------
>
>                 Key: BEAM-10223
>                 URL: https://issues.apache.org/jira/browse/BEAM-10223
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.22.0
>            Reporter: Ivan San Jose
>            Priority: P2
>
> Hi, maybe the JIRA issue title is not very descriptive, but I couldn't 
> find anything better, sorry.
> Let me explain the problem:
> When using Flink as the runner, Beam coders are wrapped into Flink's 
> TypeSerializers and, according to 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java#L95
>  , those coders are Java-serialized inside the checkpoint.
> The problem is that a coder holds a reference to the class being 
> coded/decoded (Class<T>), so when the coder is serialized, the POJO model's 
> class metadata is serialized along with it via Java serialization.
> Why is this a problem?
> It is a problem if you are working with checkpoints, because checkpoint 
> restoring will break as soon as you change anything in the POJO model (even 
> if you are using a coder which supports schema evolution and the change 
> follows its evolution rules):
> {code}
> Caused by: java.io.InvalidClassException: internal.model.dimension.POJODimension; local class incompatible: stream classdesc serialVersionUID = -223148029368332375, local class serialVersionUID = 4489864664852536553
>       at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>       at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1884)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
>       at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1715)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
>       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
>       at org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>       at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>       at org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>       at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>       at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>       ... 13 more
> {code}
> So, in order to keep using existing checkpoints after compatible changes 
> have been made to the Java model, references to Class<T> in Beam coders 
> should be removed.
> Note that this JIRA ticket only covers AvroCoder, which is the only coder 
> fixed in the related GitHub pull request.
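The fix the report asks for can be sketched as follows. This is a hypothetical illustration (the class `ClassNameCoder` is invented, not Beam's actual AvroCoder code): the coder keeps only the fully-qualified class name as a plain String and re-resolves the `Class<T>` lazily, so Java-serializing the coder no longer embeds the POJO's class descriptor, with its serialVersionUID, into the checkpoint stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch, not Beam's actual AvroCoder: store the class *name*
// instead of a Class<T> field. A serialized Class<T> drags the target class's
// stream descriptor (including its serialVersionUID) into the checkpoint,
// which is what breaks restore on model changes; a String does not.
public class ClassNameCoder<T> implements Serializable {
  private static final long serialVersionUID = 1L;

  private final String typeName;    // safe to Java-serialize
  private transient Class<T> type;  // never written to the checkpoint

  public ClassNameCoder(Class<T> type) {
    this.typeName = type.getName();
    this.type = type;
  }

  @SuppressWarnings("unchecked")
  public Class<T> getType() {
    if (type == null) {             // re-resolve lazily after deserialization
      try {
        type = (Class<T>) Class.forName(typeName);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Cannot resolve class " + typeName, e);
      }
    }
    return type;
  }

  public static void main(String[] args) throws Exception {
    // Round-trip the coder through Java serialization, as Flink does when
    // snapshotting; only the String field travels through the stream.
    ClassNameCoder<String> coder = new ClassNameCoder<>(String.class);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    new ObjectOutputStream(bos).writeObject(coder);
    ClassNameCoder<?> restored = (ClassNameCoder<?>)
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();
    // The transient field is null after deserialization and is re-resolved by name.
    System.out.println(restored.getType().getName()); // prints java.lang.String
  }
}
```

Under this scheme, changing the POJO's serialVersionUID no longer invalidates the serialized coder, because the checkpoint never contained the POJO's class descriptor in the first place; payload compatibility is then governed solely by the coder's own evolution rules (e.g. Avro schema resolution).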



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
