Re:Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error
Hi 非常感谢您的回复,state-processor-api我也尝试过,SQL生成的job graph没办法获取到每个算子的UID,所以state-processor-api也无法获取原来的state信息,没办法操作state,如果有更好的解决方案麻烦再回复一下邮件哈 感谢 在 2021-05-20 10:46:22,"Yun Tang" 写道: >Hi > >BaseRowSerializer 已经在Flink-1.11 时候改名成 RowDataSerializer了,即使用 >state-processor-API 也没办法处理当前不存在的类,可能有种比较复杂的办法是自己把 BaseRowSerializer >的类不改变package的情况下拷贝出来,然后用 state-processor-API 将相关类强制转换成 >RowDataSerializer,不过考虑到你的job graph都是SQL生成的,state-processor-API面向地更多的是data >stream API,这一块估计还挺难弄的,确实没有想到特别好的办法。 > >祝好 >唐云 > >From: 王炳焱 <15307491...@163.com> >Sent: Tuesday, May 18, 2021 20:02 >To: user-zh@flink.apache.org >Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report >an error > >我从flink1.10升级到flink1.12时,flink1.10的SQLapi的任务无法从savePoint中进行恢复,报错如下: > > >2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup >[] - The operator name Calc(select=[((CAST((log_info >get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME >_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 >_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 >_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 >_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 >_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 >_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info >get_json_object2 _UTF-16LE'status') SEARCH >Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET >"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info >get_json_object2 _UTF-16LE'data.itemType') SEARCH >Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET >"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), >(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET >"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info >get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 >characters length limit and was truncated. >2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup >[] - The operator name >SourceConversion(table=[default_catalog.default_database.wkb_crm_order], >fields=[log_info, proctime]) exceeded the 80 characters length limit and was >truncated. >2021-05-14 22:02:44,879 ERROR >org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - >Caught unexpected exception. >java.io.IOException: Could not find class >'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' > in classpath. >at >org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) > ~[flink-dist_2.12-1.12.0.jar:1.12.0] >at >org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation
Flink upgraded from 1.10.0 to 1.12.0
When I upgraded from Flink1.10.0 to Flink1.12.0. Unable to restore SavePoint And prompt the following error 2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath. at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
Re:Re:Re:flink sql写mysql中文乱码问题
那你就要看一下你数据库表的每个字段的编码格式是什么?有没有调整编码格式?我这边是可以的 在 2021-05-18 18:19:31,"casel.chen" 写道: >我的URL连接串已经使用了 useUnicode=truecharacterEncoding=UTF-8 结果还是会有乱码 > > > > > > > > > > > > > > > > > >在 2021-05-18 17:21:12,"王炳焱" <15307491...@163.com> 写道: >>你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8,像这样CREATE >> TABLE jdbc_sink(id INT COMMENT '订单id',goods_name >>VARCHAR(128) COMMENT '商品名称',price DECIMAL(32,2) COMMENT '商品价格', >> user_name VARCHAR(64) COMMENT '用户名称') WITH ( 'connector' = >>'jdbc', 'url' = >>'jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8', >> 'username' = 'mysqluser', 'password' = 'mysqluser', >>'table-name' = 'jdbc_sink') >>在 2021-05-18 11:55:46,"casel.chen" 写道: >>>我的flink sql作业如下 >>> >>> >>>SELECT >>>product_name, >>>window_start, >>>window_end, >>>CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, >>>CAST(COUNT(order_no)ASBIGINT) trans_cnt, >>>-- LOCALTIMESTAMP AS insert_time, >>>'微支付事业部'AS bus_name >>>FROM( >>> >>> >>>mysql sink表的定义如下 >>>CREATE TABLE XXX ( >>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4; >>> >>> >>>运行起来后写入mysql表的数据带有中文乱码 ?? >>> >>> >>> >>>查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? >>>2021-05-17 18:02:25,010 INFO >>>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task >>>GroupAggregate(groupBy=[product_name, window_start, window_end], >>>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS >>>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS >>>product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd >>>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT >>>_UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, >>>CAST($f4) AS trans_cnt, CAST(()) AS insert_time, >>>_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" >>>AS bus_name]) -> Sink: >>>Sink(table=[default_catalog.default_database.all_trans_5m_new], >>>fields=[product_name, window_start, window_end, trans_amt, trans_cnt, >>>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy >>>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0. >>>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - >>>GroupAggregate(groupBy=[product_name, window_start, window_end, id, >>>data_type, mer_cust_id, order_no, trans_date], select=[product_name, >>>window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, >>>MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, >>>window_start, window_end, trans_amt, order_no]) (1/1)#0 >>>(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
Flink upgraded to version 1.12.0 and started from SavePoint to report an error
我从flink1.10升级到flink1.12时,flink1.10的SQLapi的任务无法从savePoint中进行恢复,报错如下: 2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath. at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at
Re:flink sql写mysql中文乱码问题
你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8,像这样CREATE TABLE jdbc_sink(id INT COMMENT '订单id',goods_name VARCHAR(128) COMMENT '商品名称',price DECIMAL(32,2) COMMENT '商品价格', user_name VARCHAR(64) COMMENT '用户名称') WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=truecharacterEncoding=UTF-8', 'username' = 'mysqluser', 'password' = 'mysqluser', 'table-name' = 'jdbc_sink') 在 2021-05-18 11:55:46,"casel.chen" 写道: >我的flink sql作业如下 > > >SELECT >product_name, >window_start, >window_end, >CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt, >CAST(COUNT(order_no)ASBIGINT) trans_cnt, >-- LOCALTIMESTAMP AS insert_time, >'微支付事业部'AS bus_name >FROM( > > >mysql sink表的定义如下 >CREATE TABLE XXX ( >) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4; > > >运行起来后写入mysql表的数据带有中文乱码 ?? > > > >查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么? >2021-05-17 18:02:25,010 INFO >org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task >GroupAggregate(groupBy=[product_name, window_start, window_end], >select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS $f3, >COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS >product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') >AS window_start, (CAST(window_end) DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') >AS window_end, CAST($f3) AS trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS >insert_time, _UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET >"UTF-16LE" AS bus_name]) -> Sink: >Sink(table=[default_catalog.default_database.all_trans_5m_new], >fields=[product_name, window_start, window_end, trans_amt, trans_cnt, >insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy >into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0. >2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - >GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, >mer_cust_id, order_no, trans_date], select=[product_name, window_start, >window_end, id, data_type, mer_cust_id, order_no, trans_date, >MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, >window_start, window_end, trans_amt, order_no]) (1/1)#0 >(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
Flink upgraded to version 1.12.0 and started from SavePoint to report an error
When I upgraded from Flink1.10.0 to Flink1.12.0. Unable to restore SavePoint And prompt the following error 2021-05-14 22:02:44,716 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Calc(select=[((CAST((log_info get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME _UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 _UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 _UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 _UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 _UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 _UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info get_json_object2 _UTF-16LE'status') SEARCH Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info get_json_object2 _UTF-16LE'data.itemType') SEARCH Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), (_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,752 WARN org.apache.flink.metrics.MetricGroup [] - The operator name SourceConversion(table=[default_catalog.default_database.wkb_crm_order], fields=[log_info, proctime]) exceeded the 80 characters length limit and was truncated. 2021-05-14 22:02:44,879 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception. java.io.IOException: Could not find class 'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot' in classpath. at org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) ~[flink-dist_2.12-1.12.0.jar:1.12.0] at