1 可控范围即可。
2 分析阶段可以分开,实际运行阶段看情况,怎样性能高就如何搞。
3 看监控,flink web ui有根据每个节点的反压情况按照不同颜色展示。
星海 <2278179...@qq.com.invalid> 于2023年8月16日周三 22:03写道:
>
> hello。大家好,请教几个问题:
> 1、flink中背压存在是合理的吗?还是在可控范围内就行?还是尽可能没有呢?
> 2、如果出现背压,如果多个operator chain 在一起不好分析,需要先将其拆开分析吗?
>
GC日志看GC原因
2278179732 <2278179...@qq.com.invalid> 于2023年8月3日周四 13:59写道:
>
> 大家好,请问下作业跑一段时间就会偶发出现背压,full gc看着很严重,有什么好的工具排查下吗?或者经验文档?谢谢!
首先你窗口是30min,刚刚开始肯定会是涨的。
其次,后续稳定后,继续涨可能是因为流量在变化。
最后,流量不变情况下,还可能受到延迟的影响。
lxk 于2023年7月25日周二 11:22写道:
>
> 相关配置:
> Flink:1.16
>
> | Checkpointing Mode | Exactly Once |
> | Checkpoint Storage | FileSystemCheckpointStorage |
> | State Backend | EmbeddedRocksDBStateBackend |
> | Interval | 8m 0s |
>
>
>
这个取决于你是什么模型,比如python中sklearn的大多模型都可以导出成pmml格式模型,然后java用jpmml库就可以导入进行预测。
如果是tensorflow模型,也有,只不过我忘记了,你可以找找。
15904502343 <15904502...@163.com> 于2023年8月1日周二 16:48写道:
>
> 您好
> 我想知道是否有代码示例,可以在Flink程序中加载预先训练好的编码模型(用python编写)
可以描述再详细点
1 于2023年6月7日周三 19:55写道:
>
> 老师们好,pyflink运行官网例子 wordcount 。把单词改成中文 乱码
>
>
>
>
>
k11 是不是用的最新版本,不是的话,觉得也可以尝试一下最新版本。
>
> 如果 jdk11 用的最新版本,可以尝试下使用其他 GC 算法是否也有同样问题。比如 -XX:+UseParallelGC
> -XX:NewRatio=3 -XX:ParallelGCThreads=4 -XX:CICompilerCount=4
> -XX:-CompactStrings
>
> Best,
> Yuxin
>
>
> yidan zhao 于2023年5月26日周五 17:39写道:
>
> > 最近升级fli
你在hive的catalog中定义表的时候就可以定义好event time,以及watermark呀。
ZhaoShuKang 于2023年5月25日周四 08:53写道:
>
> 各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL
> 设置事件时间有三种方式:
> 1、在 DDL 中定义
> 2、在 DataStream 到 Table 转换时定义
> 3、使用 TableSource 定义
>
这个得靠你自己打日志吧,在可能出NPE的地方 try catch 到,然后打印原始记录。
小昌同学 于2023年5月29日周一 18:30写道:
>
> 你好,数据源是kafka,使用的是stream api
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
> 回复的原邮件
> | 发件人 | Weihua Hu |
> | 发送日期 | 2023年5月29日 15:29 |
> | 收件人 | |
> | 主题 | Re: flink 输出异常数据 |
> Hi,
>
>
你好,这个问题和flink无关,看你主键实现机制吧,如果是自增,那就是mysql级别自动实现的自增,跟flink搭不上关系的。
小昌同学 于2023年5月31日周三 09:41写道:
>
> 老师,你好,再请教一下,连接数与并行度有关系的话,如果插入数据的MySQL是有主键的话,是不是连接数据也就是并行度只能为1啦呀,如果是多个并行度的话,可能会造成主键冲突;
> 感谢各位老师的指导
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
> 回复的原邮件
> | 发件人 | lxk |
> | 发送日期 |
没发现你web ui哪里显示了watermark呢?
小昌同学 于2023年5月31日周三 10:22写道:
>
> 你好,老师,感谢回复,我这边将截图放在了腾讯文档中,请查收;
> 感谢各位老师的指导
> 【腾讯文档】flink web ui
> https://docs.qq.com/sheet/DYkZ0Q0prRWJxcER4?tab=BB08J2
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |
> 回复的原邮件
> | 发件人 | Shammon FY |
> | 发送日期 |
最近升级flink版本和jdk版本,flink从1.15.2升级到1.17.0,jdk从8升级到11。然后出现大量full gc。
分析后,发现主要是 System.gc() 导致。 进一步定位到是 redisson 库中 netty 部分用到了 DirectMemory
导致。 直接内存不足,导致频繁调用 System.gc 触发 full gc。
我现在问题是,通过测试对比实验发现,jdk8+flink1.17没问题,jdk11+flink1.17就会有该问题。
有人知道原因嘛?
其他信息:
如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。
小昌同学 于2023年5月25日周四 09:32写道:
>
> 各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题;
> 我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒,
> 但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在:
> 相关代码以及样例数据如下:
> |
>
如题,想知道这个分类的标准是啥呢?
从哪方面考虑,主要根据每个算子的工作复杂性,复杂性越高自然设置越高的并发好点。 其次实际运行时,也可以根据反压情况找到瓶颈进行调整。
Shammon FY 于2023年4月21日周五 09:04写道:
>
> Hi
>
> DataStream作业设置并发度有两种方式
> 1. 在ExecutionEnvironment通过setParallelism设置全局并发
> 2. 在DataStream中通过setParallelism为指定的datastream计算设置并发度
>
> Best,
> Shammon FY
>
> On Fri, Apr 21, 2023 at 8:58
如题,目前看实现,这个 windowStagger 是针对 opeartor 的众多 subtask 之间,针对每个 subtask
生成了一个固定的 offset 作用于该 subtask 处理的元素。因为 staggerOffset 是在 assignWindows
中生成,而且只有第一次会生成,后续复用。如下:
if (staggerOffset == null) {
staggerOffset =
windowStagger.getStaggerOffset(context.getCurrentProcessingTime(),
size);
}
设置 taskmanager.network.tcp-connection.enable-reuse-across-jobs 为
false,设置 taskmanager.network.max-num-tcp-connections 大点。
之前有个bug导致这个问题我记得,不知道1.16修复没有。
zhan...@eastcom-sw.com 于2023年4月3日周一 10:08写道:
>
>
> hi, 最近从1.14升级到1.16后,kafka消费不定时会出现
>
实际使用你肯定不会是console producer吧。或者你换java代码写kafka,方便控制些。
wei_yuze 于2023年2月8日周三 13:30写道:
>
> 非常感谢各位的回答!
>
>
>
> Weihua和飞雨正确定位出了问题。问题出在Flink 并发数大于Kafka分区数,导致部分Flink task slot
> 接收不到数据,进而导致watermark(取所有task slot的最小值)无法推进。
>
>
> 我尝试了Weihua提供的两个解决方案后都可以推进watermark求得窗口聚合结果。
>
>
>
I think it is a bug: https://issues.apache.org/jira/browse/FLINK-25732
Yael Adsl 于2022年12月12日周一 23:56写道:
>
> Hi,
>
> We are running a flink cluster (Flink version 1.14.3) on kubernetes with
> high-availablity.type: kubernetes. We have 3 jobmanagers. When we send jobs
> to the flink cluster, we
话说我也有个问题,stop后基于savepoint恢复 不同于先savepoint,然后cancel后基于savepoint恢复?
Weihua Hu 于2023年1月5日周四 10:38写道:
>
> Hi,
>
> 简单来说是不能,已经 cancel 的 job 状态不能恢复到 running 状态。用 savepoint 恢复的任务是新的 job。
>
> 这个问题的背景是什么呢?什么情况下需要将已经 cancel 的 job 恢复呢?
>
>
> Best,
> Weihua
>
>
> On Fri, Dec 30, 2022 at 5:12 PM 陈佳豪 wrote:
>
看报错 Could not connect to BlobServer at address
localhost/127.0.0.1:33271,你本地的配置是不是不对。提交到什么模式部署的集群,配置是否配对了。
WD.Z 于2023年1月10日周二 10:56写道:
>
> 任务在webui点击submit时报错,看起来是从JM提交到TM时报错,服务器防火墙已关闭,资源足够,还没有安装hadoop,但以standalone模式启动,看了下文档是不需要hadoop?
> 报错中的Caused by列表如下:
>
>
> 2023-01-10 09:46:14,627 INFO
>
这没啥问题,但是你代码不能这么写,try catch放 while 内部去。放外边catch到异常不就退出循环了!
bigdata <1194803...@qq.com.invalid> 于2022年12月11日周日 15:12写道:
>
> java.lang.InterruptedException: sleep interrupted
> at java.lang.Thread.sleep(Native Method)
> at
>
目前感觉和 https://issues.apache.org/jira/browse/FLINK-19249 和
https://issues.apache.org/jira/browse/FLINK-16030
有点类似。网络环境不稳定。相同配置在物理机没问题。
yidan zhao 于2022年12月7日周三 16:11写道:
>
> 谢谢,不过这几个参数和netty关系不大吧。
> heartbeat和akka的可能会和rpc超时有关,不过我这个是netty的报错,不是rpc部分。
> web和rest应该是和client提交任务有关。
>
>
谢谢,不过这几个参数和netty关系不大吧。
heartbeat和akka的可能会和rpc超时有关,不过我这个是netty的报错,不是rpc部分。
web和rest应该是和client提交任务有关。
Stan1005 <532338...@qq.com.invalid> 于2022年12月7日周三 15:51写道:
>
> 我也遇到过,tm的slot数一直是2,并行度高了就很容易出这个报错。tm内存保持为20480mb,相同的job讲并行度降低到256就没有报过这个。
> 另外可以考虑适当增加这几个参数(具体需要改动哪些建议先搜下这些参数的作用)
> set
如题,这个问题长期存在,我想了解几个点:
(1)connection time out
是连接时才会报的错误嘛?作业正常运行期间可能有嘛?我理解是连接时的报错,但是我看部分报错是作业运行不少时间才报错的(比如40分钟,1小时多),这种时刻为什么会有
connect 操作呢?netty的connection不是在作业启动时,就发 partition request 的时候创建好的嘛。
(2)之前调整过 netty 的 server 的 backlog,目前设置2048,不应该是这个导致。
(3)之前我TM都是1个slot,netty的server
通过savepoint方式先停止作业可以,不停止,你要考虑是否你的作业是否能做到重复处理部分数据不影响准确性。
先做savepoint但不停止作业,新作业启动后,新旧作业是消费的数据是重复的,不会因为相同group就不重复。
因为kafka的消费是2个模式,一个是组模式,还有一个是不受到组约束的。Flink采用的是后者。
我说的那个方法是在kafka后边加一个filter,filter的参数就是start和end,根据start和end过滤数据。
而且这个start和end需要可动态配置,就是不重启作业能配置才行。
应该是不行,必须先停止。
除非业务层面做了改动,像我业务的话我支持动态配置消费数据的开始结束时间的过滤。这样假设当前作业为A,当前时间9点55分,先动态设置A消费到10点就停止。在10点前启动新作业B,并设置作业B从10点的作业开始消费。这样10点之后比如10点5分左右确认作业A已经消费完10点前数据且sink完就可以停止了。
否则没办法,指定相同group也不可以的应该,我记得flink是使用主动assign分区的方式使用kafka的,因此如果前后作业同时存在,实际是重复消费,不存在共享消费的概念。
刘超 于2022年12月1日周四 09:36写道:
>
> kafka
好吧,sql我具体不了解,我用的stream api比较多,我了解是stream
api到streamGraph,然后到jobGraph,然后就是直接rest api方式提交给集群执行。 standalone场景。
casel.chen 于2022年11月30日周三 00:16写道:
>
> 如果要支持调整flink sql作业每个算子资源和并行度的话,不是要从json转回streamGraph再提交吗?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
&g
并不需要从执行计划json生成streamGraph呀~
streamGraph提交之前直接转jobGraph。
casel.chen 于2022年11月28日周一 08:53写道:
>
> 源码中只找到如何从streamgraph生成执行计划的json串,但没有找到如何解析执行计划的json串生成回streamgraph,还请赐教
hi,我们继续实验了reuse=false,max-num-tcp-connections继续为1的情况,也解决了问题。因此是reuse的问题。
yidan zhao 于2022年11月21日周一 10:34写道:
>
> 我们这边其实是生产环境一直有类似local/remote transport异常,其中一大类就是 Sending the partition
> request to '...' failed 这种错误。
>
> 当前是 1.15.2,默认 taskmanager.network.max-num-tcp-
日志太少,建议开启debug日志后,发出来完整日志看看。
junjie.m...@goupwith.com 于2022年11月21日周一 19:04写道:
>
> flink1.15.2无论blob.server.port设置什么端口范围,都会报错。
> 如设置blob.server.port=“50100-50200”时报错:
> Caused by: java.io.IOException: Unable to open BLOB Server in specified port
> range: 50100
> at
If no data skew exists, you can set the job's parallelism any times of
the count of taskmanagers, and set `cluster.evenly-spread-out-slots`
to true in flink-conf.yaml of your flink cluster.
harshit.varsh...@iktara.ai 于2022年11月7日周一 20:41写道:
>
> Dear Team,
>
>
>
> I need some advice on setting up
> taskmanager.network.tcp-connection.enable-reuse-across-jobs 影响了任务稳定性。
>
> 非常期待你的反馈,谢谢。
>
> fanrui
>
> On 2022/11/18 04:25:47 yidan zhao wrote:
> > Hi, weijie guo.
> > 你在 https://issues.apache.org/jira/browse/FLINK-28695
> > 中提到的解决方案,我们这边实验观察看起来是有效的。 当然我们任
aring的话也只会把A和B相同并发的share在一起,连接其他的subtask A还是要建立连接。
> 2.指的是作业jar,每个TM只会下载一次
>
> Best regards,
>
> Weijie
>
>
> yidan zhao 于2022年10月31日周一 19:54写道:
>
> > 嗯,问题1我主要是在想,这种复杂的连接关系,会不会增大Sending the partition request to '...'
> > failed;这种异常的概率。
> > 问题2,你提到
First of all, you should trigger a savepoint before stopping the job,
and then you can restart the job with the savepoint.
For checkpoints, you need to set
‘execution.checkpointing.externalized-checkpoint-retention’ to
'RETAIN_ON_CANCELLATION'. You can get the checkpoints info via history
server.
on reuse的,默认TM间建立一个物理TCP连接。
> 并发大了的话,你的TM只有一个slot,启动的TM会变多。task全变成running的状态变慢的因素也比较多:有些TM容器在的结点比较慢、下载jar包时间长、state
> restore慢等
>
> Best regards,
>
> Weijie
>
>
> yidan zhao 于2022年10月30日周日 11:36写道:
>
> > 如题,我生产集群频繁报 org.apache.flink.runtime.io
> > .network.
当前我发现部分奇怪现象,比如A=>B。
存在A处于反压,但是B全部都是idle的,busy为0,这种情况是什么原因呢?
如题,我生产集群频繁报
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException
异常,具体异常cause有如下情况,按照出现频率从高到底列举。
(1)Sending the partition request to '...' failed;
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to
> >
> >
> > 为了要10分钟发送,是因为上游太多数据, 所以我先提前用窗口个聚合一下,目前一秒将近有 800MB 的流量
> >
> >
> >
> > Shammon FY 于2022年10月20日周四 11:48写道:
> >
> >> 如果必须要10分钟,但是key比较分散,感觉这种情况可以增加资源加大一下并发试试,减少每个task发出的数据量
> >>
> >> On Thu, Oct 20, 2022 at 9:49 AM
这个描述前后矛盾,写出速度跟不上导致反压,那控制写出速度不是问题更大。不过你不需要考虑这些,因为你控制不了写出速度,只能控制写出时机。
写出时机是由window的结束时间和watermark决定的,所以如果真要解决,需要控制分窗不要固定整点10分钟。
macia kk 于2022年10月20日周四 00:57写道:
>
> 聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。
>
> 如果控制一下写出的速度,让他慢慢写会不会好一些
<
> > hinobl...@gmail.com;
> > 发送时间:2022年8月23日(星期二) 晚上11:09
> > 收件人:"user-zh" >
> > 主题:Re: Re: flink1.15.1 stop 任务失败
> >
> >
> >
> > 1 大概率是source部分问题,或者 savepoint 的 trigger 层面。
> > 2 也可以从 cancel 和 stop 的区别上考虑下?
> > 3 补充
任务假设:
任务从kafka读取数据,经过若干复杂处理(process、window、join、等等),然后sink到kafka。
并发最高240(kafka分区数),当前采用全部算子相同并发方式部署。
算子间存在 hash、forward、rebalance 等分区情况。
此处假设 A 和 B 算子之间是 rebalance。 C 和 D 算子直接是 hash 分区(无数据倾斜)。ABCD都是240并发。 其他算子暂忽略。
TM连接数:
Flink 的 taskmanager 之间的共享 tcp
ck“打”完是啥意思。
Congxian Qiu 于2022年10月10日周一 15:11写道:
>
> Hi
> 可以的话也同步下相关的计算逻辑,从 checkpoint 恢复后的统计结果可能会和计算逻辑有关
> Best,
> Congxian
>
>
> Hangxiang Yu 于2022年10月10日周一 14:04写道:
>
> > 是什么值下跌呢?哪个metric吗?
> >
> > On Mon, Oct 10, 2022 at 1:34 PM 天下五帝东 wrote:
> >
> > > Hi:
> > >
加下游的等待时间和重试次数,减小出现
> PartitionNotFoundException 的概率。
>
> Best,
> Lijie
>
> yidan zhao 于2022年9月28日周三 17:35写道:
>>
>> 按照flink的设计,存在上游还没部署成功,下游就开始请求 partition 的情况吗? 此外,上游没有部署成功一般会有相关日志不?
>>
>> 我目前重启了集群后OK了,在等段时间,看看还会不会出现。
>>
>> Shammon FY 于2022年9月28日周三 15:45写道:
>
署成功。
> 所以从这个异常错误来看netty连接是通的,你可能需要根据输出PartitionNotFoundException信息的计算任务,查一下它的上游计算任务为什么没有部署成功
>
> On Tue, Sep 27, 2022 at 10:20 PM yidan zhao wrote:
>>
>> 补充:flink1.15.2版本,standalone集群,基于zk的ha。
>> 环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。
>>
>>
其实可以和kafka的pull模型对比下,kafka消费是不断轮训pull。我的认知中flink应该不是吧?
flink应该仅仅是请求 result partition 的时候下游主动去上游请求? 建立之后应该就是类似一条连接不断读取数据?
yanfei lei 于2022年9月22日周四 11:31写道:
>
> Hi,
> Flink社区有一篇关于Credit-based Flow Control的blog post
>
补充:flink1.15.2版本,standalone集群,基于zk的ha。
环境是公司自研容器环境。3个容器启JM+HistoryServer。剩下几百个容器都是TM。每个TM提供1个slot。
yidan zhao 于2022年9月27日周二 22:07写道:
>
> 此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
> 我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
> 会不会剩下400的TM的连接,时间厂了就会出现某种问题?
>
> yidan zhao 于20
此外,今天还做了个尝试,貌似和长时间没重启TM有关?重启后频率低很多会。
我预留的TM很多,比如500个TM,每个TM就提供1个slot,任务可能只用100个TM。
会不会剩下400的TM的连接,时间厂了就会出现某种问题?
yidan zhao 于2022年9月27日周二 16:21写道:
>
> 打开了TM的debug日志后发现很多这种日志:
> Responding with error: class
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
>
打开了TM的debug日志后发现很多这种日志:
Responding with error: class
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException
目前问题的直观表现是:提交任务后,一直报 LocalTransportException:
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
Sending the partition request to
之前是如何实现的,通过 kafka 的record key?
casel.chen 于2022年9月26日周一 23:21写道:
>
> flink cdc
> 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?
应该不行吧,kafka client本身就没有限速的功能。
Jason_H 于2022年9月26日周一 10:17写道:
>
> Hi,各位大佬:
> 我们在使用flink消费kafka的时候,是否可以在代码中自定义消费速率,来调整源端的消费能力。
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
如题,在工作中经常遇到flink任务各种异常,今天我列了下主要的异常,想请大佬们对不同异常的出现场景根据自身经验说下原因、场景、还有可能的优化解决方案。
(1) org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager
with id {...} is no longer reachable.
(2) org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
(3)
那你代码检查下有没有内存泄露呢。
杨扬 于2022年9月19日周一 11:21写道:
>
> 还有一个现象,观察到
> taskHeap内存占用在逐步升高,作业刚启动的时候占用在10%左右,一周后增加至25%左右,两周后增加至50%左右,上述指的是GC后观察到的内存占用值。两周后计算算子几乎一直100%busy状态,端到端延迟已经达到了10s左右,作业已经不可用需要重启了。
>
>
>
>
> > 在 2022年9月15日,下午8:58,yidan zhao 写道:
> >
> > 本身低延迟一定程度上
嗯。去zookeeper中删除jobgraph和running job xx吧啦的几个节点。
Summer 于2022年9月16日周五 16:51写道:
> 开了,但是全被干挂了
> 回复的原邮件
> 发件人 yidan zhao
> 发送日期 2022年9月16日 16:05
> 收件人 Summer
> 抄送人 user-zh@flink.apache.org
>
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> HA模式开启了对嘛。
>
> Summer
HA模式开启了对嘛。
Summer 于2022年9月16日周五 15:48写道:
> 原因是找到了,${FLINK_HOME}/lib缺少了一个任务依赖Jar包,
> 那么如果我在不添加这个jar的情况下,由于Flink无法启动,怎么才能取消掉这个任务??
>
>
>
> 回复的原邮件 ----
> 发件人 yidan zhao
> 发送日期 2022年9月16日 14:51
> 收件人 Summer
> 抄送人 user-zh@flink.apache.org
>
> 主题 Re: 任务启
开启了HA是吧。
Summer 于2022年9月16日周五 14:32写道:
> standlone部署
>
>
>
>
>
>
>
>
> ---- 回复的原邮件
> 发件人 yidan zhao
> 发送日期 2022年9月16日 14:20
> 收件人 user-zh
> 主题 Re: 任务启动异常导致Flink服务挂掉,无法启动Flink服务
> 什么部署模式。
>
> Summer 于2022年9月16日周五 13:57写道:
>
>
>
什么部署模式。
Summer 于2022年9月16日周五 13:57写道:
>
>
> Flink版本:1.13.3
> 我有一个Flink Sql的任务,也生成了checkpoint,但是执行过程出现Execption,导致整个Flink JobManger无法启动。
> 我再重启Flink的时候,这个FlinkSql任务由于一直抛异常导致Flink进程启动不起来。
> 请问有什么办法取消这个任务。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。
yidan zhao 于2022年9月15日周四 20:57写道:
>
> 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
>
> 杨扬 于2022年9月15日周四 20:02写道:
> >
> > 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
> >
> >
> >
&g
资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
杨扬 于2022年9月15日周四 20:02写道:
>
> 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
>
>
>
>
> > 在 2022年9月15日,下午7:27,yidan zhao 写道:
> >
> > busy那就提升并发度看看效果?
> >
> > 杨扬 mailto:yangya...@cupdata.com>
busy那就提升并发度看看效果?
杨扬 于2022年9月15日周四 14:51写道:
> 各位好!
> 目前有一flink作业,大致分为3个阶段:
> 读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)->
> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
>
> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
>
>
你发的代码就是原因,keySelector对于同一个输入数据,输出结果必须一致,不能不同。
如果需要实现类似效果,可以先使用 flatMap 生成 random key,然后将 key 存储到一个字段,比如就叫做 key,然后
keyBy("key") 这样可以。
junjie.m...@goupwith.com 于2022年9月9日周五 17:59写道:
>
>
> Integer[] rebalanceKeys = createRebalanceKeys(parallelism);
> int rebalanceKeyIndex = new
Hi, I want to know is there some way to avoid this problem now?
I can not guarantee jobmanager and taskmanager do not run in the same machine.
hello
yh z 于2022年9月2日周五 18:30写道:
>
> Hello
>
> yh z 于2022年9月2日周五 11:51写道:
>
> > hello flink
> >
Do you place the s3 jar to plugins/s3 dir?
Darius Žalandauskas 于2022年8月31日周三 03:11写道:
>
> Hello apache-flink team,
> For the last week I am really struggling with setting up EMR to store stream
> output to AWS S3.
> According to the documentation, if running flink with emr, no manual
>
如下代码片段:
watermarkStrategy = watermarkStrategy.withTimestampAssigner(
new SerializableTimestampAssigner>() {
@Override
public long extractTimestamp(KafkaMessageWrapper
element, long recordTimestamp) {
try {
return
题目即问题,主要是jira,打开个页面好几十秒。
貌似这个问题已经有jira了,https://issues.apache.org/jira/browse/FLINK-27341
这个会解决。看样子得等1.15.2或1.16了。
yidan zhao 于2022年8月26日周五 14:08写道:
>
> 目前我可运行方式是:
> bind-host不配置,默认就是0.0.0.0(注意flink-conf中默认配了localhost,需要注释掉),或者配置为0.0.0.0。
>
> JM和TM机器不重复,就是JM独立部署,这样ok。否则都会出问题。
>
> yidan zhao
目前我可运行方式是:
bind-host不配置,默认就是0.0.0.0(注意flink-conf中默认配了localhost,需要注释掉),或者配置为0.0.0.0。
JM和TM机器不重复,就是JM独立部署,这样ok。否则都会出问题。
yidan zhao 于2022年8月26日周五 14:01写道:
>
> 如题,这俩地址啥区别呢?
>
> 1.15.1版本:从测试效果来看:
> (1)Taskmanager实际绑定地址取决于 bind-host
> (2)taskmanager.host 貌似被用于 tm 的resource-id部分使用了。
如题,这俩地址啥区别呢?
1.15.1版本:从测试效果来看:
(1)Taskmanager实际绑定地址取决于 bind-host
(2)taskmanager.host 貌似被用于 tm 的resource-id部分使用了。
(3)假设我设置 host 为 localhost,bind-host为0.0.0.0。这导致我集群的web ui的taskmanager界面展示为:
localhost:33865-c8a37d
akka.tcp://flink@localhost:33865/user/rpc/taskmanager_0
localhost:43867-3afa06
知道问题了大概,远程debug下来,发现在获取地址时:
(1)JM leader 机器,先拿 127.0.0.1 连接 resource-manager,直接成功,返回了 127.0.0.1。
(2)非 JM leader 机器,先拿 127.0.0.1 连接 resource-manager 失败,然后走下一个 local
host 可以拿到正确 hostname。
今天远程调试了下,目前发现开启远程调试情况,启动后是ok的?
yidan zhao 于2022年8月26日周五 00:01写道:
>
> 这个问题有人知道吗,目前反复实验确定有问题。
>
> 经过多次测试,目前初步怀疑。 并不是单 JM 就会有问题。多JM也有问题。
>
> 出问题的是JM为leader的机器。 比如ABCD4台机器,如果A的JM是leader,那么A机器启动的TM就是127.0.0.1。
>
>
>
> yidan zhao 于2022年8月24日周三 10:30写道:
> >
hi。想继续问下。我目前看官方的 tag 1.15.1 显示不属于任何分支,所以最终1.15.1下载的发布包对应是不是1.15.1的tag呢?
包括后续1.15.2的修改是基于哪个分支 patch 上去的。
Lijie Wang 于2022年7月21日周四 14:01写道:
>
> Hi,
> 1.15.1 应该是对应 tag release-1.15.1
>
> yidan zhao 于2022年7月21日周四 12:53写道:
>
> > 我目前看了下,有一定规律但也还是不完全懂。
> > 比如我目前有部分公司内部用到
这个问题有人知道吗,目前反复实验确定有问题。
经过多次测试,目前初步怀疑。 并不是单 JM 就会有问题。多JM也有问题。
出问题的是JM为leader的机器。 比如ABCD4台机器,如果A的JM是leader,那么A机器启动的TM就是127.0.0.1。
yidan zhao 于2022年8月24日周三 10:30写道:
>
> masters:
> A:8682
> workers:
> A
> B
> C
>
> 都是内网hostname(相互都可解析),非127.0.0.1。
>
> flink版本:1.1
的内存算flink taskmanager 配置的内存,你应该可以用参数
> *'taskmanager.memory.task.off-heap.size*
> 来配置,可以参考这个问题:
> https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
>
>
>
> On Wed, 24 Aug 2022 at 1:05 PM, yidan zhao wrote
如题,pyflink场景的任务,内存是如何管理呢。
python部分的内存是否算入flink TaskManager配置的内存中呢?
比如python算子通过多进程做各种复杂的运算,这部分内存占用是否算入flink呢?
——
如果不算的话,使用pyflink时,容器内存和flink TaskManager内存配置是不是需要预留空间?
有多个内网 IP: A,B,C
>
> Best,
> Weihua
>
>
> On Tue, Aug 23, 2022 at 7:37 PM yidan zhao wrote:
>
> >
> > 如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > Partition
> > c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964
1 大概率是source部分问题,或者 savepoint 的 trigger 层面。
2 也可以从 cancel 和 stop 的区别上考虑下?
3 补充信息:我的kafka source是用的旧版本(没办法用新版本,原因是由于一些原因我必须用 kafka 低版本 client)。
yidan zhao 于2022年8月23日周二 23:06写道:
>
> 看了下,报错很少。
> 反正 flink cancel -s 是可以的,flink stop 就不行。而且目测是瞬间失败。从web
> ui来看,整个savepoint的完成是0/841,应该是几乎没开始就
Best!
> Xuyang
>
>
>
>
>
> Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
> 在 2022-08-23 20:41:59,"yidan zhao" 写道:
> >补充部分信息:
> >看日志,如果是 flink savepoint xxx 这样触发检查点,JM的日志很简单:
> >2022-08-23 20:33:22,307 INFO
> >org.apache.flink.r
ar:1.15.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
yidan zhao 于2022年8月23日周二 20:31写道:
>
> 如题,stop,停止并保存检查点失败。
> 测试看 cancel、cancel -s 方式都成功。 cancel -s 可成功生成检查点并退出。
>
> stop则不行,报错主要是
> Could not stop with a savepoint job "1b87f308e2582f3c
如题,stop,停止并保存检查点失败。
测试看 cancel、cancel -s 方式都成功。 cancel -s 可成功生成检查点并退出。
stop则不行,报错主要是
Could not stop with a savepoint job "1b87f308e2582f3cc0e3ccc812471201"
...
Caused by: java.util.concurrent.ExecutionException:
java.util.concurrent.CompletionException:
如题,目前发现任务报错是:org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
Partition c74a0a104d81bf2d38f76f104d65a2ab#27@7e1a8495f062f8ceb964a3205e584613
not found
——
任务本身问题不大,也不是网络问题。 目前发现解决方法:
换成非单 JM 即可。
同时也发现一个可能原因,或另一个明显现象:
从web ui的Taskmanager界面可以发现,执行 start-cluster
1 需求是根据输入流,根据字段判定,拆分并输出为2个流。
2 目前看 pyflink 的 api,貌似不支持 sideoutput。
3 虽然可以基于输入流 A,连续处理2次,即输入流 A 流向算子 B 和算子 C,分别筛选自己需要的数据进行处理。但这样会导致数据重复传输。
我最近也在对比storm和flink。有没有大佬介绍下,storm这种ack模式的是不是恢复会更快点,目前我感觉storm的架构下,各个节点的fail
over更加独立感觉。
Flink 目前集群中任何一个机器失败都会导致整个任务重启,耗时会长点。
但是从全局资源来说,ckpt的资源占用貌似又比ack模式少。
不知道理解对不对。
tison 于2022年7月30日周六 14:28写道:
>
> 可以看下这两份材料
>
> *
>
我是本地直接ide内run。
Weihua Hu 于2022年7月27日周三 22:10写道:
>
> Hi, 你是怎么提交的任务呢?是提交到远端的 session cluster 上吗?有其他的相关日志吗?
>
> Best,
> Weihua
>
>
> On Wed, Jul 27, 2022 at 5:36 PM yidan zhao wrote:
>
> > 而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
> >
使用一个算子实现个轻量级的http服务,然后把收到的数据通过广播流方式供给其他流使用。
Jeff 于2022年7月28日周四 10:00写道:
>
> 是想把动态参数传给正在执行的算子?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-07-28 08:16:40,"张锴" 写道:
> >flink版本1.13.2
> >想通过http请求的方式将参数传给flink,这个怎么实现?
而且pyflink既然打包了flink的完整包,那么真正部署运行的时候是用这个呢?还是需要执行的机器上单独部署一个flink呢?
yidan zhao 于2022年7月27日周三 17:34写道:
>
> 我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
>
> yidan zhao 于2022年7月27日周三 10:40写道:
> >
> > pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> > 但 f
我将这3个jar放到pyflink的lib下则是可以的。通过 add_jar 方式给出是不可以的。有人知道原因吗。
yidan zhao 于2022年7月27日周三 10:40写道:
>
> pyflink情况 flink-sql-connector-kafka-1.15.0.jar 可以。
> 但 flink-connector-base-1.15.0.jar + flink-connector-kafka-1.15.0.jar +
> kafka-clients-2.8.1.jar 却报:
> py4j.pro
基于python实现一个flatMap,针对每个元素跑模型预测。主体则使用java,有什么实现方法吗?
yidan zhao 于2022年7月13日周三 21:54写道:
>
> 谢谢回答。
>
> Xingbo Huang 于2022年7月13日周三 16:55写道:
> >
> > Hi,
> >
> > 简单来说,如果你的作业逻辑中只使用了纯java的算子,比如你写的是一个没有使用 Python udf 的sql/table api
> > 作业时,那么运行时就不需要对Python有需求,但
:
>
> 最终会放到 pipeline.jars 配置中,在提交作业时上传到 blobServer
>
> Best,
> Weihua
>
>
> On Tue, Jul 26, 2022 at 5:40 PM yidan zhao wrote:
>
> > 如题,我看注释和文档。
> > add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
> >
如题,我看注释和文档。
add_jars 是添加要upload到cluster的jar,那么上传到什么路径呢?
~
我目前看了下,有一定规律但也还是不完全懂。
比如我目前有部分公司内部用到的,希望基于1.15.1的release上加的话,我需要基于哪个分支?还是tag做更改呢?
哪个branch、or tag是对应官方download页面提供的下载链接的包中一模一样的源码呢,就是不包含新增开发但未发布代码的版本。
如图,1.15.1启动后,提交examples中任务,web ui 查看checkpoint,不显示内容。
console报错:
main.a7e97c2f60a2616e.js:1 ERROR TypeError: Cannot read properties of
null (reading 'checkpointed_size')
at q (253.e9e8f2b56b4981f5.js:1:607974)
at Sl (main.a7e97c2f60a2616e.js:1:186068)
at Br
Hi all, Does 'standalone mode support in the kubernetes operator'
means: Using flink-k8s-operator to manage jobs deployed in a
standalone cluster?
What is the advantag doing so.
Yang Wang 于2022年7月14日周四 10:55写道:
>
> I think the standalone mode support is expected to be done in the version
>
r
>
> Best,
> Yang
>
> yidan zhao 于2022年7月12日周二 13:17写道:
>
> > 我用 flink run -m 方式指定 clusterIp 是可以提交任务的。
> > 那么使用 --target kubernetes-session
> > -Dkubernetes.cluster-id=my-first-flink-cluster 的方式,为什么不能智能点拿到对应
> > cluster 的 svc 的 clusterIp 去提交呢。
> >
> &g
们知道python函数的性能是不如Java函数的。关于框架层的开销,我之前有写了专门的文章[3]分析过。
>
> 希望对你有所帮助。
>
> Best,
> Xingbo
>
> [1] https://github.com/alibaba/pemja
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/python_execution_mode/
> [3] https://flink.apache.org/2022/05
目前看了下pyflink,想了解下,pyflink的任务实际运行时也是JAVA+python双环境吗。
涉及java和python交互等是吗。性能相比java直接开发的任务会有区别吗?
yidan zhao 于2022年7月12日周二 19:27写道:
>
> 公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。
>
> 初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
> 实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实
公司有部分项目是基于 storm 开发的,目前想进行改造,计划用 flink。
初步看了下代码,发现 storm 中实现都是通过 multi-lang 方式各种调用 shell、python
实现。这些shell和python主要通过storm提供的一个 storm.py 基础包实现和 父进程
的通信(基于stdin和stdout貌似)。
想问问,这种如何改造呢?
首先是大方向上,(1)连同python、shell部分一起改造。(2)保留python、shell部分,基于flink实现一套类似机制。
(1)和(2)目前看起来都会很复杂。
有没有小伙伴做过类似事情呢?
我用 flink run -m 方式指定 clusterIp 是可以提交任务的。
那么使用 --target kubernetes-session
-Dkubernetes.cluster-id=my-first-flink-cluster 的方式,为什么不能智能点拿到对应
cluster 的 svc 的 clusterIp 去提交呢。
yidan zhao 于2022年7月12日周二 12:50写道:
>
> 如果是在 k8s-master-node 上,可不可以直接用 ClusterIp 呢?
>
>
> 其次,NodePort我大概理解,一直不是很懂
如果是在 k8s-master-node 上,可不可以直接用 ClusterIp 呢?
其次,NodePort我大概理解,一直不是很懂 LoadBalancer 方式是什么原理。
yidan zhao 于2022年7月12日周二 12:48写道:
>
> 我理解的 k8s 集群内是组成 k8s 的机器,是必须在 pod 内?我在k8s的node上也不可以是吧。
>
> Yang Wang 于2022年7月12日周二 12:07写道:
> >
> > 日志里面已经说明的比较清楚了,如果用的是ClusterIP的方式,那你的Flink
10:23:23,021 WARN
> org.apache.flink.kubernetes.KubernetesClusterDescriptor [] -
> Please note that Flink client operations(e.g. cancel, list, stop,
> savepoint, etc.) won't work from outside the Kubernetes cluster since
> 'kubernetes.rest-service.exposed.type' has been set to ClusterIP.
>
>
> Best,
> Yang
>
1 - 100 of 361 matches
Mail list logo