Re:Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 Thread
Hi
  Thank you very much for your reply. I have also tried the state-processor-api, but a SQL-generated job 
graph offers no way to obtain each operator's UID, so the state-processor-api cannot read the original state either, and the state cannot be modified. If you come up with a better solution, please reply to this thread again.


Thanks

在 2021-05-20 10:46:22,"Yun Tang"  写道:
>Hi
>
>BaseRowSerializer was already renamed to RowDataSerializer in Flink 1.11, so even the 
>state-processor-API cannot handle a class that no longer exists. One rather involved 
>workaround might be to copy the old BaseRowSerializer class out yourself without 
>changing its package, and then use the state-processor-API to forcibly convert the 
>relevant state over to RowDataSerializer. However, given that your job graph is 
>generated from SQL, while the state-processor-API is aimed mostly at the DataStream 
>API, this will probably be quite hard to pull off; I really cannot think of a 
>particularly good solution.
>
>Best regards
>Yun Tang
>
>From: 王炳焱 <15307491...@163.com>
>Sent: Tuesday, May 18, 2021 20:02
>To: user-zh@flink.apache.org 
>Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
>an error
>
>When I upgraded from Flink 1.10 to Flink 1.12, my Flink 1.10 SQL API job could not be restored from its savepoint. The error is as follows:
>
>
>[stack trace snipped; the identical full trace is archived in the "Flink upgraded from 1.10.0 to 1.12.0" post below]
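
For reference, a minimal sketch of the state-processor-API route discussed above, against
Flink 1.12's DataSet-based API (flink-state-processor-api dependency). The savepoint path,
operator UID, and state descriptor below are placeholder assumptions; as the thread notes,
a SQL-generated job graph only has auto-generated operator UIDs, which is exactly what
makes this approach hard to apply here.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadOldSavepoint {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder savepoint path; the state backend must match the one that
        // produced the savepoint (RocksDB in this thread).
        ExistingSavepoint savepoint = Savepoint.load(
                env,
                "hdfs:///flink/savepoints/savepoint-xxxx",
                new RocksDBStateBackend("hdfs:///flink/checkpoints"));

        // "my-uid" is a placeholder: readKeyedState only works for operators with
        // an explicitly assigned UID, which SQL-generated operators do not expose.
        DataSet<String> state = savepoint.readKeyedState("my-uid", new ReaderFunction());
        state.print();
    }

    static class ReaderFunction extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<String> value;

        @Override
        public void open(Configuration parameters) {
            // Placeholder descriptor name/type; it must match what the job registered.
            value = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", String.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            // Emit one record per key so the old state can be inspected or migrated.
            out.collect(key + " -> " + value.value());
        }
    }
}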

Flink upgraded from 1.10.0 to 1.12.0

2021-05-18 Thread
When I upgraded from Flink 1.10.0 to Flink 1.12.0, the savepoint could not be restored, 
and the following error was reported:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'yyyy-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 

Re:Re:Re: Garbled Chinese characters when Flink SQL writes to MySQL

2021-05-18 Thread
Then you need to check what encoding each column of your database table uses. Have you adjusted the column encodings? It works fine on my side.
在 2021-05-18 18:19:31,"casel.chen"  写道:
>My JDBC URL already includes useUnicode=true&characterEncoding=UTF-8, but the data is still garbled.
>
>在 2021-05-18 17:21:12,"王炳焱" <15307491...@163.com> 写道:
>>When you define the MySQL table in Flink SQL, set
>>url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8
>>in the connector options, like this:
>>CREATE TABLE jdbc_sink (
>>    id INT COMMENT 'order id',
>>    goods_name VARCHAR(128) COMMENT 'goods name',
>>    price DECIMAL(32,2) COMMENT 'goods price',
>>    user_name VARCHAR(64) COMMENT 'user name'
>>) WITH (
>>    'connector' = 'jdbc',
>>    'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
>>    'username' = 'mysqluser',
>>    'password' = 'mysqluser',
>>    'table-name' = 'jdbc_sink'
>>)
>>在 2021-05-18 11:55:46,"casel.chen"  写道:
>>>My Flink SQL job is as follows:
>>>
>>>
>>>SELECT
>>>product_name,
>>>window_start,
>>>window_end,
>>>CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
>>>CAST(COUNT(order_no) AS BIGINT) trans_cnt,
>>>-- LOCALTIMESTAMP AS insert_time,
>>>'微支付事业部' AS bus_name
>>>FROM (
>>>
>>>
>>>The MySQL sink table is defined as follows:
>>>CREATE TABLE XXX (
>>>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
>>>
>>>
>>>After the job runs, the Chinese data written into the MySQL table is garbled and shows up as ??
>>>
>>>
>>>
>>>Looking at the job's runtime log, I found it uses the UTF-16LE character set. Is there any way to make it use utf8mb4?
>>>2021-05-17 18:02:25,010 INFO 
>>>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end], 
>>>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS 
>>>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) AS 
>>>product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'yyyy-MM-dd 
>>>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT 
>>>_UTF-16LE'yyyy-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, 
>>>CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
>>>_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
>>>AS bus_name]) -> Sink: 
>>>Sink(table=[default_catalog.default_database.all_trans_5m_new], 
>>>fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
>>>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy 
>>>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
>>>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
>>>GroupAggregate(groupBy=[product_name, window_start, window_end, id, 
>>>data_type, mer_cust_id, order_no, trans_date], select=[product_name, 
>>>window_start, window_end, id, data_type, mer_cust_id, order_no, trans_date, 
>>>MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
>>>window_start, window_end, trans_amt, order_no]) (1/1)#0 
>>>(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.
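
As a hedged way to verify the advice above: the sketch below uses plain JDBC (assuming
mysql-connector-java is on the classpath, and reusing the jdbc_sink example table,
host, and credentials from the reply, all placeholders) to print the session
character-set variables negotiated by the URL parameters and the per-column collations,
so you can see whether the mojibake comes from the connection or from the table definition.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MysqlCharsetCheck {
    public static void main(String[] args) throws Exception {
        // Same URL style as recommended in the reply above; host, database,
        // user, and password are placeholders.
        String url = "jdbc:mysql://127.0.0.1:3306/database"
                + "?useUnicode=true&characterEncoding=UTF-8";
        try (Connection conn = DriverManager.getConnection(url, "mysqluser", "mysqluser");
             Statement stmt = conn.createStatement()) {
            // Character sets the driver negotiated for this session.
            try (ResultSet rs = stmt.executeQuery("SHOW VARIABLES LIKE 'character_set%'")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
            // Per-column collation of the sink table; for Chinese data these
            // should be utf8mb4, matching the DDL in the question.
            try (ResultSet rs = stmt.executeQuery("SHOW FULL COLUMNS FROM jdbc_sink")) {
                while (rs.next()) {
                    System.out.println(rs.getString("Field")
                            + " collation = " + rs.getString("Collation"));
                }
            }
        }
    }
}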


Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-18 Thread
When I upgraded from Flink 1.10 to Flink 1.12, my Flink 1.10 SQL API job could not be restored from its savepoint. The error is as follows:


[stack trace identical to the one archived in the "Flink upgraded from 1.10.0 to 1.12.0" thread above]

Re: Garbled Chinese characters when Flink SQL writes to MySQL

2021-05-18 Thread
[This reply and the question it quotes are archived in full inside the "Re:Re:Re:" thread above.]


Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-17 Thread
When I upgraded from Flink 1.10.0 to Flink 1.12.0, the savepoint could not be restored, 
and the following error was reported:


[stack trace identical to the one archived in the "Flink upgraded from 1.10.0 to 1.12.0" thread above]