HOP窗口较短导致checkpoint失败

2021-09-17 文章 xiaohui zhang
FLink:1.12.1 源: kafka create table dev_log ( devid, ip, op_ts ) with ( connector = kafka ) sink: Hbase connect 2.2 目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。 执行SQL如下 insert into h_table select devid as rowkey row(hop_end, ip_cnt) from ( select devid,

flink sql是否支持动态创建sink table?

2021-09-17 文章 casel.chen
上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink sql支持动态创建sink table吗?

Flink SQL是否支持Count Window函数?

2021-09-17 文章 casel.chen
今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time window,问一下官方是否打算sql支持count window呢? 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!

Re:Flink on native k8s如何自定义挂载盘?

2021-09-17 文章 东东
升级到1.13用pod template吧,这之前的版本没有官方支持的方式 在 2021-09-17 16:43:53,"casel.chen" 写道: >为了监控TM OOM情况发生,我们在启动作业的时候添加了如下参数 >-Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError >-XX:HeapDumpPath=/var/log/oom.bin" >想在OOM发生的时候能生成HeapDumpFile,以便事后分析。 >但是因为OOM时TM所在的pod会被销毁,因此想挂载一个网络盘持久化HeapDumpFile。

Flink on native k8s如何自定义挂载盘?

2021-09-17 文章 casel.chen
为了监控TM OOM情况发生,我们在启动作业的时候添加了如下参数 -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/oom.bin" 想在OOM发生的时候能生成HeapDumpFile,以便事后分析。 但是因为OOM时TM所在的pod会被销毁,因此想挂载一个网络盘持久化HeapDumpFile。 请问Flink on native k8s要如何自定义挂载盘呢?使用的Flink版本是1.12.5

Flink SQL官方何时能支持redis和mongodb连接器?

2021-09-17 文章 casel.chen
redis和mongodb经常在工作中用到,但Flink官方一直没有提供这两个标准连接器,想问一下什么时候能正式release方便大家使用呢? ps: behair库已经很久没更新了,对应的flink版本太低。

flink cdc data stream api sourceRecord解析

2021-09-17 文章 Fisher Xiang
Hi, RT,对于data stream 消费binlog得到的sourceRecord,是一些string类型的struct类型数据, 请问官方有什么好的办法去解析这些string类型的struct数据吗?使用反射? 目标是解析成Java对象。 附上数据: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.01, pos=720, row=1, snapshot=true}}