Hi Nikola,

Which Flink version are you using? Can you describe step by step what you
are doing?

The error you are seeing should have been fixed in Flink 1.9.0+ [1]. If
you are using an older version of Flink, please upgrade Flink first,
without changing the job, and only then upgrade the connector.
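
As a rough sketch of that ordering only: the class and method names below
are hypothetical and not part of Flink, and the intermediate "take a new
savepoint" step is an assumption rather than something confirmed in this
thread. The helper just turns a Flink version string into the order of
steps described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: encodes the suggested upgrade order.
final class UpgradeOrder {

    // True when the given Flink version is older than 1.9.0, the first
    // release that should contain the fix for FLINK-11249.
    static boolean isOlderThan19(String flinkVersion) {
        String[] parts = flinkVersion.split("[.-]");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return major < 1 || (major == 1 && minor < 9);
    }

    // Ordered list of steps for moving from flink-connector-kafka-0.11
    // to flink-connector-kafka while keeping the savepoint state.
    static List<String> plan(String flinkVersion) {
        List<String> steps = new ArrayList<>();
        if (isOlderThan19(flinkVersion)) {
            steps.add("Upgrade Flink to 1.9.0 or later; keep the job on flink-connector-kafka-0.11");
            // Assumption: a fresh savepoint is taken on the upgraded Flink version.
            steps.add("Restore the unchanged job from the savepoint, then take a new savepoint");
        }
        steps.add("Switch the job's dependency to flink-connector-kafka");
        steps.add("Restart the job from the most recent savepoint");
        return steps;
    }

    public static void main(String[] args) {
        // Defaults to an example pre-1.9 version when no argument is given.
        String version = args.length > 0 ? args[0] : "1.8.2";
        plan(version).forEach(System.out::println);
    }
}
```

The point of the sketch is that the Flink upgrade and the connector swap are
two separate restarts, never one combined change.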

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11249

On Mon, 10 Aug 2020 at 08:14, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Nikola,
>
> If I remember correctly, state is not compatible between
> flink-connector-kafka-0.11 and the universal flink-connector-kafka.
> Piotr (cc'ed) would probably know what's going on here.
>
> Cheers,
> Gordon
>
> On Mon, Aug 10, 2020 at 1:07 PM Nikola Hrusov <n.hru...@gmail.com> wrote:
>
>> Hello,
>>
>> We are trying to update our kafka connector dependency. So far we have
>> been using flink-connector-kafka-0.11 and we would like to update the
>> dependency to flink-connector-kafka.
>> However, when I try to restart the job with a savepoint I get the
>> following exception:
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for StreamSink_351727121bb1ca0d704092960989d25b_(1/10)
>> from any of the 1 provided restore options.
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
>> ... 5 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
>> Failed when trying to restore operator state backend
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> ... 7 more
>> Caused by: java.io.IOException: Could not find class
>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint'
>> in classpath.
>> at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:711)
>> at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:681)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:178)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:122)
>> at
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:125)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:170)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
>> at
>> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
>> at
>> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
>> at
>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
>> at
>> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
>> at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
>> ... 11 more
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:708)
>> ... 23 more
>>
>>
>> It seems like the state is saved including the classes. Is it possible to
>> do a migration in some way where we can update the dependency and keep the
>> state?
>>
>> Regards,
>> Nikola Hrusov
>>
>
