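Hi,

Thank you very much for your reply. I have also tried the state-processor-api, but there is no way to obtain the UID of each operator in a SQL-generated job graph. As a result, the state-processor-api cannot read the original state either, so I cannot manipulate the state. If there is a better solution, please reply to this email again.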
Thanks

On 2021-05-20 10:46:22, "Yun Tang" <myas...@live.com> wrote:
>Hi
>
>BaseRowSerializer was renamed to RowDataSerializer in Flink 1.11, and even the
>state-processor-API cannot handle a class that no longer exists. One rather
>complicated option might be to copy the BaseRowSerializer class yourself without
>changing its package, and then use the state-processor-API to force-convert the
>relevant classes to RowDataSerializer. However, your job graph is generated
>entirely from SQL, and the state-processor-API is aimed more at the DataStream
>API, so this will probably be quite hard to do. I really cannot think of a
>particularly good approach.
>
>Best regards,
>Tang Yun
>________________________________
>From: 王炳焱 <15307491...@163.com>
>Sent: Tuesday, May 18, 2021 20:02
>To: user-zh@flink.apache.org <user-zh@flink.apache.org>
>Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report an error
>
>When I upgraded from Flink 1.10 to Flink 1.12, a job written with the Flink 1.10
>SQL API could not be restored from its savepoint. The error is as follows:
>
>
>2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'yyyy-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated.
>2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated.
>2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
>java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
>    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
>Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
>    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
>    at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
>    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
>    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
>    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
>    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    ... 27 more
>2021-05-14 22:02:44,880 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedProcessOperator_9d9aafaedca3e4d635d2a1193610351d_(1/1) from alternative (1/1), will retry while more alternatives are available.
>org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:316) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:155) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
>Caused by: java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath.
>    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    ... 16 more
>Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot
>    at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
>    at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
>    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
>    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at java.lang.Class.forName0(Native Method) ~[?:1.8.0_242]
>    at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_242]
>    at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:719) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>    ... 16 more
>
>
>I looked at the source code, and it seems that the BaseRowSerializer class has
>been renamed. The job state can no longer be restored. Is there any way to solve
>this problem, or some other way to get the state restored? Any help would be
>appreciated.
>
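A note on the mechanism behind the trace: the failing frames are InstantiationUtil.resolveClassByName calling Class.forName on a class name read back from the savepoint, which is why a renamed class surfaces as ClassNotFoundException. The following is only a minimal standalone sketch of that kind of lookup, with a name-remapping fallback added for illustration. It is not Flink code and not a Flink API: LegacyClassResolver, its rename table, and the placeholder class names are all hypothetical, and remapping a name does not by itself make a 1.10 savepoint readable by 1.12.

```java
import java.util.Collections;
import java.util.Map;

/**
 * Illustration only (hypothetical class, not part of Flink):
 * resolves a class by the name recorded in old metadata and, if that
 * name no longer exists, retries with a replacement from a rename table.
 */
public class LegacyClassResolver {

    // Hypothetical table: recorded (old) class name -> name of a class that exists now.
    private final Map<String, String> renamed;

    public LegacyClassResolver(Map<String, String> renamed) {
        this.renamed = renamed;
    }

    public Class<?> resolve(String recordedName) throws ClassNotFoundException {
        try {
            // Same kind of lookup as the Class.forName frame in the trace.
            return Class.forName(recordedName);
        } catch (ClassNotFoundException e) {
            String replacement = renamed.get(recordedName);
            if (replacement == null) {
                // No mapping known: fail exactly as the plain lookup would.
                throw e;
            }
            return Class.forName(replacement);
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder names; a JDK class stands in for the renamed target
        // so the sketch runs without Flink on the classpath.
        LegacyClassResolver resolver = new LegacyClassResolver(
                Collections.singletonMap("com.example.OldSerializer", "java.lang.StringBuilder"));

        System.out.println(resolver.resolve("com.example.OldSerializer").getName());
        try {
            resolver.resolve("com.example.Missing");
        } catch (ClassNotFoundException e) {
            System.out.println("not found: " + e.getMessage());
        }
    }
}
```

Even with the old name resolvable, the restored snapshot class would still have to read the bytes written by the old serializer, which is the hard part described in the reply above.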