社区各位大佬,我遇到了一个Flink版本升级后,savepoint不会正确恢复问题。

版本从1.9.3 升级到1.11.0或1.11.3.
连接器使用的pulsar的连接器。在Source的checkpoint数据结构如下

oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
                })));
oldUnionSubscriptionNameStates =
        stateStore.getUnionListState(
                new ListStateDescriptor<>(
                        OFFSETS_STATE_NAME + "_subName",
                        TypeInformation.of(new TypeHint<String>() {
                        })));
我在本地通过bin/flink savepoint 命令保存下1.9.3的state,然后停止flink 
1.9.3的集群,启动1.11.3的flink集群,使用bin/flink run -s 参数恢复任务。
任务在启动后,会遇到下面的错误


2021-03-11 10:02:25
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any 
of the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when 
trying to restore operator state backend
    at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at 
org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io<http://java.io>.EOFException: No more bytes left.
    at 
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at 
com.esotericsoftware.kryo.io<http://com.esotericsoftware.kryo.io>.Input.readVarInt(Input.java:355)
    at 
com.esotericsoftware.kryo.io<http://com.esotericsoftware.kryo.io>.Input.readInt(Input.java:350)
    at 
com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at 
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at 
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more

请问题大佬们可以提供排查问题的办法或者解决方案吗?


Jianyun8023
2021-3-11

回复