[ 
https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-10483.
------------------------------
    Resolution: Invalid

> Can't restore from a savepoint even with Allow Non Restored State enabled
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10483
>                 URL: https://issues.apache.org/jira/browse/FLINK-10483
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing, Type Serialization System
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Major
>
> A trimmed streaming job fails a restore from a savepoint with an Unloadable 
> class for type serializer error, even though the case class in question has 
> been eliminated from the job and Allow Non Restored State is enabled.
> We have a job running on a Flink 1.4.2 cluster with two Kafka input streams, 
> one of the streams is processed by an async function, and the output of the 
> async function and the other original stream are consumed by a 
> CoProcessOperator, that intern emits Scala case class instances, that go into 
> a stateful ProcessFunction filter, and then into a sink.  I.e.
> {code:java}
> source 1 -> async function --\
>                                                |---> co process --> process 
> --> sink
> source 2 --------------------------/
> {code}
> I eliminated most of the DAG, leaving only the source 1 --> async function 
> portion of it.  This removed the case class in question from the processing 
> graph.  When I try to restore from the savepoint, even if Allow Non Restored 
> State is selected, the job fails to restore with the error "Deserialization 
> of serializer erroed".
> This is the error being generated:
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>       at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>       at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>       at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>       at java.io.ObjectInputStream.readSerialData(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.readObject(Unknown Source)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>       ... 14 more
> Caused by: java.lang.ClassNotFoundException: 
> com.somewhere.TestJob$$anon$13$$anon$3
>       at java.net.URLClassLoader.findClass(Unknown Source)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Unknown Source)
>       at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>       at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>       ... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>       at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>       at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>       at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>       at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>       at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>       at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>       at java.io.ObjectInputStream.readObject0(Unknown Source)
>       at java.io.ObjectInputStream.readObject(Unknown Source)
>       at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>       ... 18 more
> Caused by: java.lang.ClassNotFoundException: 
> com.somewhere.TestJob$$anon$13$$anon$3
>       at java.net.URLClassLoader.findClass(Unknown Source)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at 
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>       at java.lang.ClassLoader.loadClass(Unknown Source)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Unknown Source)
>       at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>       at 
> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>       ... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error 
> during disposal of stream operator.
> java.lang.NullPointerException
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>       at 
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: 
> Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) 
> switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state 
> [_async_wait_operator_state_]. The previous serializer of the operator state 
> must be present; the serializer could have been removed from the classpath, 
> or its implementation have changed and could not be loaded. This is a 
> temporary restriction that will be fixed in future versions.
>       at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>       ... 6 more
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to