回复: Re:回复: Re:flink1.18 on yarn提交任务生成多个application

2024-07-02 文章 Liu Join
谢谢!!!


祝好运,
Liu

发件人: Xuyang 
发送时间: 2024年7月2日 14:00
收件人: user-zh@flink.apache.org 
主题: Re:回复: Re:flink1.18 on yarn提交任务生成多个application

可以参考下这[1]




Tips: 社区新语法EXECUTESTATEMENTSET BEGIN ... END; ,也可以用 begin statement set; ... 
end;




[1]https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/insert/#insert-into-multiple-tables







--

Best!
Xuyang





在 2024-07-02 11:42:32,"Liu Join"  写道:
>你好,感谢回复,请问有官网文档么,我想详细了解下,谢谢!
>
>发件人: Xuyang 
>发送时间: 2024年7月2日 11:25
>收件人: user-zh@flink.apache.org 
>主题: Re:flink1.18 on yarn提交任务生成多个application
>
>Hi, 如果是不想出现两个application的情况,可以试一下使用statement 
>set将两个dml放在一起。否则的话,会被视为两个单独的操作,分成两个application。
>
>
>
>
>sql: begin statement set; ...  end;
>
>java & scala table api: tableEnv#createStatementSet
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>在 2024-07-02 10:04:34,"Liu Join"  写道:
>>版本:flink1.18、hadoop3.0.0
>>提交方式:per-job
>>
>>问题:
>>1. 使用flink sql编写的程序,数据源相同,输出定义了一个doris 
>>sink,以及一个表转流的print,将jar包提交到yarn会生成两个application,一个输出为doris 
>>sink,另一个输出为流转表的print
>>
>>2.  使用flink sql编写的程序,数据源相同,输出定义了两个doris 
>>sink,表a和表b,将jar包提交到yarn会生成两个application,一个输出为doris sink表a,另一个输出为doris sink表b
>>
>>
>>请问这是什么原因


回复: Re:flink1.18 on yarn提交任务生成多个application

2024-07-01 文章 Liu Join
你好,感谢回复,请问有官网文档么,我想详细了解下,谢谢!

发件人: Xuyang 
发送时间: 2024年7月2日 11:25
收件人: user-zh@flink.apache.org 
主题: Re:flink1.18 on yarn提交任务生成多个application

Hi, 如果是不想出现两个application的情况,可以试一下使用statement 
set将两个dml放在一起。否则的话,会被视为两个单独的操作,分成两个application。




sql: begin statement set; ...  end;

java & scala table api: tableEnv#createStatementSet




--

Best!
Xuyang





在 2024-07-02 10:04:34,"Liu Join"  写道:
>版本:flink1.18、hadoop3.0.0
>提交方式:per-job
>
>问题:
>1. 使用flink sql编写的程序,数据源相同,输出定义了一个doris 
>sink,以及一个表转流的print,将jar包提交到yarn会生成两个application,一个输出为doris sink,另一个输出为流转表的print
>
>2.  使用flink sql编写的程序,数据源相同,输出定义了两个doris 
>sink,表a和表b,将jar包提交到yarn会生成两个application,一个输出为doris sink表a,另一个输出为doris sink表b
>
>
>请问这是什么原因


flink1.18 on yarn提交任务生成多个application

2024-07-01 文章 Liu Join
版本:flink1.18、hadoop3.0.0
提交方式:per-job

问题:
1. 使用flink sql编写的程序,数据源相同,输出定义了一个doris 
sink,以及一个表转流的print,将jar包提交到yarn会生成两个application,一个输出为doris sink,另一个输出为流转表的print

2.  使用flink sql编写的程序,数据源相同,输出定义了两个doris 
sink,表a和表b,将jar包提交到yarn会生成两个application,一个输出为doris sink表a,另一个输出为doris sink表b


请问这是什么原因


关于RichFlatMapFunction的状态输出

2023-08-10 文章 Liu Join
请问,flink1.17在使用RichFlatMapFunction进行批计算时,如何在数据结束时将状态写入输出的数据流中?
谢谢


关于使用DataStream实现有界流的join

2023-07-28 文章 Liu Join
Hi,
如题,请教一下关于如何使用DataStream API实现有界流的join操作,我在调用join的时候必须要window,怎么避免,还是需要使用SQL 
API才可以

感谢,
鱼


回复: 关于DataStream API计算批数据的聚合值

2023-07-26 文章 Liu Join
你好,感谢回复。我使用reduce解决了问题。
祝好运。

发件人: weijie guo 
发送时间: 2023年7月26日 10:50
收件人: user-zh@flink.apache.org 
主题: Re: 关于DataStream API计算批数据的聚合值

你好:

Batch 模式下的 reduce 操作默认应该就是只输出最后一条数据(per-key)的。Agg 的话可能有点麻烦,可以使用
GlobalWindow + 自定义 Trigger 来 Workaround.

Best regards,

Weijie


Liu Join  于2023年7月26日周三 09:10写道:

> 例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值
>


关于DataStream API计算批数据的聚合值

2023-07-25 文章 Liu Join
例如:我使用DataStream api计算批数据也就是有界流的平均值,如何实现只输出最后一条平均值的数据,不输出中间值


关于flink批计算

2023-06-29 文章 Liu Join
请教下,如果用flink进行批计算,使用DataStream API有没有什么优化的地方,是否可以直接将数据作为矩阵在算子之间传递进行计算


回复: keyBy 后的 getKey 函数调用了两次

2022-03-01 文章 Liu Join
Reduce函数中,a可以认为是状态,你应该返回a试试,最好还是根据时间或者别的做个判断,然后输出,当然这些前提都是你的数据间隔小于10s
从 Windows 版邮件发送

发件人: Lei Wang
发送时间: 2022年3月1日 11:20
收件人: user-zh@flink.apache.org
主题: Re: keyBy 后的 getKey 函数调用了两次

谢谢,了解了。

另外一个问题,我 timeWindown 之后只想保留最后一条在这个 window 中的数据直接输出:

env.addSource(consumer).keyBy(new KeySelector() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}).*timeWindow(Time.seconds(10)).reduce((a,b)->b).*addSink(new SinkTest(1));

上面的代码我测试了下不符合预期,其实是逆序输出了窗口中左右的记录。

需要用什么方式实现这个功能比较合适呢?


On Tue, Mar 1, 2022 at 10:52 AM yidan zhao  wrote:

>
> keyBy的时候调用一次,sink的时候应该也会调用一次。因为keyBy是hash分区,前后是不chain在一起的。sink部分处理输入的ele的时候需要基于keySelector获取key。
>
> Lei Wang  于2022年3月1日周二 10:49写道:
>
> > 接收 kafka 的数据后 keyBy, sinkTest 中什么也没做。
> >
> > env.addSource(consumer).keyBy(new KeySelector() {
> > @Override
> > public String getKey(String value) throws Exception {
> > System.out.println(value);
> > return value;
> > }
> > }).addSink(new SinkTest(1));
> >
> >
> > 我自己做测试,每发送一条消息console 会打印两次,也就是 System.out.println(value) 被调用了两次。
> >
> > 为什么会这样呢,哪个地方还调用了 getKey 这个函数呢?
> >
> >
> > 谢谢,
> >
> > 王磊
> >
>



回复: 实时数据入库怎样过滤中间状态,保证最终一致

2022-02-27 文章 Liu Join
有没有考虑过将多条数据拼接为一条replace 
SQL写入数据库,这样也不会对数据库造成太大的压力,至于多少条拼接为一条可以去测试再决定,我用过的有500,1000条数据拼接为一条sql写入数据库。

从 Windows 版邮件发送

发件人: yidan zhao
发送时间: 2022年2月28日 10:20
收件人: user-zh
主题: Re: 实时数据入库怎样过滤中间状态,保证最终一致

对数据的实时性、延迟有多严格。
基于process算子处理,来一个orderId,记录status到状态,不要输出,同时设置定时器触发输出。
新数据进入,先判定状态,timestamp更大就更新状态,并更新定时器。

比如说定时器10s,就是一个订单只有连续10s状态不变才会被输出,否则就等待后续可能的更大ts的订单状态。
前提是你的订单必须带ts这个字段,能表达哪个order更晚。

18703416...@163.com <18703416...@163.com> 于2022年2月28日周一 10:00写道:

> keyBy 算子之后接 timewindow 窗口, 每个窗口如果有多条数据就取最新的一条。 至于对数据库的压力,取决于这个窗口的大小
>
> > 2022年2月25日 下午6:45,Lei Wang  写道:
> >
> > 场景描述:
> > Kafka 中的数据直接入到 MySQL 数据库中,数据格式如下:
> > order_id   status
> > 只有两个字段, order_id 为主键,以 replace 覆盖方式写入到数据库中。
> >
> > 对同一个 order_id, status 变化很频繁,为不对数据库造成压力,不会对每一条记录都做入库操作,但一定要保证这个 order_id
> > 最终的状态不丢,但这个最终的状态也不确定是多少。
> >
> > 我的做法是 KeyBy  orderId 后判断两条记录的时间间隔,如果时间间隔太小不做入库操作,但如果这个 order_id
> > 最后来的两条记录时间间隔太小,会导致最终的状态丢失。
> >
> > 请问有什么其他的解决方法吗?
> >
> > 谢谢,
> > 王磊
>
>



flink执行任务失败,taskmanager内存不释放

2022-01-20 文章 Liu Join
环境:flink1.13.5,Standalone模式,taskmanager内存4GB,两个slot

任务数据量很少,不到10MB,任务执行时,taskmanager内存就一直上涨知道报错重启,但因为任务重启后taskmanager内存没有释放,导致任务彻底失败,taskmanager服务也挂了

从 Windows 版邮件发送



回复: flink任务提交到集群执行一段时间报错Java heap space

2022-01-20 文章 Liu Join

我已经将5s的时间窗口替换为100条的countWindowAll,具体实现为使用aggregate函数将窗口内的数据拼接为一条sql语句,sql语句如下:replace
 into table (a1,a2,a3,a4,..) values(…)
但还是没有解决,
heap dump暂时无法提供,
taskmanager内存分配如下:
task heap:2.76G,network:343MB,JVMMetaspace:256MB

我一共运行了两个任务,都会出现这种问题,但之前写过一个简单的数据同步的程序没有出错,就是将一个MySQL库中的500张表同步到另一个MySQL库,不知道对于这种问题有没有解决的方向。
之前在监控任务运行时发现是MySQLsource先失败,然后导致整个任务挂了,在开启checkpoint时,MySQLsource和开窗之前的部分为一个parallelism,这个parallelism的checkpoint大小一直是136MB,从任务开始到结束都是136MB,其他运算的checkpoint不到1MB,是否有这部分原因
从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送

发件人: Caizhi Weng<mailto:tsreape...@gmail.com>
发送时间: 2022年1月21日 10:52
收件人: flink中文邮件组<mailto:user-zh@flink.apache.org>
主题: Re: flink任务提交到集群执行一段时间报错Java heap space

Hi!

5s 的窗口拼接 sql 语句看起来比较可疑,具体是怎么实现的?另外可以把 task manager 的 heap dump
出来看一下哪里占比较多的堆内存。

Liu Join  于2022年1月20日周四 13:28写道:

> 环境:
>
> flink1.13.5,Standalone模式集群,jobmanager内存2GB,taskmanager内存4GB,集群包括一个jobmanager和两个taskmanager,每个taskmanager有2个slot。
>
>
> 任务内容是读取2万张表的数据,数据每1分钟一条,每10分钟输出每张表的最后一条数据。代码中使用了map、filter、watermark、开了一个10分钟的滑动窗口,使用reduce获得最后一条数据,因为sink是mysql,配置不高,所以将最后一条数据拼成批量插入语句才往MySQL写入。开了一个5s的窗口用于拼接sql语句。
>
> 报错内容:
> java.lang.OutOfMemoryError: Java heap space
>
> 报错表象:
>
> 整个taskmanager内存被占满,任务失败重启后taskmanager内存仍然是满的,导致任务再次失败。之后任务直接挂了。时间长了之后内存没释放,Taskmanager进程也会挂了。
> 从 Windows 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>发送
>
>



flink任务提交到集群执行一段时间报错Java heap space

2022-01-19 文章 Liu Join
环境:
flink1.13.5,Standalone模式集群,jobmanager内存2GB,taskmanager内存4GB,集群包括一个jobmanager和两个taskmanager,每个taskmanager有2个slot。

任务内容是读取2万张表的数据,数据每1分钟一条,每10分钟输出每张表的最后一条数据。代码中使用了map、filter、watermark、开了一个10分钟的滑动窗口,使用reduce获得最后一条数据,因为sink是mysql,配置不高,所以将最后一条数据拼成批量插入语句才往MySQL写入。开了一个5s的窗口用于拼接sql语句。

报错内容:
java.lang.OutOfMemoryError: Java heap space

报错表象:
整个taskmanager内存被占满,任务失败重启后taskmanager内存仍然是满的,导致任务再次失败。之后任务直接挂了。时间长了之后内存没释放,Taskmanager进程也会挂了。
从 Windows 版邮件发送



使用flinkcdc2.1.1读取MySQL数据报错

2022-01-19 文章 Liu Join
环境:flink1.13.5,flinkcdc2.1.1,mysql5.7,idea2020
报错出现的时间没有规律,有可能很长时间不出现,有可能刚启动任务就报错,
报错如下:
Caused by: io.debezium.DebeziumException
at 
io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException

已经添加如下配置:

properties.put("database.history.skip.unparseable.ddl","true");
properties.put("database.history.store.only.monitored.tables.ddl","true");

从 Windows 版邮件发送



flinkCDC2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错
图床链接:报错图片[cid:image003.png@01D7FD86.BF5AE890]

从 Windows 版邮件发送



flink cdc2.1.1

2021-12-29 文章 Liu Join
使用flinkCDC2.1.1读取MySQL数据,一段时间后报错
[cid:image003.png@01D7FD73.DAE77FF0]

从 Windows 版邮件发送



撤回流如何进行窗口分组聚合

2021-09-28 文章 Liu Join
我将数据流进行去重后,无法进行窗口聚合操作,一直报错GroupWindowAggregate doesn't support consuming update 
and delete changes which is produced by node Deduplicate


回复: GroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate

2021-09-27 文章 Liu Join
Exception in thread "main" org.apache.flink.table.api.TableException: 
GroupWindowAggregate doesn't support consuming update and delete changes which 
is produced by node Deduplicate(keep=[FirstRow], key=[dnt, ordernum, t1, csq, 
num, type], order=[ROWTIME])
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:165)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:322)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:204)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
at