Re:Re:Re:Re:flink sql connector options如何支持Map数据类型?
需求是要支持任何http header透传,不管是标准http header还是用户自定义http header 在 2022-12-28 09:11:25,"RS" 写道: >Hi, >这个看你的需求啊,用户想自定义哪些Header,怎么定义? > > >比如用户想在Header中添加上报的时间戳,那么这种是随时间变化的,就无法在options里面定义了 >比如用户想在Header中添加上报数据的元信息,数据大小,数据字段个数等,那么这个也是和数据强相关的,无法在options里面定义 > > >所以要看用户想要什么,你们想给用户开放到哪个程度? >至于是不是可以像flink sql kafka connector定义 `properties.*` >,这个是具体实现的方式,现在都不清楚你要做什么,先确定目标,再考虑实现。 > > >Thanks > >在 2022-12-27 13:24:38,"casel.chen" 写道: >> >> >>遇到用户添加自定义请求头Headers问题 >> >>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述 >>>> 是不是可以像flink sql kafka connector定义 `properties.*` 那样定义 `headers.*` 呢? >>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了 >>>> 是说有一些公用headers吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 >>>> properties.group.id 和 properties.bootstrap.servers >> >>在 2022-12-26 11:12:57,"RS" 写道: >>>Hi, >>> >>> >>>> 遇到用户添加自定义请求头Headers问题 >>>如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述 >>>如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了 >>> >>> >>>> 如何在connector options中支持Map数据类型呢? >>>options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map >>> >>> >>> >>> >>>Thanks >>> >>>在 2022-12-17 10:20:29,"casel.chen" 写道: >>>>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector >>>>options中支持Map数据类型呢?
Re:Re:flink sql connector options如何支持Map数据类型?
遇到用户添加自定义请求头Headers问题 如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述 >> 是不是可以像flink sql kafka connector定义 `properties.*` 那样定义 `headers.*` 呢? 如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了 >> 是说有一些公用headers吗?例如 Content-Type 之类的,对应的flink sql kafka connector中的 >> properties.group.id 和 properties.bootstrap.servers 在 2022-12-26 11:12:57,"RS" 写道: >Hi, > > >> 遇到用户添加自定义请求头Headers问题 >如果自定义Headers是和内容相关,那么就和connector options无关了,应该是需要用表的字段来描述 >如果自定义Headers是定义table的时候确定的,那就是定义connector options的问题了 > > >> 如何在connector options中支持Map数据类型呢? >options里面都是k=v的结构,v都是字符串,所以你要考虑的是如何用 字符串 描述 Map > > > > >Thanks > >在 2022-12-17 10:20:29,"casel.chen" 写道: >>我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector >>options中支持Map数据类型呢?
Re:Re: flink sql connector options如何支持Map数据类型?
看过了,不支持http source table,而且即使http lookup table也不支持map数据类型 在 2022-12-19 14:51:42,"Weihua Hu" 写道: >Hi, 你可以尝试使用独立开源的 http connector > >https://github.com/getindata/flink-http-connector > >Best, >Weihua > > >On Sat, Dec 17, 2022 at 10:21 AM casel.chen wrote: > >> 我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector >> options中支持Map数据类型呢?
flink sql connector options如何支持Map数据类型?
我想开发一个Flink SQL Http Connector,遇到用户添加自定义请求头Headers问题,如何在connector options中支持Map数据类型呢?
flink 1.16 lookup join重试策略问题
我们有场景会发生维表数据后于事实流表数据到达,使用flink 1.16 lookup join重试策略时,如果超过重试次数还没关联上会发生什么?待关联字段值都为null么?
Re:Re: 如何扩展flink sql以实现延迟调用?
interval join的缺点是只能输出关联上的结果,却无法输出未能关联上的结果(后续我需要对未关联上的结果进行特殊处理) 在 2022-12-07 13:33:50,"Lincoln Lee" 写道: >双流 join 的场景下可以考虑关联条件引入时间属性,变成 interval join 可以实现一定的时间对齐( >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/joins/#interval-joins >) >另外如果可以使用 lookup join 单边驱动关联并且不是所有数据都需要等待的话,可以尝试 lookup join 的延迟重试 >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup > >Best, >Lincoln Lee > > >casel.chen 于2022年12月7日周三 11:52写道: > >> 有人能够解答一下吗? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2022-11-26 11:20:34,"casel.chen" 写道: >> >双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink >> sql实现?如果当前不支持,需要怎样扩展flink sql呢? >>
Re:Re: 如何扩展flink sql以实现延迟调用?
谢谢你给的建议,不过我们还没有升级到flink 1.16,目前在使用的是flink 1.15。 如果要使用flink sql来实现的话,是不是可以利用窗口去重来达到数据延迟关联的效果? 在每条数据到达后开一个10分钟累加窗口(step和size均为10分钟)根据key去重,在等待窗口结束之时输出的去重结果再跟维表进行lookup join 在 2022-12-07 13:33:50,"Lincoln Lee" 写道: >双流 join 的场景下可以考虑关联条件引入时间属性,变成 interval join 可以实现一定的时间对齐( >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/joins/#interval-joins >) >另外如果可以使用 lookup join 单边驱动关联并且不是所有数据都需要等待的话,可以尝试 lookup join 的延迟重试 >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup > >Best, >Lincoln Lee > > >casel.chen 于2022年12月7日周三 11:52写道: > >> 有人能够解答一下吗? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2022-11-26 11:20:34,"casel.chen" 写道: >> >双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink >> sql实现?如果当前不支持,需要怎样扩展flink sql呢? >>
flink on native k8s模式调度能否根据节点磁盘和网络io指标进行调度?
flink on native k8s模式调度能否根据节点磁盘和网络io指标进行调度? 貌似现在只能根据cpu/内存剩余量进行调度,但如果新加一个节点会导致新作业全部的pod都部署到该节点上,造成该节点网络或磁盘IO飙升,这种情况有什么好的对策么?
Re:如何扩展flink sql以实现延迟调用?
有人能够解答一下吗? 在 2022-11-26 11:20:34,"casel.chen" 写道: >双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink >sql实现?如果当前不支持,需要怎样扩展flink sql呢?
flink sql是否支持延迟lookup join?
维表流数据晚于主表流数据到达甚至可能到达不了,所以想设置个5分钟等待窗口,关联上正常处理,关联不上发到另一个kafka topic,这种场景使用flink sql要如何实现?
Re:flink on k8s节点网络io飙高问题如何解决?
flink on native kubernetes如何使用 affinity 配置软互斥?即同一个作业的不同pod分布在不同的节点node上 在 2022-12-05 19:51:02,"casel.chen" 写道: >我司flink作业运行在k8s集群上,日前发现有一些k8s集群节点的网络io在某些时间段超过了告警阈值180MB/s,最多达到430MB/s,最少的只有4MB/s,导致新作业无法部署到网络负载高的节点上,哪怕cpu和内存还有很多剩余。 >目前我想的办法是利用节点亲和性手动从负载高的节点上迁移出那些耗网络io高的作业pod到负载低的节点,但是过一段时间又会出现类似的问题,请问: >1. 有什么办法可以彻底消除这种网络负载不均衡问题么? >2. k8s能否根据pod网络io负载进行合理调度吗?
flink on k8s节点网络io飙高问题如何解决?
我司flink作业运行在k8s集群上,日前发现有一些k8s集群节点的网络io在某些时间段超过了告警阈值180MB/s,最多达到430MB/s,最少的只有4MB/s,导致新作业无法部署到网络负载高的节点上,哪怕cpu和内存还有很多剩余。 目前我想的办法是利用节点亲和性手动从负载高的节点上迁移出那些耗网络io高的作业pod到负载低的节点,但是过一段时间又会出现类似的问题,请问: 1. 有什么办法可以彻底消除这种网络负载不均衡问题么? 2. k8s能否根据pod网络io负载进行合理调度吗?
Re:回复:flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?
好吧,难怪我没找到设置开始消费位置的参数,谢谢! 在 2022-12-05 18:34:49,"JasonLee" <17610775...@163.com> 写道: >hi > > >Upsert-kafka 不支持指定消费者位置,默认是从 earliest 位置开始消费的,你可以自己修改代码支持 scan.startup.mode 参数。 > > >Best >JasonLee > > > 回复的原邮件 >| 发件人 | casel.chen | >| 发送日期 | 2022年12月5日 18:24 | >| 收件人 | user-zh@flink.apache.org | >| 主题 | flink sql消费upsert-kafka源表如何指定从哪个位点开始消费? | >flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?仿照kafka source表添加了 scan.startup.mode >参数会报非法参数
flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?
flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?仿照kafka source表添加了 scan.startup.mode 参数会报非法参数
Re:回复: Re: 怎样从flink执行计划json生成StreamGraph?
sql作业最终也会转换成stream api生成jobgraph,因此同样可以支持修改每个算子并行度 在 2022-11-30 11:24:50,"仙路尽头谁为峰" 写道: >Sql作业好像不支持修改每个算子并行度吧,修改并行度需要从头开始重新生成JobGraph提交作业。 >Json主要是贴到Plan Visualizer 开发和调试用。 >https://flink.apache.org/visualizer/ >从 Windows 版邮件发送 > >发件人: yidan zhao >发送时间: 2022年11月30日 10:12 >收件人: user-zh@flink.apache.org >主题: Re: Re: 怎样从flink执行计划json生成StreamGraph? > >好吧,sql我具体不了解,我用的stream api比较多,我了解是stream >api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。 > >casel.chen 于2022年11月30日周三 00:16写道: >> >> 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2022-11-29 10:07:40,"yidan zhao" 写道: >> >并不需要从执行计划json生成streamGraph呀~ >> >streamGraph提交之前直接转jobGraph。 >> > >> >casel.chen 于2022年11月28日周一 08:53写道: >> >> >> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教 >
Re:Re: flink sql作业无缝升级问题
拿kafka source作业为例,新老作业使用相同的consumer group,老作业先做savepoint,完了在老作业还在运行的同时启动新作业从刚才的savepoint恢复会有问题么? 如何设置一个流量开关让新作业“准备”好再打开流量呢?有没有具体实操的例子?还是说需要自己修改flink源码,具体要看哪一个类方法? 在 2022-11-30 20:08:44,"Jiangang Liu" 写道: >Flink目前无法做到无缝升级,需要走stop-with-savepoint、start >job的流程,但是在这之间可以做一些优化来缩短恢复时间。比如,把新作业先启动起来,申请好资源,同时停掉老作业,将做好的savepoint用来触发新作业的执行。 > >casel.chen 于2022年11月29日周二 08:38写道: > >> 线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb >> 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢? >> 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka >> group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?
Re:Re: 怎样从flink执行计划json生成StreamGraph?
如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗? 在 2022-11-29 10:07:40,"yidan zhao" 写道: >并不需要从执行计划json生成streamGraph呀~ >streamGraph提交之前直接转jobGraph。 > >casel.chen 于2022年11月28日周一 08:53写道: >> >> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
flink sql有办法获取到rowkind元数据字段吗?
flink sql有办法获取到rowkind元数据字段吗?比如按rowkind进行case when处理或者过滤
flink sql作业无缝升级问题
线上有一个流量较大的flink sql作业需要升级添加业务字段,当前主要是kafka (canal) 多表关联写入 mongodb 数据同步场景,除了source offset外无其他状态,如何让用户对升级无感呢? 常规的停止作业再启动作业至少要几十秒,会造成消息积压告警,有没有可能先启新作业待运行平稳后再停止老作业?kafka group使用同一个,作业启动从group-offsets开始可以吗?另外,如果是有大状态作业又要如何无缝升级?
flink sql接cdc数据源按最新数据统计问题
业务需求是mysql订单表按天按供应商实时统计交易金额,订单表会发生修改和删除,用flink sql要如何实现呢?开窗取最新一条记录再聚合吗?如果遇到delete记录会不会减去相应的price呢?试着写了如下flink sql不知道对不对 select s.biddate, s.supplier, sum(s.price) from ( select * from ( select biddate, supplier, price, ROW_NUMBER() OVER ( PARTITION BY biddate, supplier ORDER BY bidtime DESC ) as rownum from ( select bidtime, date_format(bidtime, '-MM-dd-HH') as biddate, supplier, price from orders ) ) as t where t.rownum = 1 ) as s group by s.biddate, s.supplier ;
怎样从flink执行计划json生成StreamGraph?
源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
如何扩展flink sql以实现延迟调用?
双流关联场景下,流A数据到达后并不马上和流B进行关联,而是经过设置一段时间后再关联,这种场景下如何用flink sql实现?如果当前不支持,需要怎样扩展flink sql呢?
如何使用flink sql优雅的处理大量嵌套if-else逻辑
我有一个flink sql作业需要根据不同字段值满足不同条件来设置另一个字段值,还有一些嵌套if-else逻辑,这块逻辑不是固定的,业务方会过一段时间调整一次。 想请问如何使用flink sql优雅的处理嵌套if-else逻辑呢?我有想到使用drools规则引擎,通过udf来调用,不知道还有没有更好的办法?
Re:Re: flink作业提交运行后如何监听作业状态发生变化?
可以使用 execution.job-listeners 参数吗?这个参数的用法是怎样的? 在 2022-11-24 10:41:35,"Yang Wang" 写道: >其实可以参考Flink Kubernetes >Operator里面的做法,设置execution.shutdown-on-application-finish参数为false >然后通过轮询Flink RestAPI拿到job的状态,job结束了再主动停掉Application cluster > >Best, >Yang > >JasonLee <17610775...@163.com> 于2022年11月24日周四 09:59写道: > >> Hi >> >> >> 可以通过 Flink 的 Metric 和 Yarn 的 Api 去获取任务的状态(任务提交到 yarn 的话) >> >> >> Best >> JasonLee >> >> >> 回复的原邮件 >> | 发件人 | casel.chen | >> | 发送日期 | 2022年11月23日 08:32 | >> | 收件人 | user-zh@flink.apache.org | >> | 主题 | flink作业提交运行后如何监听作业状态发生变化? | >> >> 请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?
Re:Re:flink作业提交运行后如何监听作业状态发生变化?
如果作业挂了就不会发metrics了吧,控台感知会滞后,有没有更及时感知的办法呢? 在 2022-11-23 09:28:59,"RS" 写道: >Hi, > > >Flink的Metric了解下,里面应该有作业的状态 >https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#reporter > > >配置不同的Metric方式,有的是拉取,有的是推送的机制, >https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/ > > > >Thanks > > > > >在 2022-11-23 08:32:11,"casel.chen" 写道: >>请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?
Re:Re: 如何扩展flink sql以支持CTAS/CDAS语句?
是的,类似阿里云和腾讯云上面的功能 在 2022-11-23 10:02:09,"Shengkai Fang" 写道: >想问一下你想实现的功能是咋样的呢?是阿里云上的那种吗? > >Best, >Shengkai > >casel.chen 于2022年11月23日周三 08:29写道: > >> flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink >> sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!
flink作业提交运行后如何监听作业状态发生变化?
请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?
如何扩展flink sql以支持CTAS/CDAS语句?
flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!
如何扩展flink sql以支持CTAS/CDAS语句?
flink sql社区版貌似还不支持CTAS/CDAS语句,请问要如何扩展flink sql以支持CTAS/CDAS语句?可以给个思路或代码示例吗?谢谢!
如何正确扩展jdbc connector以支持更多的数据库方言?
如何正确扩展jdbc connector以支持更多的数据库方言?我们目前的做法是拉下flink源码直接进行修改添加方言支持,有没有更优雅的方式来实现呢?
Re:Re: Flink CDC2.2.1 设置server id范围
如果一张表被多个不同flink cdc作业消费的话岂不是要记住之前分配过哪些serverId? 如果作业包含了重复的serverId会造成消费出错吧? 如果flink sql作业不指定serverId的话,随机分配也有可能产生重复serverId吗? 这类serverId信息是不是在服务端维护更合理? 在 2022-10-31 17:18:41,"林影" 写道: >ok, thx! > >Leonard Xu 于2022年10月31日周一 17:01写道: > >> >> > 2022年10月31日 下午4:57,林影 写道: >> > >> > Hi, Leonard. >> > >> > 我也有类似的疑惑。 >> > >> > 有个线上的Flink Application之前配置的serverid 是 >> > 6416-6418,并行度之前是3,后来缩容的时候并行度改成2了,在这种场景下serverid的范围需要进行调整吗? >> >> 缩容并不需要的,你的case里只会用6416 和 6417这两个id,只有扩容需要考虑,并且扩容时如果没有夸大范围,目前是会报错提示的。 >> >> 祝好, >> Leonard >> >> >> >> >> > >> > casel.chen 于2022年10月31日周一 16:50写道: >> > >> >> >> >> >> >> >> >> >> >> >> server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度? >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2022-10-31 16:04:32,"Leonard Xu" 写道: >> >>> Hi, >> >>> >> >> >> >>> >> 你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’. >> >>> 另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。 >> >>> >> >>> >> >>> Best, >> >>> Leonard >> >>> >> >>> >> >>>> 2022年10月31日 下午4:00,Fei Han 写道: >> >>>> >> >>>> 大家好! >> >>>> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC >> >> 打宽表。但是在任务跑一段时间后,还是出现如下报错: >> >>>> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A >> >> slave with the same server_uuid/server_id as this slave has connected to >> >> the master; >> >>>> 请教下各位,还有什么解决方案没有 >> >>> >> >> >> >>
flink sql作业动态设置告警规则问题
配置了prometheus收集flink sql作业指标,现在想根据这些指标动态设置一些告警规则,请问要如何实现? 查了下prometheus告警需要配置alert rule之后重启才生效,有没有办法不重启呢?常规实现方案是什么?
Remote system has been silent for too long. (more than 48.0 hours)
今天线上 Flink 1.13.2 作业遇到如下报错,请问是何原因,要如何解决? 作业内容是从kafka topic消费canal json数据写到另一个mysql库表 2022-09-17 19:40:03,088 ERROR akka.remote.Remoting [] - Association to [akka.tcp://flink-metrics@172.19.193.15:34101] with UID [-633015504] irrecoverably failed. Quarantining address. java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours) at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2] 2022-09-25 17:17:21,581 ERROR akka.remote.Remoting [] - Association to [akka.tcp://flink-metrics@172.19.193.15:38805] with UID [1496738655] irrecoverably failed. Quarantining address. java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours) at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
flink sql client取消sql-clients-default.yaml后那些预置catalogs建议在哪里定义呢?
flink新版本已经找不到sql-clients-default.yaml文件了,那么之前配置的那些预置catalogs建议在哪里定义呢?通过初始化sql么?
Re:Re: Flink CDC2.2.1 设置server id范围
server-id配置范围对于后面修改并发度是不是不太友好?每改一次并发度就得重新调整server-id范围么?还是说先配置一个较大的server-id范围,在在这个较大的范围内调整并发度? 在 2022-10-31 16:04:32,"Leonard Xu" 写道: >Hi, > >你5张表对应的source并发是多少呀?如果是多并发需要把server-id设置成一个范围,范围和并发数匹配,比如4个并发,应该类似’1101-1104’. >另外 server-id 是全局唯一的,你需要确保下你使用的server-id 和其他作业、其他同步工具都不冲突才可以。 > > >Best, >Leonard > > >> 2022年10月31日 下午4:00,Fei Han 写道: >> >> 大家好! >> 现在我在 Flink CDC2.2.1设置了server id。有5张表且server id的范围都不同,通过Flink CDC >> 打宽表。但是在任务跑一段时间后,还是出现如下报错: >> Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave >> with the same server_uuid/server_id as this slave has connected to the >> master; >> 请教下各位,还有什么解决方案没有 >
flink web ui cancel job时能否指定要不要生成savepoint?
flink web ui cancel job时能否指定要不要生成savepoint? 目前是直接cancel job without savepoint的。
flink cdc什么时候支持flink 1.15.x?
当前flinlk cdc master分支的snapshot版本最高支持到flink 1.14.4,尝试使用flink 1.15.2编译会出错,请问flink cdc什么时候支持flink 1.15.x?
Re:Re: flink cdc能否同步DDL语句?
可以给一些hints吗?看哪些类? 在 2022-10-11 10:22:07,"yuxia" 写道: >用 datastream api,自己解析一下 DDL。 > >Best regards, >Yuxia > >- 原始邮件 - >发件人: "yh z" >收件人: "user-zh" >发送时间: 星期二, 2022年 10 月 11日 上午 10:23:43 >主题: Re: flink cdc能否同步DDL语句? > >目前,社区的 Flink CDC,是可以读取到 binlog 中的 DDL 语句的,但是不会传递给下游,即下游无法感知 schema 的变化。 > >Xuyang 于2022年10月10日周一 16:46写道: > >> Hi, 目前应该是不行的 >> 在 2022-09-26 23:27:05,"casel.chen" 写道: >> >flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate >> table等 >>
flink sql cdc 2.2.1消费mysql binlog异常
flink sql cdc 2.2.1消费mysql binlog遇到如下异常,有谁遇到过?发现作业自己做了重试后过去了,想知道异常的root cause是什么?手动重起了作业重新消费后还是会出现。 Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:72) at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ... 1 more Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1665303438000, eventType=TABLE_MAP, serverId=1940348705, headerLength=19, dataLength=91, nextPosition=457067313, flags=0} at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ... 5 more Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1665303438000, eventType=TABLE_MAP, serverId=1940348705, headerLength=19, dataLength=91, nextPosition=457067313, flags=0} at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309) at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:281) at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:228) at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945) ... 3 more Caused by: java.io.EOFException at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:209) at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:51) at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.readMetadata(TableMapEventDataDeserializer.java:91) at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:42) at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:27) at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303) ... 7 more
如何实现flink作业失败实时通知告警?
当flink作业失败时如何第一时间发通知告警到相关方?现有方式 方式一:flink作业本身提供的rest api需要client不断去请求,不是实时不说还浪费资源,而且受网络抖动影响有时候还会超时获取不到,但不代表作业有问题。 方式二:通过作业暴露指标给promemtheus,因为prometheus是周期性(10s~20s) 来pull指标的,所以也达不到实时性要求。 flink作业能否在failure之前调用某个hook去通知相关方呢?如果要自己改的话,是要动哪个类呢?谢谢!
依赖flink cdc如何达到kafka connect with schema registry效果?
kafka connect with schema registry运行的时候会将表的schema信息注册到schema registry,同时消息以avro格式发到kafka topic 请问flink cdc要如何实现达到上述一样的效果?因为接下来我想依赖以下hudi博文提到的debezium入湖工具完成数据入湖 https://hudi.apache.org/blog/2022/01/14/change-data-capture-with-debezium-and-apache-hudi/
控制流方式能否改变作业ExecutionGraph?
我有一个数据同步场景是希望通过修改配置来实时动态修改数据同步的目标,例如使用flink cdc将mysql中的变更数据实时同步进kafka,如果后来业务又要求同一份数据再同步进mongodb的话,我是否可以通过修改同步配置来达到不停止原来作业来动态修改数据同步的目标(由一个变多个)?又或者是flink cdc整库同步mysql变更数据到kafka一个topic,后来业务又要求按表划分topic,这种能否同样通过修改配置来实现呢?
Re:Re: flink的消费速率是否可以调整
kafka consumer config里面有一些配置参数可以达到限速功能,例如 max.partition.fetch.bytes fetch.max.bytes max.poll.records 详情可以参考 https://kafka.apache.org/24/documentation.html#consumerconfigs 在 2022-09-26 23:27:30,"yidan zhao" 写道: >应该不行吧,kafka client本身就没有限速的功能。 > >Jason_H 于2022年9月26日周一 10:17写道: >> >> Hi,各位大佬: >> 我们在使用flink消费kafka的时候,是否可以在代码中自定义消费速率,来调整源端的消费能力。 >> >> >> | | >> Jason_H >> | >> | >> hyb_he...@163.com >> |
Re:Re: flink cdc + kafka场景下增加kafka分区数问题
是的,消息key是由 `库名+表名+主键值` 组成的 在 2022-09-26 23:29:18,"yidan zhao" 写道: >之前是如何实现的,通过 kafka 的record key? > >casel.chen 于2022年9月26日周一 23:21写道: >> >> flink cdc >> 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?
flink cdc能否同步DDL语句?
flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate table等
flink cdc同只步表的schema到下游kafka topic吗?
flink cdc同只步表的schema到下游kafka topic吗?类似于confluent kafka schema registry,在下游kafka新建一个_schema的topic,key是表名,value是avro格式的schema。如果可以的话要如何实现?
flink cdc + kafka场景下增加kafka分区数问题
flink cdc 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?
咨询多条flink cdc作业消费同一个库下不同表优化方案
目前业内针对多条flink cdc作业消费同一个库下不同表为了防止对数据库方产生很大查询压力,一般都是发到kafka,但这样的话下游作业只能获取到实时增量数据进行处理,如果下游作业需要获取全量数据处理的话,还得再回过头来使用cdc connector,但这样会产生上述副作用。我在想作业是否能够在获取到全量数据之后做一个checkpoint,接下来就可以改使用kafka connector? 续接的点是binlog offset,即cdc connector消费到的binlog offset要续接上kafka connector某个消息带的binlog offset。不知道这种想法是否可行?
flink cdc作业是否支持将湖表作为源表source?
多条flink cdc作业场景直接接mysql会对数据库造成很大压力,一种办法是flink cdc下游接kafka,但这种只适用于多个下游作业只需要消费增量数据情况,如果多个下游作业需要消费存量+增量的话是不是可以考虑使用hudi/iceberg这种湖表替代kafka,像普通mysql一样flink cdc在全量快照阶段先查询湖表已有数据,再在增量快照阶段依赖湖表支持streaming query能力获取到实时全量数据?
Re:Re: flink实时双流驱动join问题
我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如Doris呢? 1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月? 确切地说不应该设置ttl,业务数据有长尾效应,大多数都在当天更新完毕,短的几秒种,长的甚至会在半年后还发生更新 2. order流和user流在业务场景上要求的state ttl时长是不是不一样? 同上 3. order流和user流的数据规模/state size规模大概可以到什么级别? TB级别 在 2022-09-20 10:28:49,"Jinzhong Li" 写道: >hi,casel, 关于你们的业务场景,我有几个问题, 希望可以交流一下。 >1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月? >2. order流和user流在业务场景上要求的state ttl时长是不是不一样? >(从你描述上来看,user流的ttl需要几个月,order流可以比较短些?) >3. order流和user流的数据规模/state size规模大概可以到什么级别? > >casel.chen 于2022年9月17日周六 10:59写道: > >> 请教一个flink实现实时双流驱动join问题: >> >> >> order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键) >> user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键) >> 关联结果流字段:order_id, order_status, order_time, user_name, user_phone, >> user_address(order_id是主键) >> 期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct >> id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。 >> >> >> 请问这种场景下要如何使用flink实现实时双流驱动join?
flink hybrid source问题
我有一个flink实时计算场景是需要先从MaxCompute读取一张表的存量数据,再从相应的kafka topic读取增量数据,一并进行计算处理。 看了一下需要用到hybrid source,目前最新flink社区版提供了Kafka/Hive/File Source,其他数据源的source是需要自己开发吗?社区有没有一个贡献source的地方? 有没有介绍如何自定义基于新版source架构的source文章或博客呢?谢谢!
flink实时双流驱动join问题
请教一个flink实现实时双流驱动join问题: order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键) user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键) 关联结果流字段:order_id, order_status, order_time, user_name, user_phone, user_address(order_id是主键) 期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。 请问这种场景下要如何使用flink实现实时双流驱动join?
如何监控flink sql作业端到端延迟?
线上运行了多个flink sql作业,现在想监控端到端延迟。我配置了 metrics.latency.interval=3 metrics.latency.granularity=operator metrics.latency.history-size=128 参数,延迟指标已经发到了prometheus,看到该指标有50、75、95、98,99,999分位线,另外还有operator_id和operator_id_subtask_index,细到了算子子task级别。 1. 想知道怎样根据这些暴露指标统计出该flink sql作业的端到端延迟分位线?是需要把所有同一个job的同一个算子同一分位值取平均再把不同算子得到的值相加么? 2. 另外,我们大部分sql作业都是从kafka接入的,消息格式是canal json,想进一步统计canal json中的binlog发生时间与kafka消息metadata里的timestamp时间差 和 kafka消息metadata里的timestamp与flink开始处理该消息的时间差,请问有办法不修改flink源码获取吗? | flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.95", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} | 11.943 | | flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.98", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} | 21 | | flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency{app="tb-bipusr-outcome-bank-record-binlog2mongo", component="taskmanager", host="172_19_193_104", instance="172.19.193.104:9249", job="kubernetes-pods", job_id="2ea0a87e69f0d485859a9108d595dd8d", job_name="tb_bipusr_outcome_bank_record_binlog2mongo", kubernetes_namespace="bfj", kubernetes_pod_name="tb-bipusr-outcome-bank-record-binlog2mongo-taskmanager-1-8", operator_id="570f707193e0fe32f4d86d067aba243b", operator_subtask_index="2", quantile="0.99", source_id="cbc357ccb763df2852fee8c4fc7d55f2", tm_id="tb_bipusr_outcome_bank_record_binlog2mongo_taskmanager_1_8", type="flink-native-kubernetes"} |
flink作业生成保存点失败
有一个线上flink作业在人为主动创建保存点时失败,作业有两个算子:从kafka读取数据和写到mongodb,都是48个并行度,出错后查看到写mongodb算子一共48个task,完成了45个,还有3个tasks超时(超时时长设为3分钟),正常情况下完成一次checkpoint要4秒,状态大小只有23.7kb。出错后,查看作业日志如下。在创建保存点失败后作业周期性的检查点生成也都失败了(每个算子各有3个tasks超时)。使用的是FileStateBackend,DFS用的是阿里云oss。请问出错会是因为什么原因造成的? +5 [2022-08-29 15:38:32] content: 2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Failed to transfer file from TaskExecutor sqrc-session-prod-taskmanager-1-30. +6 [2022-08-29 15:38:32] content: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor. +7 [2022-08-29 15:38:32] content: at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2] +8 [2022-08-29 15:38:32] content: at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312] +9 [2022-08-29 15:38:32] content: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312] +10 [2022-08-29 15:38:32] content: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312] +11 [2022-08-29 15:38:32] content: at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] +12 [2022-08-29 15:38:32] content: Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor. +13 [2022-08-29 15:38:32] content: ... 5 more +14 [2022-08-29 15:38:32] content: 2022-08-29 15:38:32,617 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Unhandled exception. +15 [2022-08-29 15:38:32] content: org.apache.flink.util.FlinkException: The file STDOUT does not exist on the TaskExecutor. +16 [2022-08-29 15:38:32] content: at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064) ~[flink-dist_2.12-1.13.2.jar:1.13.2] +17 [2022-08-29 15:38:32] content: at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_312] +18 [2022-08-29 15:38:32] content: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_312] +19 [2022-08-29 15:38:32] content: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_312] +20 [2022-08-29 15:38:32] content: at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
Re:Re:Flink Native Kubernetes Resources Requests and Limits
内存有相应的requests和limits参数吗?找了一圈没有发现,都设的是requests=limits,是为了作业高可靠保障么?如果要能够调整内存使用大小的话应该要如何实现呢?实际业务朝夕现象很鲜明,希望作业在低峰期能空出一些内存资源来 在 2022-08-05 10:42:48,"spoon_lz" 写道: > > >可以尝试使用个参数: >kubernetes.taskmanager.cpu.limit-factor > > >https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-taskmanager-cpu-limit-factor > > >On 08/5/2022 09:24,casel.chen wrote: >我通过flink native kubernetes部署flink >1.13.2作业到k8s上发现资源实际使用量远小于请求量,特别是CPU,启动的时候CPU消耗多一些,运行一段时间后CPU消耗显著降低,如果设置CPU值较小的话又会造成作业启动慢的问题,查了一下当前实现中将资源requests恒等于limits,请问该如何分别设置cpu和内存的requests和limits以提高资源使用效率呢?谢谢!
k8s环境下application模式flink作业HA的原理是什么?
k8s环境下开启作业HA后,如果JM挂了会重新拉起一个新的JM,想知道这个原理是什么?重启的作业会从上一个checkpoint位置重新消费吗?
Re:Re:flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?
;,"consolePreAuthFlag":"123","consoleSubsidyFlag":"123","consoleDcType":"123","consoleAcctDivFlag":"123","actualPayChannel":"123","payChannel":"123","transType":"123","payType":"123","dcType":"123","isAcctDiv":"123","isDelayAcct":"123","creditType":"123","devsId":"123","ordAmt":123.32,"feeAmt":123,"actOrdAmt":123,"actualRefAmt":123,"refAmt":123,"refFeeAmt":123,"subsidyAmt":123,"subsidyRefAmt":123,"payCardId":"123","feeRecType":"123","feeFlag":"123","transStat":"S","transFinishTime":"123","tableName":"123","offset":"123","recordVersion":"123","sign":"123","synModifyTime":null,"merName":null,"bankName":null,"bankRespDesc":null,"bagentId":null,"bankId":null,"cashRespDesc":null,"reqDate":null,"accSplitBunch":null,"acctId":null,"fqFeeAmt":null,"payCardIdEnc":null,"goodsDesc":null,"remark":null,"synTtlDate":null,"outOrdId":null,"devType":null,"feeHuifuId":null,"feeAcctId":null,"orgTransDate":null,"orgOrdAmt":null,"orgCreateTime":null,"userType":null,"userId":null,"userIdExt":null,"settleAmt":null,"refCnt":null,"consoleCountSum":null,"topConsolePayType":null,"orgMerOrdId":null,"feeAllowanceFlag":null,"correctStat":null,"addedOrgFeeAmt":null,"discountFeeAmt":null,"acctFinishTime":null,"pospSeqId":null,"outOrderId":null,"cashTransId":null,"orgPayType":null,"orgPayChannel":null,"branch1HuifuId":null,"branch2HuifuId":null,"branch3HuifuId":null,"branch4HuifuId":null,"branch5HuifuId":null,"branchHuifuId":null,"level":null,"branchChannelId":null,"orgFeeAmt":null,"orgConsoleIsFq":null,"orgCreditType":null,"fqMerDiscountFlag":null,"payScene":null,"labels":null,"orgTransType":null,"orgFeeRecType":null,"orgFeeFlag":null,"orgDiscountFeeAmt":null,"merOperId":null,"operType":null,"batchId":null,"authNo":null,"refNum":null,"bankMerId":null,"bankMerName":null,"posMerId":null,"posMerName":null,"acqrInstId":null,"doubleExempt":null,"pnrDevId":null,"posTermId":null,"realPayType":null,"channelFinishTime":null,"transRefundBankId":null,"transRefundBankName":null,"orgRealPayType":null,"orgDevsId":null,"merPriv":null,"transRefundOutOrdId":null,"orgHfSeqId":null,"synMode":null,"cloudPay":null,"terminalReqDate":null,"terminalPayChannel":null,"huifuFstOrg":null,"huifuSecOrg":null,"huifuThdOrg":null,"huifuForOrg":null,"huifuSales":null,"partnerBd":null,"organizationId":null,"upperOrgId":null,"merOrg":null,"partnerInnerFstOrg":null,"partnerInnerSecOrg":null,"partnerInnerThdOrg":null,"partnerFstOrg":null,"partnerSecOrg":null,"partnerThdOrg":null,"collectMerFstOrg":null,"collectMerSecOrg":null,"collectMerThdOrg":null,"collectMerForOrg":null,"collectMerFivOrg":null,"collectMerSixOrg":null,"fullPath":null}],"type":"INSERT"} [3] https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format 在 2022-08-22 22:57:04,"Xuyang" 写道: >Hi, 请问你的需求是 “debezium数据”- flink -“canal ”么? >如果是这样的话,可以用UDF[1]来尝试下。[1] >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/ >在 2022-08-21 10:49:29,"casel.chen" 写道: >>flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka? >>flink cdc获取的是debezium格式记录(用的是 JsonDebeziumDeserializationSchema),要如何转换成canal >>json格式输出呢?有没有例子或关键代码展示?谢谢!
flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?
flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka? flink cdc获取的是debezium格式记录(用的是 JsonDebeziumDeserializationSchema),要如何转换成canal json格式输出呢?有没有例子或关键代码展示?谢谢!
flink sql支持监听单个文件内容变化吗?
flink sql支持监听单个文件内容变化吗?文件中每一行是一条记录,对外输出的模式可以全量或者变量。
Re:Re: flink on k8s作业失败后如何自动释放资源?
native模式,发现作业失败后会自动重试几次,最后部署和pod消失 在 2022-08-14 16:55:48,"yu'an huang" 写道: >你的部署模式是native还是standalone,正常作业失败是会释放资源的,可以提供更多信息吗? > > > >> On 14 Aug 2022, at 9:55 AM, casel.chen wrote: >> >> flink on k8s作业失败后现在默认是会一直占用资源,请问要如何配置使其自动释放资源? >
flink on k8s作业支持弹性扩缩容吗?
flink on k8s作业能否在给定资源范围内自动根据上游流量大小实现弹性扩缩容?例如增加并发度和TaskManager数量等
flink on k8s作业失败后如何自动释放资源?
flink on k8s作业失败后现在默认是会一直占用资源,请问要如何配置使其自动释放资源?
Flink Native Kubernetes Resources Requests and Limits
我通过flink native kubernetes部署flink 1.13.2作业到k8s上发现资源实际使用量远小于请求量,特别是CPU,启动的时候CPU消耗多一些,运行一段时间后CPU消耗显著降低,如果设置CPU值较小的话又会造成作业启动慢的问题,查了一下当前实现中将资源requests恒等于limits,请问该如何分别设置cpu和内存的requests和limits以提高资源使用效率呢?谢谢!
flink作业延迟监控
想实现flink sql作业延迟监控,例如flink sql作业将kafka数据写入mysql,记kafka记录中的事件时间为T0,发到kafka时间是T1,写入mysql的时间为T2,现要统计如下时间差(延迟) 1. T2 - T1 :flink sql作业延迟 2. T2 - T0 :端到端延迟,包括flink sql作业延迟和数据写入kafka延迟 请问: 1) 要如何暴露这2个时间差作为metrics? 2) 中间算子的处理时长能暴露吗?
如何实现flink作业失败告警功能
想实现flink作业一旦失败就立马告警功能,请问要如何实现?是否有Listener可以进行注册?
Flink消费kafka实时同步到MongoDB出现丢数据
mysql cdc -> kafka -> mongodb 写了一个flink 1.13.2作业从kafka消费mysql整库变更topic并实时同步写入mongodb,也开启了checkpoint,但实测下来发现从savepoint恢复和从groupOffsets恢复会造成数据丢失,请问这应该怎么排查?代码仓库地址:https://github.com/ChenShuai1981/mysql2mongodb.git 我的MongodbSink有实现CheckpointedFunction,并在snapshotState方法中会等待所有子线程完成写mongodb。 flink消费kafka处理数据后提交kafka offset的流程是怎样的?一开始消费kafka获取到pendingOffsets,如何确保这些pendingOffsets都处理完成然后全部提交呢?有没有这块源码解析资料?
flink kubernetes application模式下作业镜像问题
使用flink kubernetes application模式运行flink作业需要将作业打包进镜像,这对于有大量用户个性化作业场景使用不是很方便,需要维护很多作业镜像版本。有没有办法在执行时引用到镜像外部的作业jar包或python文件,例如HDFS或者阿里云OSS。或者有其他workaround办法,之前听过init-container,但不知道具体要怎么使用,还请赐教!谢谢!
Flink MySQL CDC 注册 schema registry 问题
Hi, 我想使用 Flink MySQL CDC Connector 以 DataStream 方式消费 MySQL Binlog 输出变更数据到下游kafka topic (1),同时监听database schema change事件,将最新的schema数据输出到下游另一个kafka topic (2),又或者直接注册schema到 confluent / apicurio schema registry,查了一下flink cdc官方文档[1],并没有这方面的信息。请问应该怎么实现呢?有相关文档或例子么?谢谢! [1] https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
flink cdc 时间格式和时区问题
我在使用flink cdc 2.2.0获取mysql数据变更, mysqlSource设置了 .serverTimeZone("Asia/Shanghai") 发现mysql timestamp 类型的数据在mysql workbench里显示的是 "2021-06-24 16:26:47",通过JsonDebeziumDeserializationSchema解析后得到的json string串是 "2021-06-24T08:26:47Z"。继而在通过org.apache.flink.formats.json.JsonToRowDataConverters转成RowData时解析timestamp失败 (private TimestampData convertToTimestamp(JsonNode jsonNode)方法),因为当前只支持SQL和ISO_8601两种TimestampFormat,我看到 org.apache.flink.formats.common.TimeFormats 类中除了支持这两种timestamp外,还支持 RFC3339_TIMESTAMP_FORMAT。 问题: 1. 为什么RFC3339没有出现在 org.apache.flink.formats.common.TimestampFormat中呢?我在debug窗口试了用RFC3339_TIMESTAMP_FORMAT.parse(jsonNode.asText())是可以出结果的。 2. 开头设置的时区 "Asia/Shanghai" 为什么没有生效?实际返回的时间表示还是UTC时区的 附mysql服务器时区设置如下 mysql> show variables like'%time_zone'; +--++ | Variable_name| Value | +--++ | system_time_zone | CST| | time_zone| SYSTEM | +--++
Re:Flink Kubernetes Operator
The deployment 'cert-manager-webhook' shows Failed to pull image "quay.io/jetstack/cert-manager-webhook:v1.7.1": rpc error: code = Unknown desc = Error response from daemon: Get "https://quay.io/v2/": net/http: TLS handshake timeout 在 2022-04-14 15:40:51,"casel.chen" 写道: >按照其官方文档[1]尝试在mac本地的minikube上运行Flink Kubernetes Operator,结果抛下面的连接错误: > > > >$ helm install flink-kubernetes-operator >flink-operator-repo/flink-kubernetes-operator --set >image.repository=apache/flink-kubernetes-operator > >WARNING: Kubernetes configuration file is group-readable. This is insecure. >Location: /Users/admin/.kube/config > >WARNING: Kubernetes configuration file is world-readable. This is insecure. >Location: /Users/admin/.kube/config > >Error: INSTALLATION FAILED: failed to create resource: Internal error >occurred: failed calling webhook "webhook.cert-manager.io": Post >"https://cert-manager-webhook.cert-manager.svc:443/mutate?timeout=10s": dial >tcp 10.96.143.23:443: connect: connection refused > > > > >实验步骤: > >a) minikube start --kubernetes-version=v1.21.5 --memory 8192 --cpus 4 > >b) kubectl create -f >https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml > >c) helm repo add flink-operator-repo >https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/ > >d) helm install flink-kubernetes-operator >flink-operator-repo/flink-kubernetes-operator--set >image.repository=apache/flink-kubernetes-operator > > > > >[1] >https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/try-flink-kubernetes-operator/quick-start/
Flink Kubernetes Operator
按照其官方文档[1]尝试在mac本地的minikube上运行Flink Kubernetes Operator,结果抛下面的连接错误: $ helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set image.repository=apache/flink-kubernetes-operator WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /Users/admin/.kube/config WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /Users/admin/.kube/config Error: INSTALLATION FAILED: failed to create resource: Internal error occurred: failed calling webhook "webhook.cert-manager.io": Post "https://cert-manager-webhook.cert-manager.svc:443/mutate?timeout=10s": dial tcp 10.96.143.23:443: connect: connection refused 实验步骤: a) minikube start --kubernetes-version=v1.21.5 --memory 8192 --cpus 4 b) kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml c) helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/ d) helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator--set image.repository=apache/flink-kubernetes-operator [1] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/docs/try-flink-kubernetes-operator/quick-start/
flink jdbc connector不支持source
现有一个场景是需要用flink一次性批量将某个mysql库下指定表(不同schema)同步到hudi表里面,查了一下官网flink jdbc connector [1] 文档说明只支持sink,不支持source。请问社区有支持计划吗?如果没有的话,自己要如何开发,可以给个例子吗?谢谢! [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/jdbc/
Re:flink on k8s场景,大家一般如何解决访问hdfs的问题呢。
我们是直接使用云存储,像阿里云的oss,没有再搭建hadoop集群。如果flink on k8s的确需要访问hadoop的话,是需要打包hadoop发行包在镜像里面的,配置好core-site.xml, hdfs-site.xml等 在 2022-03-30 12:01:54,"yidan zhao" 写道: >如题,是需要打包hadoop client到镜像中吗。
Re:Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现
用cdc join也需要将事实表缓存下来才能实现吧,这就是普通的regular join,优点是双流驱动,缺点是需要缓存两边的数据,状态会变得很大,建议使用带ssd的rocksdb增量状态后端。 业务上如果可以接受超过一定时间范围不用关联的话,还可以设置state ttl 进一步使状态大小可控。 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道: >Cdc join > >> 2022年3月21日 14:01,JianWen Huang 写道: >> >> 事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。 >> 例子: >> 变化前: >> A流: >> name gender >> a male >> b male >> c female >> >> 纬度表B: >> nameage >> a 16 >> b17 >> >> 结果: >> name gender age >> a male 16 >> b male 17 >> >> 发生变化后: >> 纬度表B: >> nameage >> a 16->17 >> b17->18 >> >> 结果: >> name gender age >> a male 17 >> b male 18 >> >> 目前我想到一个做法是将维度表做成流然后关联事实表,最后根据更新时间取top1最新sink到存储里。请问大家有别的更好做法吗
org.apache.flink.runtime.rpc.exceptions.FencingTokenException
Hello, 我有一个Flink 1.13.2 on native kubernetes application作业遇到如下异常,会是什么原因造成的? Starting kubernetes-application as a console application on host dc-ads-ptfz-nspos-sib-trans-sum-6d9dbf587b-tgbmx. ERROR StatusLogger Reconfiguration failed: No configuration found for '135fbaa4' at 'null' in 'null' ERROR StatusLogger Reconfiguration failed: No configuration found for '6c130c45' at 'null' in 'null' ERROR StatusLogger Reconfiguration failed: No configuration found for '6babf3bf' at 'null' in 'null' 18:25:22.588 [flink-akka.actor.default-dispatcher-5] ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Unhandled exception. org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(87e23829e16a630dcaad2c0851744d0c, LocalRpcInvocation(requestExecutionGraphInfo(JobID, Time))) because the fencing token 87e23829e16a630dcaad2c0851744d0c did not match the expected fencing token 92e3b018bda11d4c4598f9335644496a. at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive(Actor.scala:517) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.Actor.aroundReceive$(Actor.scala:515) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.12-1.13.2.jar:1.13.2] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.12-1.13.2.jar:1.13.2]
Re:Re:flink sql jdbc sink事务提交问题
如果mysql配置不是auto commit,那么事务是在哪一步提交呢? 在 2022-02-16 10:24:39,"Michael Ran" 写道: >jdbc 连接 mysql 的driver 记得默认就是AutoCommit。phoenix不太清楚 >在 2022-02-15 13:25:07,"casel.chen" 写道: >>最近在扩展flink sql jdbc >>connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。 >>源码中PhoenixPreparedStatement.execute()方法会调用executeMutation(statement)方法,继而判断connection.getAutoCommit()与否来执行connection.commit()方法。完了回到PhoenixStatement.executeBatch()执行flushIfNecessary()方法,里面根据connection.getAutoFlush()与否来执行connection.flush()操作。 >>一开始我没有在phoenix jdbc >>url上添加;autocommit=true参数,发现变化的数据并没有commit到数据库。后来添加了;autocommit=true参数后执行了connection.commit()方法才将数据提交成功。 >> >> >>有几个疑问: >>1. 换成sink进mysql数据库就没有这个问题,难道不同数据库的jdbc sink行为会不一样么? >>2. connection autoflush参数在哪里设置?跟autocommit区别是什么? >>3. >>buffer条数满了或interval周期达到又或者checkpoint时就会执行flush操作,里面执行的是JdbcBatchingOutputFormat.flush方法,这里我也没有找到connection.commit()操作,数据是如何提交到数据库的呢?不开启事务情况下,执行完statement.executeBatch()就会提交么?
flink sql jdbc sink事务提交问题
最近在扩展flink sql jdbc connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。 源码中PhoenixPreparedStatement.execute()方法会调用executeMutation(statement)方法,继而判断connection.getAutoCommit()与否来执行connection.commit()方法。完了回到PhoenixStatement.executeBatch()执行flushIfNecessary()方法,里面根据connection.getAutoFlush()与否来执行connection.flush()操作。 一开始我没有在phoenix jdbc url上添加;autocommit=true参数,发现变化的数据并没有commit到数据库。后来添加了;autocommit=true参数后执行了connection.commit()方法才将数据提交成功。 有几个疑问: 1. 换成sink进mysql数据库就没有这个问题,难道不同数据库的jdbc sink行为会不一样么? 2. connection autoflush参数在哪里设置?跟autocommit区别是什么? 3. buffer条数满了或interval周期达到又或者checkpoint时就会执行flush操作,里面执行的是JdbcBatchingOutputFormat.flush方法,这里我也没有找到connection.commit()操作,数据是如何提交到数据库的呢?不开启事务情况下,执行完statement.executeBatch()就会提交么?
[statefun] How to make the input/output topics live in different kafka clusters for flink stateful functions
Hi, I am newbe of flinkd stateful functions. And just want to ask a question: How to make the input/output topics live in different kafka clusters? Thanks!
flink mysql cdc注册confluent schema registry
我想利用flink mysql cdc输出变更数据到kafka,同时将table schema注册到confluent schema registry,以模拟debezium kafka connect效果[1]。还请指教要如何下手呢?谢谢! [1] https://blog.csdn.net/OldDirverHelpMe/article/details/107881170
flink算子级别资源使用设置
flink是否支持算子级别资源使用设置? 如果是flink sql 能否根据生成的Graph配置细粒度资源配置?
flink window支持回撤流吗?
Tumbling / Hopping / Session / Cumulate 这些 window 支持数据来源是回撤流吗?
cumulate window可以在retract流上使用吗?
cumulate window只能在append流上使用吗?可以在retract流或upsert流上使用吗?
flink sql动态累计窗口实现问题
听了FFA2021快手Flink SQL分享有讲到动态累计窗口实现,想问一下Flink开源社区是否有相应实现?或者有相应的JIRA?我们也有这样的使用场景,如果暂时没有的话要如何自己实现?特别是自定义sql语法这块,有没有一些相关的教程?谢谢!
cumulate窗口不支持偏移offset参数吗?
实时统计需求是每隔一小时计算周一到周日每周累计的交易量和总金额,拟使用flink sql cumulate window计算,假定作业启动时间是周四,如果不加offset偏移量参数的话,统计的是这周四到下周四的交易量和总金额,但我查了一下flink官方文档对cumulate window[1]介绍只有两个参数,一个是step,另一个是size,并没有看到offset,请问有没有办法实现?谢谢! SELECTwindow_start,window_end,SUM(price)FROMTABLE(CUMULATE(TABLEBid,DESCRIPTOR(ts),INTERVAL'1'HOUR,INTERVAL'7'DAY))GROUPBYwindow_start,window_end; [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-tvf/#cumulate
FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?
问一下FLIP-188 Built-in Dynamic Table Storage 跟当下流行的数据湖技术是什么关系?二者功能是否重复?还是侧重点不同? https://cwiki.apache.org/confluence/display/FLINK/FLIP-188%3A+Introduce+Built-in+Dynamic+Table+Storage
Re:Re: Re: flink sql回撤流sink优化问题
mini-batch优化针对sink算子也有效吗?我是直接aggregate without window然后将聚合结果输出到sink算子。 在 2022-01-06 20:43:00,"Benchao Li" 写道: >这个问题可以用mini-batch[1]来解决呀 > >[1] >https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#minibatch-aggregation > >casel.chen 于2021年12月26日周日 18:01写道: > >> 你说的是upsert-kafka的这两个参数吗? >> >> sink.buffer-flush.max-rows >> sink.buffer-flush.interval >> 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink >> kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2021-12-25 22:54:19,"郭伟权" 写道: >> >> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 >> > >> >casel.chen 于2021年12月23日周四 08:15写道: >> > >> >> flink sql中aggregate without >> >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> >> >> orderid. categorydt amt >> >> >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> >> >> >> >> INSERT INTO mysql_sink_table >> >> >> >> SELECT category, dt, LAST_VALUE(total) >> >> >> >> OVER ( >> >> >> >> PARTITION BY category >> >> >> >> ORDER BY PROCTIME() >> >> >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> >> >> ) AS var1 >> >> >> >> FROM ( >> >> >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> >> >> ); >> > > >-- > >Best, >Benchao Li
flink作业延迟统计如何实现?
希望统计flink sql作业延迟,包括以下三个指标: 1. kafka消息延迟 (消息进到kafka时间 - 消息事件本身发生时间) 2. flink作业本身延迟 (消息处理完时间 - 消息开始处理时间) 3. 端到端延迟 (消息处理完时间 - 消息事件本身发生时间) 目前Flink 1.13.2是不是只能通过LatencyMarker获取到flink作业本身延迟? 开启的话会有性能影响吗? 据说Flink 1.14 新的source/sink api能够暴露一些metrics,但不清楚要具体怎么使用?谢谢!
Flink on Native K8S作业节点调度问题
k8s集群有2个节点配置了SSD盘,其他节点是普通硬盘,希望实现配置了rocksdb状态后端类型的作业部署到这2个ssd节点,同时希望其他非rocksdb状态后端作业不会被调度到这2个ssd节点 请问:flink 1.13.2有办法实现吗?如果能实现的话,具体应该怎么操作呢?最好给个具体例子,谢谢!
Re:Re: flink sql回撤流sink优化问题
你说的是upsert-kafka的这两个参数吗? sink.buffer-flush.max-rows sink.buffer-flush.interval 确实能达到我想要的效果,但是会引入额外的kafka sink,另外还是从sink kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 在 2021-12-25 22:54:19,"郭伟权" 写道: >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 > >casel.chen 于2021年12月23日周四 08:15写道: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> orderid. categorydt amt >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> INSERT INTO mysql_sink_table >> >> SELECT category, dt, LAST_VALUE(total) >> >> OVER ( >> >> PARTITION BY category >> >> ORDER BY PROCTIME() >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> ) AS var1 >> >> FROM ( >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> );
Re:Re: flink sql回撤流sink优化问题
jdbc sink的buffer-flush不会减少写入的数据量,只是变成微批写入而已,mysql写入的压力并没有减少。 而我想要实现的效果是会减少写的数据量,因为同一个key的数据被聚合成最后一条。 在 2021-12-26 09:43:47,"Zhiwen Sun" 写道: >不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。 > >参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows >参数 > >[1] : >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ > > >Zhiwen Sun > > > >On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? >> >> >> 例如有下面binlog cdc购买数据(订单购买金额会更新): >> >> orderid. categorydt amt >> >> 订单id 商品类型 购买时间(MMddHH) 购买金额 >> >> >> >> >> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 >> >> >> >> INSERT INTO mysql_sink_table >> >> SELECT category, dt, LAST_VALUE(total) >> >> OVER ( >> >> PARTITION BY category >> >> ORDER BY PROCTIME() >> >> RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW >> >> ) AS var1 >> >> FROM ( >> >> SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt >> >> );
Re:Flink SQL Calcite 解析出错
eventInfo_eventTime 我猜测是 BIGINT 类型的吧? order by | range 需要用到 timestamp 类型,需要用计算列转换一下 At 2021-12-24 16:38:00, "Pinjie Huang" wrote: >我的原SQL: >CREATE TABLE consumer_session_created >( >consumer ROW (consumerUuid STRING), >clientIp STRING, >deviceId STRING, >eventInfo ROW < eventTime BIGINT >, >ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, '-MM-dd >HH:mm:ss')), >WATERMARK FOR ts AS ts - INTERVAL '5' SECOND >) WITH ( 'connector'='kafka' >,'topic'='local.dwh.paylater.consumer.session.consumer-session-created.v1' >,'properties.bootstrap.servers'='http://localhost:9092' ,' >properties.group.id'='flink-ato-trusted-consumer' >,'scan.startup.mode'='latest-offset' >,'properties.allow.auto.create.topics'='false' ,'format'='avro-confluent' >,'avro-confluent.basic-auth.credentials-source'='null' >,'avro-confluent.basic-auth.user-info'='null' >,'avro-confluent.schema-registry.url'='http://localhost:8081' >,'avro-confluent.schema-registry.subject'='local.dwh.paylater.consumer.session.consumer-session-created.v1') > >CREATE >TEMPORARY VIEW consumer_session_created_detail as ( >SELECT >csc.consumer.consumerUuid as consumer_consumerUuid, >csc.deviceId as deviceId, >csc.clientIp as clientIp, >csc.eventInfo.eventTime as eventInfo_eventTime >FROM consumer_session_created csc >) > >SELECT >consumer_consumerUuid AS entity_id, >COUNT(DISTINCT deviceId) OVER w AS >sp_c_distinct_device_cnt_by_consumer_id_h1_0, >COUNT (DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0 >FROM consumer_session_created_detail >WINDOW w AS ( >PARTITION BY consumer_consumerUuid >ORDER BY eventInfo_eventTime >RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW >) > >报的错: > >org.apache.flink.client.program.ProgramInvocationException: The main method >caused an error: org.apache.flink.table.api.ValidationException: SQL >validation failed. From line 9, column 15 to line 9, column 31: Data Type >mismatch between ORDER BY and RANGE clause > >at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156) > >at >org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107) > >at >org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207) > >at >org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > >at >org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715) > >at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149) > >at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) > >at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141) > >at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296) > >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >at >sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >at >sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >at java.lang.reflect.Method.invoke(Method.java:498) > >at >org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) > >at >org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
Flink CDC 2.0 整库同步如何实现?
看文章介绍说Flink CDC 2.0 支持整库同步,见 https://www.jianshu.com/p/b81859d67fec 整库同步:用户要同步整个数据库只需一行 SQL 语法即可完成,而不用每张表定义一个 DDL 和 query。 想知道Flink CDC 2.0 整库同步如何实现?有没有例子?谢谢!
Re:Re:flink on native k8s模式下CPU使用率不高问题
cpu request和limit不同会有什么影响吗?会不会pod竞争不过被kill掉? 在 2021-12-20 11:36:02,"Jeff" 写道: >升级版本没有用的,我用的是flink >1.13.2也遇到这个问题,原因是它request与limit相同,所以后来我改了它的源代码,你可以参考一下:https://github.com/jeff-zou/flink.git > ,我主要是改了KubernetesUtils.java这个类,利用external resource传入参数来替换request > > > > > > > > > > > > > > > > > >在 2021-12-18 09:15:06,"casel.chen" 写道: >>所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 >>kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu >>request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CPU资源要远远高于作业实际运行起来的CPU资源,二者可能相差近5倍左右。如果设置的cpu较低的话,作业启动需要花费很长时间。 >>如何才能够提高作业CPU使用率呢?可以直接修改k8s >>yaml文件将request设置得高一些,而limit设置低一些吗?还有更好的办法吗?升级Flink版本有望解决么?
请教flink sql作业链路延迟监控如何实现
想问一下flink sql作业链路延迟监控如何实现? 我们的flink sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储 想监控如下三种延迟,目前有什么办法实现吗?会有相应的metrics暴露出来吗?目前我们在用的flink版本是1.13.2 1. 端到端的延迟 2. kafka本身的延迟 3. flink处理的延迟
flink sql回撤流sink优化问题
flink sql中aggregate without window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? 例如有下面binlog cdc购买数据(订单购买金额会更新): orderid. categorydt amt 订单id 商品类型 购买时间(MMddHH) 购买金额 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 INSERT INTO mysql_sink_table SELECT category, dt, LAST_VALUE(total) OVER ( PARTITION BY category ORDER BY PROCTIME() RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW ) AS var1 FROM ( SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt );
Re:Re: 回撤流优化
mini-batch只是攒批后再执行,执行数据量并没有减少。而在我这个场景下是需要以攒批内再根据key聚合取最新的结果,执行数据量会大大减少。mini-batch应该还做不到吧? 在 2021-12-16 17:15:45,"Jingsong Li" 写道: >理论上mini-batch就可以优化回撤流。 > >目前是join没有支持mini-batch。 > >On Thu, Dec 16, 2021 at 5:12 PM casel.chen wrote: >> >> 看了《Oceanus的实时流式计算实践与优化》https://jishuin.proginn.com/p/763bfbd5acbf >> 想问一下社区是否有意实现这里说的回撤流优化功能呢? >> 实际业务很多数据是从mysql binlog cdc接入的,在回撤流上做计算是常见的场景,能否在flink sql中支持这些优化呢? > > > >-- >Best, Jingsong Lee
Re:Re:flink on native k8s模式下CPU使用率不高问题
谢谢!我学习一下 在 2021-12-20 11:36:02,"Jeff" 写道: >升级版本没有用的,我用的是flink >1.13.2也遇到这个问题,原因是它request与limit相同,所以后来我改了它的源代码,你可以参考一下:https://github.com/jeff-zou/flink.git > ,我主要是改了KubernetesUtils.java这个类,利用external resource传入参数来替换request > > > > > > > > > > > > > > > > > >在 2021-12-18 09:15:06,"casel.chen" 写道: >>所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 >>kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu >>request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CPU资源要远远高于作业实际运行起来的CPU资源,二者可能相差近5倍左右。如果设置的cpu较低的话,作业启动需要花费很长时间。 >>如何才能够提高作业CPU使用率呢?可以直接修改k8s >>yaml文件将request设置得高一些,而limit设置低一些吗?还有更好的办法吗?升级Flink版本有望解决么?
flink on native k8s模式下CPU使用率不高问题
所用flink版本是1.12.5,部署作业到native k8s设置的不管是 kubernetes.taskmanager.cpu 还是 kubernetes.jobmanager.cpu 最终在k8s yaml文件中显示的cpu request和limit都是一样的。这会导致作业的CPU使用率很低,因为我们发现作业刚启动的时候所需要的CPU资源要远远高于作业实际运行起来的CPU资源,二者可能相差近5倍左右。如果设置的cpu较低的话,作业启动需要花费很长时间。 如何才能够提高作业CPU使用率呢?可以直接修改k8s yaml文件将request设置得高一些,而limit设置低一些吗?还有更好的办法吗?升级Flink版本有望解决么?
紧急bugfix的那些flink jar包在maven中心仓库上找不到
例如 flink 1.13.5,这些jar包有上传到maven中心仓库吗?我没有看到,编译的时候出错了。
双流窗口内join用flink sql实现的语法是什么?
每隔5分钟join来自两条流的数据,用flink sql实现的写法是什么? 需要先join再窗口计算还是可以直接窗口内join? flink版本是1.13