?????? Flink ????????join

2021-06-07 Thread Jason Lee


??Flink
 SQL Join


??
| |
JasonLee
|
|
jasonlee1...@163.com
|
??


??2021??02??25?? 14:40??Suhan ??
benchao??joinrocketmqflink??kafka
 + rocket mq
??flink??
 




--  --
??: 
   "user-zh"



Flink 日志和输出中文乱码

2021-06-09 Thread Jason Lee
大家好
我们在Flink数据写入Kafka时出现中文乱码问题,之后为了只管确定数据将Sink 
换成Print打印出来,结果发现还是乱码。然后查看TM的日志,发现日志里面的中文也是乱码,而且显示执行的是cast(_UTF-16LE0),这一点也很不解,如下:
"promotions":[{"amount":1000,"stage":1,"scope":0,"itemIndex":[1],"name":"å\u008D¡é¡¹æ\u009D\u0083ç\u009B\u008Aæ\u008Aµæ\u0089£","type":1,"value":1}],"deleted":0,"closeStateDesc":"æ\u009Cªå\u0085³é\u0097­","refundAbleDesc":"","createTime":1623206122000,"reserveOrderId":"","paid":true,"wipeOutAmount":0,"payChannel":13}':VARCHAR(2147483647)
 CHARACTERSET"UTF-16LE") AS dubbo_data2, CAST(_UTF-16LE'æ— 
需支付':VARCHAR(2147483647) CHARACTERSET"UTF-16LE") AS value_1], 
where=[(behavior = _UTF-16LE'CREATED':VARCHAR(2147483647) 
CHARACTERSET"UTF-16LE")]) -> SinkConversionToTuple2 -> Map -> Sink: Unnamed 
(1/1).




我们采用多种方式解决:
(1)在集群机器的flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”
(2) 通过DECODE()ENCODE()将字符串编码解码成UTF-8或者GBK,都行不通


上述方式都行不通,而且都是乱码,请问社区伙伴有遇到过类似问题,有好的解决方案吗?




| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制



回复:flink sql写mysql中文乱码问题

2021-06-09 Thread Jason Lee


同遇到这个问题,看TM的日志的执行Vertic可以看到Cast(_UTF-16LE),然后我们是想往Kafka写入数据,结果写入乱码。


然后想过通过(1)在集群机器的flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”
(2) 通过DECODE()ENCODE()将字符串编码解码成UTF-8或者GBK,都行不通


上述两种方式对乱码数据处理吗但是都是还会出现中文乱码。不知道你尝试过什么方法?有没有成功解决的?


| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制


在2021年05月25日 23:31,casel.chen 写道:



我看flink官网介绍Table API & SQL的时候有打印sql执行计划中使用的是UTF-16LE字符集,为什么不用UTF-8呢?乱码会不会跟这个有关?
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/#%e8%a7%a3%e9%87%8a%e8%a1%a8



上述例子的结果是:

```text == Abstract Syntax Tree == LogicalUnion(all=[true]) 
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) 
FlinkLogicalDataStreamScan(id=[1], fields=[count, word]) 
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan == DataStreamUnion(all=[true], union all=[count, 
word]) DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE’F%')]) 
DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2], 
fields=[count, word])

== Physical Execution Plan == Stage 1 : Data Source content : collect elements 
with CollectionInputFormat

















在 2021-05-25 10:40:46,"casel.chen"  写道:
数据库字符编码设置如下


character_set_client,utf8mb4
character_set_connection,utf8mb4
character_set_database,utf8mb4
character_set_filesystem,binary
character_set_results,utf8mb4
character_set_server,utf8
character_set_system,utf8
character_sets_dir,/u01/mysql57_20200229/share/charsets/


客户端连接串是
jdbc:mysql://host:3306/datav_test?useUnicode=true&characterEncoding=utf8


本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢!

















在 2021-05-19 17:52:01,"Michael Ran"  写道:



数据库的字段字符编码














在 2021-05-18 18:19:31,"casel.chen"  写道:
我的URL连接串已经使用了  useUnicode=true&characterEncoding=UTF-8 结果还是会有乱码

















在 2021-05-18 17:21:12,"王炳焱" <15307491...@163.com> 写道:
你在flinkSQL连接mysql表的时候配置url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8,像这样CREATE
 TABLE jdbc_sink(id INT  COMMENT '订单id',goods_name 
VARCHAR(128) COMMENT '商品名称',price DECIMAL(32,2) COMMENT '商品价格',   
 user_name VARCHAR(64) COMMENT '用户名称') WITH (   'connector' = 
'jdbc',   'url' = 
'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
   'username' = 'mysqluser',   'password' = 'mysqluser',   
'table-name' = 'jdbc_sink')
在 2021-05-18 11:55:46,"casel.chen"  写道:
我的flink sql作业如下


SELECT
product_name,
window_start,
window_end,
CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
CAST(COUNT(order_no)ASBIGINT) trans_cnt,
-- LOCALTIMESTAMP AS insert_time,
'微支付事业部'AS bus_name
FROM(


mysql sink表的定义如下
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;


运行起来后写入mysql表的数据带有中文乱码 ??



查看作业运行日志后发现其使用了 UTF-16LE 字符集,有什么办法可以让其使用 utf8mb4 字符集么?
2021-05-17 18:02:25,010 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor 
[] - Received task GroupAggregate(groupBy=[product_name, window_start, 
window_end], select=[product_name, window_start, window_end, 
SUM_RETRACT(trans_amt) AS $f3, COUNT_RETRACT(order_no) AS $f4]) -> 
Calc(select=[CAST(product_name) AS product_name, (CAST(window_start) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_start, (CAST(window_end) 
DATE_FORMAT _UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS 
trans_amt, CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS 
bus_name]) -> Sink: 
Sink(table=[default_catalog.default_database.all_trans_5m_new], 
fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy into 
slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] - 
GroupAggregate(groupBy=[product_name, window_start, window_end, id, data_type, 
mer_cust_id, order_no, trans_date], select=[product_name, window_start, 
window_end, id, data_type, mer_cust_id, order_no, trans_date, 
MAX_RETRACT(trans_amt) AS trans_amt]) -> Calc(select=[product_name, 
window_start, window_end, trans_amt, order_no]) (1/1)#0 
(ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to DEPLOYING.


回复: flinksql ttl不生效

2021-06-09 Thread Jason Lee


checkpoint文件大小不断增加的原因是由于任务的状态不断累积导致的;所以如果任务状态很大的情况下,比如Group by 
的字段过多等等,可以考虑开启增量state.backend.incremental,同时可以考虑任务的类型,如果任务是按天进行聚合指标的情况可以考虑设置状态过期清理时间idlestate.retention.time为一天等方式来防止chekcpoint保留状态数据的不断增加,或者增加速度过快导致任务的内存不够而被Kill掉;


但是看您的描述,并不是设置State 
TTL不生效,而是要考虑状态时间戳的更新方式,因为状态时间戳被更新存在两种模式StateTtlConfig.UpdateType.OnCreateAndWrite
 - 只在创建和写的时候更新(默认),StateTtlConfig.UpdateType.OnReadAndWrite - 
在读和写的时候更新,所以可以考虑您的任务情况采用哪种设定状态的更新模式;


同时过期数据的清理策略和您设定的checkpoint保留是增量、全量或者增量RocksDB保留的策略都有关了,您可以综合考虑自己的checkpoint保留策略和任务类型合理设定状态过期清理时间idlestate.retention.time
 和状态时间戳的更新方式以及任务的checkpoint的保留策略


Best ,
JasonLee
| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制


在2021年06月10日 12:33,HunterXHunter<1356469...@qq.com> 写道:
建议关闭state.backend.incremental ,因为我遇到过,开启增量ckp导致ckp一直增大,关掉就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Checkpoint时内存不够TaskManager被Kill掉

2021-06-10 Thread Jason Lee
各位社区伙伴大家好
首先描述一下我的问题:最近我们发现有几个任务平时运行将近一个月都没发生问题,最近在进行checkpoint的时候经常失败,然后一直容错恢复重启,我们通过日志查看,发现某个TM在进行checkpoint的时候内存使用过大导致内存不够被Kill掉了;
报错日志:


java.lang.Exception: Container 
[pid=24859,containerID=container_e01_1618927404581_0181_01_002010] is running 
beyond physical memor
y limits. Current usage: 4.0 GB of 4 GB physical memory used; 6.9 GB of 8.4 GB 
virtual memory used. Killing container.


但是我们从他历史的checkpoint记录中可以看到它发生问题那次checkpoint时候并不是进行全量checkpoint的时候(很抱歉当时失败的时候忘记截图,给任务重启了,现在这幅图是任务重启之后的历史checkpoint截图,);


我们为了减少增量checkpoint的大小设置了State 
DDL,但是这种任务还是发生了TM内存不够被KIll掉的问题,我们现在的解决途径是增大内存,或者并发,但是我们现在任务已经配置了64个并发,每个TM内存给了4G,如果不断增加并发或者内存的话会对资源有很大的浪费,因为我们可以看到平时堆内存的使用率是有限的,所以一味增加会造成很大的资源浪费。






综上。想问一下社区的伙伴有没有遇到类似checkpoint时候某个TM内存不够被Kill的问题,除了不断增加并发内存外,有没有好的解决方案,谢谢大家。


Best,
Jason Lee


| |
Jason Lee
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制



回复:检查点失败 Checkpoint Coordinator is suspending 。

2021-06-10 Thread Jason Lee
您好,
Checkpoint Coordinator is 
suspending是那些等待执行的checkpoint检查点因为任务发生异常在停止顶定时任务的stopCheckpointScheduler()方法中被释放掉,所以日志中这个异常了;
具体什么问题害得详细看下是什么原因导致任务异常,是脏数据未处理异常还是怎样的,可能需要具体查看一下TM和JM日志,如果您这边发现具体原因可以同步一下


Best,
JasonLee
| |
JaosnLee
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制


在2021年06月10日 15:39,yidan zhao 写道:
如题,Checkpoint Coordinator is suspending
这种检查点失败是什么情况,timeout那种我理解是检查点执行时间长,超时了。但是 Checkpoint Coordinator is
suspending 这个是什么含义呢?


Task Container 被Kill, Managed memory使用情况查看

2021-06-10 Thread Jason Lee


各位社区的伙伴大家好


目前我们在使用Flink SQL 开发任务过程中遇到一个问题,有比较大状态的任务在运行一段时间后Task 
Container会由于使用申请内存过多被Yarn集群Kill掉。


针对这个问题我们任务可能是在Checkpoint时候状态过大引起的,因此我们调整了State 
ttl,也是增量Checkpoint,之后还是会出现类似情况,我们只能通过增加并发和内存来保证任务运行,但是这回造成了很大的资源浪费,因为平时查看任务的堆内存使用并不多,所以我们在考虑是不是Managed
 memory不足导致的,因为Managed memory 负责RocksDB, 我们想确定一下是不是Managed memory不足导致的任务异常。


但是现在通过Flink Web UI界面查看不到Managed memory的使用情况,所以请教一下社区小伙伴有没有好的方式查看Managed 
memory的使用情况,或者有没有遇到类Tm container 被kill的情况有没有好的解决方法,感谢大家,希望一起交流


Best,
Jason
| |
Jason Lee1781
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制



Flink1.10 SQL支持消费Kafka多个Topic吗

2021-06-12 Thread Jason Lee
Hi,各位社区伙伴


我这里有一个问题想请教大家,Flink 1.10版本中可以写Flink SQL任务建表的时候指定多个Kafka的Topic吗?我发现Flink 
1.12版本中Flink 
SQL任务可以通过用’topic’='topic-1;topic-2’这种方式消费多个Topic数据,但是想问一下大家知道在Flink 
1.10版本中支持吗?可以通过正则匹配的方式实现消费多个Topic吗?


当然可以换一种方式比如建多个表每个表一个Topic饭后union,但是这种方式比较繁琐。


Best,
JasonLee1781
| |
李闯
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制



Flink SQL向下兼容吗?

2021-08-09 Thread Jason Lee
各位大佬好,


请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗?
比如我升级到1.13,那我1.10的SQL语法能被兼容吗?


感恩


| |
Chuang Li
|
|
jasonlee1...@163.com
|
签名由网易邮箱大师定制