[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638834#comment-16638834
 ] 

Elias Levy commented on FLINK-10493:
------------------------------------

This issue was noted [~tzulitai] back in June 2017 
[here|https://github.com/apache/flink/pull/4090#issuecomment-307109692].

> Macro generated CaseClassSerializer considered harmful
> ------------------------------------------------------
>
>                 Key: FLINK-10493
>                 URL: https://issues.apache.org/jira/browse/FLINK-10493
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>    Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 
> 1.5.4
>            Reporter: Elias Levy
>            Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
> and {{TypeSerializer}} objects for types.  In the case of Scala tuple and 
> case classes, the macro generates an [anonymous {{CaseClassSerializer}} 
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
>   
> The Scala compiler will generate a name for the anonymous class that depends 
> on the relative position in the code of the macro invocation to other 
> anonymous classes.  If the code is changed such that the anonymous class 
> relative position changes, even if the overall logic of the code or the type 
> in question do not change, the name of the serializer class will change.
> That will result in errors, such as the one below, if the job is restored 
> from a savepoint, as the serializer to read the data in the savepoint will no 
> longer be found, as its name will have changed.
> At the very least, there should be a prominent warning in the documentation 
> about this issue.  Minor code changes can result in jobs that can't restore 
> previous state.  Ideally, the use of anonymous classes should be deprecated 
> if possible.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>       at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>       at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>       at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>       at java.io.ObjectInputStream.readSerialData(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.readObject(Unknown Source)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>       ... 14 more
> Caused by: java.lang.ClassNotFoundException: 
> com.somewhere.TestJob$$anon$13$$anon$3
>       at java.net.URLClassLoader.findClass(Unknown Source)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Unknown Source)
>       at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>       at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>       ... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>       at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>       at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>       at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>       at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.readObject(Unknown Source)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>       ... 18 more
> Caused by: java.lang.ClassNotFoundException: 
> com.somewhere.TestJob$$anon$13$$anon$3
>       at java.net.URLClassLoader.findClass(Unknown Source)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Unknown Source)
>       at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>       at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>       ... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error 
> during disposal of stream operator.
> java.lang.NullPointerException
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: 
> Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) 
> switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state 
> [_async_wait_operator_state_]. The previous serializer of the operator state 
> must be present; the serializer could have been removed from the classpath, 
> or its implementation have changed and could not be loaded. This is a 
> temporary restriction that will be fixed in future versions.
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       ... 6 more
> {noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to