[ https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Elias Levy closed FLINK-10483. ------------------------------ Resolution: Invalid > Can't restore from a savepoint even with Allow Non Restored State enabled > ------------------------------------------------------------------------- > > Key: FLINK-10483 > URL: https://issues.apache.org/jira/browse/FLINK-10483 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Type Serialization System > Affects Versions: 1.4.2 > Reporter: Elias Levy > Priority: Major > > A trimmed streaming job fails a restore from a savepoint with an Unloadable > class for type serializer error, even though the case class in question has > been eliminated from the job and Allow Non Restored State is enabled. > We have a job running on a Flink 1.4.2 cluster with two Kafka input streams, > one of the streams is processed by an async function, and the output of the > async function and the other original stream are consumed by a > CoProcessOperator, that intern emits Scala case class instances, that go into > a stateful ProcessFunction filter, and then into a sink. I.e. > {code:java} > source 1 -> async function --\ > |---> co process --> process > --> sink > source 2 --------------------------/ > {code} > I eliminated most of the DAG, leaving only the source 1 --> async function > portion of it. This removed the case class in question from the processing > graph. When I try to restore from the savepoint, even if Allow Non Restored > State is selected, the job fails to restore with the error "Deserialization > of serializer erroed". > This is the error being generated: > {noformat} > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.defaultReadFields(Unknown Source) > at java.io.ObjectInputStream.readSerialData(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 14 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 18 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error > during disposal of stream operator. > java.lang.NullPointerException > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353) > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > INFO org.apache.flink.runtime.taskmanager.Task - Source: > Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) > switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize operator state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.IOException: Unable to restore operator state > [_async_wait_operator_state_]. The previous serializer of the operator state > must be present; the serializer could have been removed from the classpath, > or its implementation have changed and could not be loaded. This is a > temporary restriction that will be fixed in future versions. > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > ... 6 more > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)