> Is it possible that Flink already deserializes the state while reading the savepoint file, and that the failure happens during that step?
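Yes, that is what happens: the checkpoint also snapshots the serializer. Looking at the stack again, the failure should be in deserializing `MessageId`. Could you check whether the pulsar version changed as well? If it did, did the fields of some `MessageId` implementation change between those two versions? It seems to me you should be using `MessageId.toByteArray`.

On March 11, 2021 at 20:26:15, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hello, I rewrote the migration following StatefulSinkWriterOperator and tested it, but I still hit the error above. The error seems to occur before the source's initializeState. Is it possible that Flink already deserializes the state while reading the savepoint file, and that the failure happens during that step? Is there any way to help pin this down? Thanks~

On March 11, 2021 at 11:36 AM, Kezhu Wang <kez...@gmail.com> wrote:

Is the new cluster using the updated pulsar connector? I looked at the pulsar-flink code, and this update is a breaking change for state.

    + unionOffsetStates = stateStore.getUnionListState(
    +     new ListStateDescriptor<>(
    +         OFFSETS_STATE_NAME,
    +         TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
    +         })));

Possible workarounds?

1. Try rewriting the state with the state-processor-api?
2. Can the 1.9 pulsar-flink connector run on Flink 1.11?

It looks like there are further incompatible updates later on:

      new ListStateDescriptor<>(
          OFFSETS_STATE_NAME,
    -     TypeInformation.of(new TypeHint<Tuple3<String, MessageId, String>>() {
    +     TypeInformation.of(new TypeHint<Tuple3<TopicRange, MessageId, String>>() {
          })));

For an incompatible state update where schema migration is hard to do, you could try the approach used by StatefulSinkWriterOperator: read both the old and the new state, and write only the new state. You may want to wait for someone from streamnative to confirm.

On March 11, 2021 at 10:43:53, 赵 建云 (zhaojianyu...@outlook.com) wrote:

Hi everyone, I ran into a problem where a savepoint is not restored correctly after a Flink version upgrade. The upgrade is from 1.9.3 to 1.11.0 or 1.11.3. The connector in use is the pulsar connector. The checkpoint data structures in the source are as follows:

    oldUnionOffsetStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
            OFFSETS_STATE_NAME,
            TypeInformation.of(new TypeHint<Tuple2<String, MessageId>>() {
            })));
    oldUnionSubscriptionNameStates = stateStore.getUnionListState(
        new ListStateDescriptor<>(
            OFFSETS_STATE_NAME + "_subName",
            TypeInformation.of(new TypeHint<String>() {
            })));

Locally, I saved the 1.9.3 state with the bin/flink savepoint command, stopped the Flink 1.9.3 cluster, started a Flink 1.11.3 cluster, and restored the job with bin/flink run -s. After starting, the job hits the following error:

2021-03-11 10:02:25 java.lang.Exception: Exception while creating StreamOperatorStateContext.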
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_15541fe0b20ca4f01df784b178a8cc9d_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:283)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:156)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
    at org.apache.flink.runtime.state.memory.MemoryStateBackend.createOperatorStateBackend(MemoryStateBackend.java:314)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:274)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.io.EOFException: No more bytes left.
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:79)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.io.Input.readInt(Input.java:350)
    at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeIntField.read(UnsafeCacheFields.java:46)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:191)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:165)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    ... 15 more

Could anyone suggest a way to troubleshoot this, or a solution?

Jianyun8023
2021-3-11
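
For readers following the thread, the "read both the old and the new state, write only the new state" idea mentioned above can be sketched roughly as below. This is only an illustration in plain Java, not the pulsar-flink or Flink implementation: `OffsetStateMigration`, `OldEntry`, `NewEntry`, `migrate`, and `defaultSubName` are hypothetical names, the Flink state handles are replaced by ordinary lists, and message ids are assumed to be kept as raw bytes (for example the output of `MessageId.toByteArray`) rather than as Kryo-serialized objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the connector's state entries; these are NOT pulsar-flink classes.
final class OffsetStateMigration {

    // Old layout, mirroring Tuple2<String, MessageId>, with the id held as raw bytes.
    static final class OldEntry {
        final String topic;
        final byte[] messageId;

        OldEntry(String topic, byte[] messageId) {
            this.topic = topic;
            this.messageId = messageId;
        }
    }

    // New layout, mirroring Tuple3<String, MessageId, String>.
    static final class NewEntry {
        final String topic;
        final byte[] messageId;
        final String subscriptionName;

        NewEntry(String topic, byte[] messageId, String subscriptionName) {
            this.topic = topic;
            this.messageId = messageId;
            this.subscriptionName = subscriptionName;
        }
    }

    // Merges entries restored under the old and the new descriptors.
    // Entries already in the new layout win; old entries are upgraded with the
    // subscription name restored from the old "_subName" state, or with
    // defaultSubName when that state is empty. Only the result is written back.
    static List<NewEntry> migrate(List<OldEntry> oldState,
                                  List<String> oldSubNames,
                                  List<NewEntry> newState,
                                  String defaultSubName) {
        String subName = oldSubNames.isEmpty() ? defaultSubName : oldSubNames.get(0);
        List<NewEntry> merged = new ArrayList<>(newState);
        for (OldEntry old : oldState) {
            boolean alreadyMigrated = false;
            for (NewEntry current : newState) {
                if (current.topic.equals(old.topic)) {
                    alreadyMigrated = true;
                    break;
                }
            }
            if (!alreadyMigrated) {
                merged.add(new NewEntry(old.topic, old.messageId, subName));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<OldEntry> oldState = Arrays.asList(
                new OldEntry("topic-a", new byte[] {1}),
                new OldEntry("topic-b", new byte[] {2}));
        List<NewEntry> newState = Collections.singletonList(
                new NewEntry("topic-b", new byte[] {9}, "sub-new"));
        List<NewEntry> merged = migrate(
                oldState, Collections.singletonList("sub-old"), newState, "fallback");
        for (NewEntry e : merged) {
            System.out.println(e.topic + " " + Arrays.toString(e.messageId) + " " + e.subscriptionName);
        }
    }
}
```

Note that this only covers the merge step. As the top reply points out, the restore in this thread fails earlier, while the snapshotted serializer deserializes the old `MessageId`, so the old state would have to be readable before a merge like this could run.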