回复: Re:回复: Re:flink1.18 on yarn提交任务生成多个application

2024-07-02 文章 Liu Join
://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/insert/#insert-into-multiple-tables -- Best! Xuyang 在 2024-07-02 11:42:32,"Liu Join" 写道: >你好,感谢回复,请问有官网文档么,我想详细了解下,谢谢! > >发件人: Xuyang >发送时间: 2024年7月2日 11:25 >收件人: user

回复: Re:flink1.18 on yarn提交任务生成多个application

2024-07-01 文章 Liu Join
; ... end; java & scala table api: tableEnv#createStatementSet -- Best! Xuyang 在 2024-07-02 10:04:34,"Liu Join" 写道: >版本:flink1.18、hadoop3.0.0 >提交方式:per-job > >问题: >1. 使用flink sql编写的程序,数据源相同,输出定义了一个doris >sink,以及一个表转流的print,将jar包提交到yarn会生成两个applic

flink1.18 on yarn提交任务生成多个application

2024-07-01 文章 Liu Join
版本:flink1.18、hadoop3.0.0 提交方式:per-job 问题: 1. 使用flink sql编写的程序,数据源相同,输出定义了一个doris sink,以及一个表转流的print,将jar包提交到yarn会生成两个application,一个输出为doris sink,另一个输出为流转表的print 2. 使用flink sql编写的程序,数据源相同,输出定义了两个doris sink,表a和表b,将jar包提交到yarn会生成两个application,一个输出为doris sink表a,另一个输出为doris sink表b 请问这是什么原因

关于RichFlatMapFunction的状态输出

2023-08-10 文章 Liu Join
请问,flink1.17在使用RichFlatMapFunction进行批计算时,如何在数据结束时将状态写入输出的数据流中? 谢谢

关于使用DataStream实现有界流的join

2023-07-28 文章 Liu Join
Hi, 如题,请教一下关于如何使用DataStream API实现有界流的join操作,我在调用join的时候必须要window,怎么避免,还是需要使用SQL API才可以 感谢, 鱼

回复: 关于DataStream API计算批数据的聚合值

2023-07-26 文章 Liu Join
, Weijie Liu Join 于2023年7月26日周三 09:10写道: > 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值 >

关于DataStream API计算批数据的聚合值

2023-07-25 文章 Liu Join
例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值

关于flink批计算

2023-06-29 文章 Liu Join
请教下,如果用flink进行批计算,使用DataStream API有没有什么优化的地方,是否可以直接将数据作为矩阵在算子之间传递进行计算

回复: keyBy 后的 getKey 函数调用了两次

2022-03-01 文章 Liu Join
Reduce函数中,a可以认为是状态,你应该返回a试试,最好还是根据时间或者别的做个判断,然后输出,当然这些前提都是你的数据间隔小于10s 从 Windows 版邮件发送 发件人: Lei Wang 发送时间: 2022年3月1日 11:20 收件人: user-zh@flink.apache.org 主题: Re: keyBy 后的 getKey 函数调用了两次 谢谢,

回复: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 Liu Join
有没有考虑过将多条数据拼接为一条replace SQL写入数据库,这样也不会对数据库造成太大的压力,至于多少条拼接为一条可以去测试再决定,我用过的有500,1000条数据拼接为一条sql写入数据库。 从 Windows 版邮件发送 发件人: yidan zhao 发送时间: 2022年2月28日 10:20 收件人: user-zh 主题: Re: 实时数据入库怎样过滤中

flink执行任务失败,taskmanager内存不释放

2022-01-20 文章 Liu Join
环境:flink1.13.5,Standalone模式,taskmanager内存4GB,两个slot 任务数据量很少,不到10MB,任务执行时,taskmanager内存就一直上涨知道报错重启,但因为任务重启后taskmanager内存没有释放,导致任务彻底失败,taskmanager服务也挂了 从 Windows 版邮件发送

回复: flink任务提交到集群执行一段时间报错Java heap space

2022-01-20 文章 Liu Join
com> 发送时间: 2022年1月21日 10:52 收件人: flink中文邮件组<mailto:user-zh@flink.apache.org> 主题: Re: flink任务提交到集群执行一段时间报错Java heap space Hi! 5s 的窗口拼接 sql 语句看起来比较可疑,具体是怎么实现的?另外可以把 task manager 的 heap dump 出来看一下哪里占比较多的堆内存。 Liu Join 于2022年1月20日周四 13:28写道: > 环境: > > flink1.13.5,Standalone模式集群,jo

flink任务提交到集群执行一段时间报错Java heap space

2022-01-19 文章 Liu Join
环境: flink1.13.5,Standalone模式集群,jobmanager内存2GB,taskmanager内存4GB,集群包括一个jobmanager和两个taskmanager,每个taskmanager有2个slot。 任务内容是读取2万张表的数据,数据每1分钟一条,每10分钟输出每张表的最后一条数据。代码中使用了map、filter、watermark、开了一个10分钟的滑动窗口,使用reduce获得最后一条数据,因为sink是mysql,配置不高,所以将最后一条数据拼成批量插入语句才往MySQL写入。开了一个5s的窗口用于拼接sql语句。 报错内容: java.lang

使用flinkcdc2.1.1读取MySQL数据报错

2022-01-19 文章 Liu Join
环境:flink1.13.5,flinkcdc2.1.1,mysql5.7,idea2020 报错出现的时间没有规律,有可能很长时间不出现,有可能刚启动任务就报错, 报错如下: Caused by: io.debezium.DebeziumException at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ... 5 more Caused by: java.lang.ArrayIndexOutOfBo

flinkCDC2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错 图床链接:报错图片[cid:image003.png@01D7FD86.BF5AE890] 从 Windows 版邮件发送

flink cdc2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错 [cid:image003.png@01D7FD73.DAE77FF0] 从 Windows 版邮件发送

撤回流如何进行窗口分组聚合

2021-09-28 文章 Liu Join
我将数据流进行去重后,无法进行窗口聚合操作,一直报错GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

回复: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Liu Join
Exception in thread "main" org.apache.flink.table.api.TableException: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[dnt, ordernum, t1, csq, num, type], order=[ROWTIME]) at org.apache.flink.table.planner.plan.