关于非keyedstream使用定时器问题

2021-02-08 Thread yidan zhao
如题,当前flink不支持非keyedStream使用定时器,不清楚有啥解决方法吗?

目前我实现一个sink,带超时希望用到timerservice。但是不支持。
同时不希望使用keyedStream,因为会导致数据不均衡。

除了引入随机key外还有什么方法吗。


Re: 关于非keyedstream使用定时器问题

2021-02-08 Thread yidan zhao
引入随机key最大问题是,本身希望batch方式sink,key太随机压根无法batch。
如果randomKey%1024这样分桶到1024的话,也不行,会导致每个bucket中数据量太少,进而基本都是timeout而sink,而不是达到batchSize,换言之,每次sink都会有1024个并发sink次数。
实际后端的存储可能不期望这么高并发,本身做batch的目的就是希望降低sink次数。
我希望的是按照并发度(比如30并发度)就每次sink30次(或超过30次,因为可能>batchSize,如果都 于2021年2月9日周二 下午3:04写道:

> 如题,当前flink不支持非keyedStream使用定时器,不清楚有啥解决方法吗?
>
> 目前我实现一个sink,带超时希望用到timerservice。但是不支持。
> 同时不希望使用keyedStream,因为会导致数据不均衡。
>
> 除了引入随机key外还有什么方法吗。
>


Re: 关于非keyedstream使用定时器问题

2021-02-08 Thread yidan zhao
当然,如果是 randomeKey %30 这样,虽然最终效果差不多,但却导致30个sink batch可能都集中到某几个并发实例上。

yidan zhao  于2021年2月9日周二 下午3:22写道:

> 引入随机key最大问题是,本身希望batch方式sink,key太随机压根无法batch。
> 如果randomKey%1024这样分桶到1024的话,也不行,会导致每个bucket中数据量太少,进而基本都是timeout而sink,而不是达到batchSize,换言之,每次sink都会有1024个并发sink次数。
> 实际后端的存储可能不期望这么高并发,本身做batch的目的就是希望降低sink次数。
>
> 我希望的是按照并发度(比如30并发度)就每次sink30次(或超过30次,因为可能>batchSize,如果都
> yidan zhao  于2021年2月9日周二 下午3:04写道:
>
>> 如题,当前flink不支持非keyedStream使用定时器,不清楚有啥解决方法吗?
>>
>> 目前我实现一个sink,带超时希望用到timerservice。但是不支持。
>> 同时不希望使用keyedStream,因为会导致数据不均衡。
>>
>> 除了引入随机key外还有什么方法吗。
>>
>


Re: 关于非keyedstream使用定时器问题

2021-02-17 Thread yidan zhao
不行,那不就导致不均衡了。数据得均衡。
1均衡 2 batch 3 timeout
目前这3者依靠flink现有机制比较难实现,当然并不是所以场景都需要这样,比如mysql还需要考虑死锁问题,但对于部分不需要考虑锁的sink,其实不在意相同key是否分发到一起,所以不需要依靠keyedStream,这样能保证1,2,但无法保证3。
使用keyedStream使用随机key(很随机),会保证1,3,但无法有2(因为key太随机,每个key下数据太少)。
使用keyedStream使用随机key %
N,则会保证3,1和2则是看具体数据分布、数据量,以及N的大小都可能有关系。部分场景下很难有合适的参数保证全部3者。

Kenyore Woo  于2021年2月15日周一 下午1:09写道:

> 如果使用一个常量字段作为key呢。是不是就可以实现你要的效果了
>
> yidan zhao  于2021年2月9日周二 下午3:24写道:
>
> > 当然,如果是 randomeKey %30 这样,虽然最终效果差不多,但却导致30个sink batch可能都集中到某几个并发实例上。
> >
> > yidan zhao  于2021年2月9日周二 下午3:22写道:
> >
> > > 引入随机key最大问题是,本身希望batch方式sink,key太随机压根无法batch。
> > >
> >
> 如果randomKey%1024这样分桶到1024的话,也不行,会导致每个bucket中数据量太少,进而基本都是timeout而sink,而不是达到batchSize,换言之,每次sink都会有1024个并发sink次数。
> > > 实际后端的存储可能不期望这么高并发,本身做batch的目的就是希望降低sink次数。
> > >
> > >
> >
> 我希望的是按照并发度(比如30并发度)就每次sink30次(或超过30次,因为可能>batchSize,如果都 > >
> > > yidan zhao  于2021年2月9日周二 下午3:04写道:
> > >
> > >> 如题,当前flink不支持非keyedStream使用定时器,不清楚有啥解决方法吗?
> > >>
> > >> 目前我实现一个sink,带超时希望用到timerservice。但是不支持。
> > >> 同时不希望使用keyedStream,因为会导致数据不均衡。
> > >>
> > >> 除了引入随机key外还有什么方法吗。
> > >>
> > >
> >
>


Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-21 Thread yidan zhao
只有最后一个keyBy有效。

Hongyuan Ma  于2021年2月21日周日 下午10:59写道:

> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
> 还是在前一次keyby的基础上生成m*n个窗口?
>
>
> 像下面这样写, 最后的窗口是只按area划分的吗?
> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
> stream.keyby("id")
> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
> .assignTime() // 修改轨迹eventTime为预测出的时间
> .keyby("area")
> .window() // 根据区域划分窗口
> .process() // 统计各个区域内的轨迹
>
>


Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-21 Thread yidan zhao
不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。

yidan zhao  于2021年2月22日周一 上午10:31写道:

> 只有最后一个keyBy有效。
>
> Hongyuan Ma  于2021年2月21日周日 下午10:59写道:
>
>> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
>> 还是在前一次keyby的基础上生成m*n个窗口?
>>
>>
>> 像下面这样写, 最后的窗口是只按area划分的吗?
>> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
>> stream.keyby("id")
>> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
>> .assignTime() // 修改轨迹eventTime为预测出的时间
>> .keyby("area")
>> .window() // 根据区域划分窗口
>> .process() // 统计各个区域内的轨迹
>>
>>


Re: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-22 Thread yidan zhao
我突然感觉还是沟通问题。window的只有1个。因为你就写了一个window。
我指的是flatMap和window是分开的算子,不会是1个算子。

hdxg1101300123  于2021年2月22日周一 下午11:37写道:

>
> 为什么flatmap就是2个
>
>
> 发自vivo智能手机
> > 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。
> >
> > yidan zhao  于2021年2月22日周一 上午10:31写道:
> >
> > > 只有最后一个keyBy有效。
> > >
> > > Hongyuan Ma  于2021年2月21日周日 下午10:59写道:
> > >
> > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
> > >> 还是在前一次keyby的基础上生成m*n个窗口?
> > >>
> > >>
> > >> 像下面这样写, 最后的窗口是只按area划分的吗?
> > >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
> > >> stream.keyby("id")
> > >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
> > >> .assignTime() // 修改轨迹eventTime为预测出的时间
> > >> .keyby("area")
> > >> .window() // 根据区域划分窗口
> > >> .process() // 统计各个区域内的轨迹
> > >>
> > >>
>


Re: Re: 大佬们, keyby()两次, 然后再window(), 会有几个窗口?

2021-02-22 Thread yidan zhao
而如果是连续keyBy,比如.keyBy(xx).keyBy(yy).window()这样keyBy多少此也只最后一个有效,window当然还是只有1个。不会出现多个window的。


yidan zhao  于2021年2月23日周二 下午3:31写道:

> 我突然感觉还是沟通问题。window的只有1个。因为你就写了一个window。
> 我指的是flatMap和window是分开的算子,不会是1个算子。
>
> hdxg1101300123  于2021年2月22日周一 下午11:37写道:
>
>>
>> 为什么flatmap就是2个
>>
>>
>> 发自vivo智能手机
>> > 不对,看了你描述没看代码。你代码那么写的化是2个哈。因为你keyBy后做了flatMap,再keyBy就是另外一个了哈。
>> >
>> > yidan zhao  于2021年2月22日周一 上午10:31写道:
>> >
>> > > 只有最后一个keyBy有效。
>> > >
>> > > Hongyuan Ma  于2021年2月21日周日 下午10:59写道:
>> > >
>> > >> 大佬们, 如果keyby两次然后再调用window()的话是只根据最后一次keyby的键生成n个窗口,
>> > >> 还是在前一次keyby的基础上生成m*n个窗口?
>> > >>
>> > >>
>> > >> 像下面这样写, 最后的窗口是只按area划分的吗?
>> > >> // 我想对不同车(id)的轨迹进行预测, 然后统计各个区域(area)内的轨迹信息
>> > >> stream.keyby("id")
>> > >> .flatmap() // 根据id 对轨迹进行预测, 在里面使用key state
>> > >> .assignTime() // 修改轨迹eventTime为预测出的时间
>> > >> .keyby("area")
>> > >> .window() // 根据区域划分窗口
>> > >> .process() // 统计各个区域内的轨迹
>> > >>
>> > >>
>>
>


Re: 关于1.12新增的initialize阶段时间较长问题

2021-02-23 Thread yidan zhao
这个问题有人清楚吗。今天又是重启,5min了,还是initializing阶段,client部分直接报异常退出(报的Caused by:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error.,  于2021年2月7日周日 下午4:00写道:

> 截图也没办法反应动态变化的过程。
>
> 目前是10机器的Standalone集群,状态在5G左右。通过flink-client端提交任务,然后web-ui刷新就一直转圈,过一会(几十秒大概)就OK啦,然后刚刚OK一瞬间会有很多个处于Initialize状态的任务,然后慢慢(10s内吧)没掉。
>
> flink-client端的话,有时候正常提交完成,有时候出现报错(类似说是重复任务的)。
>
>
> zilong xiao  于2021年2月7日周日 下午3:25写道:
>
>> 有截图吗?
>>
>> 赵一旦  于2021年2月7日周日 下午3:13写道:
>>
>> > 这个问题现在还有个现象,我提交任务,web
>> > UI就类似卡住状态,过一会刷新出来任务,会有4-5个initialize状态的任务,然后几秒之内陆续消失,剩下1个。
>> >
>> > 目前怀疑是有什么重试机制,导致重复提交N个任务,然后可能还有什么去重机制,然后其中几个陆续自动停掉?
>> >
>> > 赵一旦  于2021年1月26日周二 上午10:51写道:
>> >
>> > >
>> 如上,目前发现以前很快(10-30s)内能从敲命名到running的任务。现在有时候innitialize阶段就得1-2min。不清楚啥情况。
>> > >
>> >
>>
>


关于standalone集群jobmanager在操作时候web-ui卡顿的问题

2021-02-24 Thread yidan zhao
如题,standalone集群,当有集群操作的时候容易卡顿。
集群操作指:提交任务、触发保存点并停止任务、主动触发保存点(不严重)等。
这些操作执行时候web-ui回出现卡顿转圈,大多数情况转圈10-30s内会结束恢复正常,偶尔情况下会出现jobmanager进程失败。

如上,一个是希望大佬们帮忙分析下原因?
目前Jobmanager和Taskmanager是相同机器部署,20台机器,20个Jm和Tm进程。
不清楚卡顿和JM的“内存”是否有关,还是主要CPU? 我JM目前内存10G+,TM内存70G+。
我当前计划想把JM数量降低,不搞20个,本身也用不着。想着单独出来JM部署,这样可以少部署,但提升JM的内存。当然不清楚内存影响大不大。如果是CPU影响大,可能还需要单独部署JM的机器不部署TM这样。


关于Flink五分钟统计导致五分钟的CPU尖刺问题

2021-02-24 Thread yidan zhao
如题,线上集群很多任务依赖五分钟粒度。导致CPU存在五分钟一波的尖刺,正常情况CPU利用为40%,尖刺时候达到90%+。
大家一般类似情况都怎么解决呢?


基于kafka中转数据流情况下,下游任务的watermark推进问题。

2021-02-26 Thread yidan zhao
如题,如果我任务本身是多个连续的window处理。
现在想拆分,基于kafka中转数据。但面临的第一个麻烦问题就是watermark的推进,当然简单实现也能满足功能,但是比如我窗口都是5min的,会导致下游窗口晚5min触发。比如window1
=> window2的场景下,使用maxOutOfOrderness为1min的时候,[0-5)
的数据在6min数据到的时候触发计算。如果拆分了,那么window2需要11min时候window1输出[5-10)的数据到达window2时候才会触发window2的[0,5)的计算。

方案1:一个是将time(window时间)放入key,然后下游使用session
window,正常肯定没问题,但是如果数据出现异常,比如上游某个五分钟数据分2批次到达下游可能会导致下游计算错误(规则类型window,业务不允许部分数据做计算)。
方案2:上游window想办法定期输出watermark到kafka,下游解析ts作为watermark?


Re: 基于kafka中转数据流情况下,下游任务的watermark推进问题。

2021-02-26 Thread yidan zhao
或者如果不行我就继续合并在一起了。
但是这样就需要解决一个其他问题。

问题描述
能否基于检查点/保存点重启的时候,唯独让KafkaSource不基于检查点和保存点中的offset继续消费,而是通过我指定的offset开始消费。
简而言之:我希望保留状态的同时,忽略部分数据。应用场景:数据延迟了,但我希望快速赶到最新数据去,但不希望直接不基于保存点重启任务,因为部分算子的状态比较重要,是天级别的状态,需要一整天保留。



yidan zhao  于2021年2月26日周五 下午5:48写道:

> 如题,如果我任务本身是多个连续的window处理。
> 现在想拆分,基于kafka中转数据。但面临的第一个麻烦问题就是watermark的推进,当然简单实现也能满足功能,但是比如我窗口都是5min的,会导致下游窗口晚5min触发。比如window1
> => window2的场景下,使用maxOutOfOrderness为1min的时候,[0-5)
> 的数据在6min数据到的时候触发计算。如果拆分了,那么window2需要11min时候window1输出[5-10)的数据到达window2时候才会触发window2的[0,5)的计算。
>
> 方案1:一个是将time(window时间)放入key,然后下游使用session
> window,正常肯定没问题,但是如果数据出现异常,比如上游某个五分钟数据分2批次到达下游可能会导致下游计算错误(规则类型window,业务不允许部分数据做计算)。
> 方案2:上游window想办法定期输出watermark到kafka,下游解析ts作为watermark?
>


关于rebalance和forward的性能

2021-03-01 Thread yidan zhao
如题,如果我确认某2个task我期望是2个task,即不会chain到一起。这种情况下使用foward和rebalannce有哪种固定会性能更好吗。哪种一般性能更好呢?


Re: 关于rebalance和forward的性能

2021-03-01 Thread yidan zhao
好的。谢谢。

Smile  于2021年3月2日周二 下午2:49写道:

> 你好,
>
> 不 chain 的话基本取决于上游是否有数据倾斜吧。
> 有数据倾斜的话 Rebalance 可以让下游算子均衡一点,性能会好一些;没有数据倾斜的话,Forward 是一对一发,Rebalance 是 n 对
> m 发,后者网络开销相对会大一些,Forward 的性能可能会稍微好一点点,但是我理解这个差异对作业性能的影响可能微乎其微。
>
> Smile
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:Flink checkpoint 速度慢问题请教

2021-03-01 Thread yidan zhao
我比较奇怪的是再慢的磁盘,对于几十KB的状态也不至于“慢”吧。

Jacob <17691150...@163.com> 于2021年3月2日周二 上午10:34写道:

> 谢谢回复
>
> 我用的是filesystem,
> 相关配置如下:
>
>
> state.backend: filesystem
> state.checkpoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> state.savepoints.dir: hdfs://nameservice1/datafeed/prd/flink_checkpoint
> state.backend.incremental: false
> state.backend.fs.memory-threshold: 1024
> state.checkpoints.num-retained: 3
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 1000
> restart-strategy.fixed-delay.delay: 30 s
>
>
>
> 后面把上面配置注释掉,然后在代码中指定了checkpoint类型为内存,但速度还是很慢。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


理性规律异常,停止任务后,再提交会导致JobManager进程失败。

2021-03-02 Thread yidan zhao
如题,standalone,1.12。
目前感觉不像是停止任务或启动任务本身问题。看起来像是这俩操作导致JM的压力大什么的。然后报错异常如下:

2021-03-03 10:14:51,298 ERROR
org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread
'cluster-io-thread-3' produced an uncaught exception. Stopping the
process...

java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.ScheduledThreadPoolExecutor
$ScheduledFutureTask@422cfccb rejected from
java.util.concurrent.ScheduledThreadPoolExecutor@6709b3f5[Terminated, pool
size = 0, active threads = 0, queued tasks = 0, completed tasks = 2304]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
~[?:1.8.0_251]
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
~[?:1.8.0_251]
at
java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
~[?:1.8.0_251]
at
java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
~[?:1.8.0_251]
at
org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
~[flink-dist_2.11-1.12.0.jar:1.12.0]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_251]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]


Re: 理性规律异常,停止任务后,再提交会导致JobManager进程失败。

2021-03-02 Thread yidan zhao
这个问题和我之前反馈的另一个问题估计也有关系。实际表现还有个特点:
即提交任务后,任务会处于initialize阶段较长时间,并且WEB-UI开始卡顿转圈无法展示具体状态。然后过一会恢复(此期间某JM进程会失败自动重启(我们这边的脚本机制))。这是某种表现,还有一种是处于initialize阶段较长时间后,恢复之后会出现多个一模一样的处于innitialize阶段的任务(从web-ui界面看到),然后陆续减少到1个,最终只有1个成功运行处于running状态。

yidan zhao  于2021年3月3日周三 上午10:52写道:

> 如题,standalone,1.12。
> 目前感觉不像是停止任务或启动任务本身问题。看起来像是这俩操作导致JM的压力大什么的。然后报错异常如下:
>
> 2021-03-03 10:14:51,298 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL:
> Thread 'cluster-io-thread-3' produced an uncaught exception. Stopping the
> process...
>
> java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor
> $ScheduledFutureTask@422cfccb rejected from
> java.util.concurrent.ScheduledThreadPoolExecutor@6709b3f5[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 2304]
> at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> ~[?:1.8.0_251]
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_251]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_251]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
>
>
>


Flink启动后某个TM的某个slot不工作,看起来像是直接没任何通信。

2021-03-02 Thread yidan zhao
如题,日志:
2021-03-03 11:03:17,151 WARN org.apache.flink.runtime.util.HadoopUtils [] -
Could not find Hadoop configuration via any of the supported methods (Flink
configuration, environment variables).

2021-03-03 11:03:17,344 WARN org.apache.hadoop.util.NativeCodeLoader [] -
Unable to load native-hadoop library for your platform... using
builtin-java classes where applicable

2021-03-03 11:03:17,441 WARN org.apache.flink.runtime.util.HadoopUtils [] -
Could not find Hadoop configuration via any of the supported methods (Flink
configuration, environment variables).

2021-03-03 11:03:18,226 WARN
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
SASL configuration failed: javax.security.auth.login.LoginException: No
JAAS configuration section named 'Client' was found in specified JAAS
configuration file:
'/home/work/antibotFlink/flink-1.12.0/tmp/jaas-1092430908919603833.conf'.
Will continue connection to Zookeeper server without SASL authentication,
if Zookeeper server allows it.

2021-03-03 11:03:18,227 ERROR
org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
Authentication failed

2021-03-03 11:03:18,957 ERROR akka.remote.transport.netty.NettyTransport []
- failed to bind to /0.0.0.0:2027, shutting down Netty transport
2021-03-03 11:03:18,973 ERROR akka.remote.Remoting [] - Remoting system has
been terminated abrubtly. Attempting to shut down transports

如上,有个关于端口的报错,有人知道原因吗?
问题直接表现和影响是,我某个source的task无任何输出(此处的无输出包括任何数据,bytes
sent为0)。导致后续结点无watermark。进而反压永久=1(进而出现了一种之前就觉得很奇怪的场景:即反压到不工作,CPU都不再利用了。。。)。


Re: Flink 1.11.2运行一段时间后,会报ResourceManager leader changed to new address null的异常

2021-03-02 Thread yidan zhao
Mark下。这个问题我也遇到多次,看过一个xintognsongn的回复,由于网络、zk可用性等问题会导致。不够一般会自动恢复。

史 正超  于2020年12月7日周一 下午10:13写道:

> 8 个slot,8个并行度,jm是2G,tm配置的是8G,其它的任务配置是
> ```
> SET 'execution.checkpointing.interval' = '5min';
> SET 'execution.checkpointing.min-pause' = '10s';
> SET 'min.idle.state.retention.time' = '1d';
> SET 'max.idle.state.retention.time' = '25h';
> SET 'checkpoint.with.rocksdb' = 'true';
> set 'table.exec.mini-batch.enabled' = 'true';
> set 'table.exec.mini-batch.allow-latency' = '5s';
> set 'table.exec.mini-batch.size' = '5000';
>
> ```
>
> 2020-12-07 19:35:01
> org.apache.flink.util.FlinkException: ResourceManager leader changed to
> new address null
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>


Re: flink-savepoint问题

2021-03-03 Thread yidan zhao
是不是使用了随机key。

guaishushu1...@163.com  于2021年3月3日周三 下午6:53写道:

> checkpoint 可以成功保存,但是savepoint出现错误:
> java.lang.Exception: Could not materialize checkpoint 2404 for operator
> KeyedProcess (21/48).
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1042)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
> ... 3 more
> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
> at
> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:314)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:256)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:221)
> at
> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:174)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:447)
> ... 5 more
>
>
> guaishushu1...@163.com
>


Re: Flink 1.11.2运行一段时间后,会报ResourceManager leader changed to new address null的异常

2021-03-07 Thread yidan zhao
还有没有大佬解释下,我最近又遇到这个问题了,而且很频繁。任务启动1小时restored达到了8。

yidan zhao  于2021年3月3日周三 下午2:58写道:

> Mark下。这个问题我也遇到多次,看过一个xintognsongn的回复,由于网络、zk可用性等问题会导致。不够一般会自动恢复。
>
> 史 正超  于2020年12月7日周一 下午10:13写道:
>
>> 8 个slot,8个并行度,jm是2G,tm配置的是8G,其它的任务配置是
>> ```
>> SET 'execution.checkpointing.interval' = '5min';
>> SET 'execution.checkpointing.min-pause' = '10s';
>> SET 'min.idle.state.retention.time' = '1d';
>> SET 'max.idle.state.retention.time' = '25h';
>> SET 'checkpoint.with.rocksdb' = 'true';
>> set 'table.exec.mini-batch.enabled' = 'true';
>> set 'table.exec.mini-batch.allow-latency' = '5s';
>> set 'table.exec.mini-batch.size' = '5000';
>>
>> ```
>>
>> 2020-12-07 19:35:01
>> org.apache.flink.util.FlinkException: ResourceManager leader changed to
>> new address null
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf
>> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>


频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-07 Thread yidan zhao
如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
leadership’ 错导致任务重启。

下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
2021-03-08 14:31:40
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
at org.apache.flink.runtime.io.network.netty.
CreditBasedPartitionRequestClientHandler.decodeMsg(
CreditBasedPartitionRequestClientHandler.java:294)
at org.apache.flink.runtime.io.network.netty.
CreditBasedPartitionRequestClientHandler.channelRead(
CreditBasedPartitionRequestClientHandler.java:183)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:357)
at org.apache.flink.runtime.io.network.netty.
NettyMessageClientDecoderDelegate.channelRead(
NettyMessageClientDecoderDelegate.java:115)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
.java:357)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
1410)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:379)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeChannelRead(
AbstractChannelHandlerContext.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
AbstractEpollStreamChannel.java:792)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.processReady(EpollEventLoop.java:475)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.run(EpollEventLoop.java:378)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.io.network.partition.
ProducerFailedException: org.apache.flink.util.FlinkException: JobManager
responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
.enqueueAvailableReader(PartitionRequestQueue.java:108)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
.userEventTriggered(PartitionRequestQueue.java:170)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeUserEventTriggered(
AbstractChannelHandlerContext.java:346)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeUserEventTriggered(
AbstractChannelHandlerContext.java:332)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireUserEventTriggered(
AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.
ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter
.java:117)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.
ByteToMessageDecoder.userEventTriggered(ByteToMessageDecoder.java:365)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeUserEventTriggered(
AbstractChannelHandlerContext.java:346)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeUserEventTriggered(
AbstractChannelHandlerContext.java:332)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireUserEventTriggered(
AbstractChannelHandlerContext.java:324)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.userEventTriggered(DefaultChannelPipeline
.java:1428)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeUserEventTriggered(
AbstractC

Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-08 Thread yidan zhao
好的,我会看下。
然后我今天发现我好多个集群GC collector不一样。
目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
"-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
threads,还有一种是Mark Sweep Compact GC。
大佬们,Flink是根据内存大小有什么动态调整吗。

不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。


杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:

> Hi,
>
>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>
> Best,
> jjiey
>
> > 2021年3月8日 14:37,yidan zhao  写道:
> >
> >
> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
> > leadership’ 错导致任务重启。
> >
> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
> > 2021-03-08 14:31:40
> > org.apache.flink.runtime.io
> .network.netty.exception.RemoteTransportException:
> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
> >at org.apache.flink.runtime.io.network.netty.
> > CreditBasedPartitionRequestClientHandler.decodeMsg(
> > CreditBasedPartitionRequestClientHandler.java:294)
> >at org.apache.flink.runtime.io.network.netty.
> > CreditBasedPartitionRequestClientHandler.channelRead(
> > CreditBasedPartitionRequestClientHandler.java:183)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> > .java:357)
> >at org.apache.flink.runtime.io.network.netty.
> > NettyMessageClientDecoderDelegate.channelRead(
> > NettyMessageClientDecoderDelegate.java:115)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> > .java:357)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >
> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
> > 1410)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:379)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeChannelRead(
> > AbstractChannelHandlerContext.java:365)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
> > AbstractEpollStreamChannel.java:792)
> >at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> > .processReady(EpollEventLoop.java:475)
> >at
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
> > .run(EpollEventLoop.java:378)
> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> >at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.runtime.io.network.partition.
> > ProducerFailedException: org.apache.flink.util.FlinkException: JobManager
> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:221)
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .enqueueAvailableReader(PartitionRequestQueue.java:108)
> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
> > .userEventTriggered(PartitionRequestQueue.java:170)
> >at org.apache.flink.shaded.netty4.io.netty.channel.
> > AbstractChannelHandlerContext.invokeUserEventTriggered(
> > AbstractChannelHandlerContext.java:346)
> >at org.apache.flink.shaded.nett

Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-08 Thread yidan zhao
而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?

或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。

yidan zhao  于2021年3月9日周二 下午2:56写道:

> 好的,我会看下。
> 然后我今天发现我好多个集群GC collector不一样。
> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
> threads,还有一种是Mark Sweep Compact GC。
> 大佬们,Flink是根据内存大小有什么动态调整吗。
>
>
> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>
>
> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>
>> Hi,
>>
>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>
>> Best,
>> jjiey
>>
>> > 2021年3月8日 14:37,yidan zhao  写道:
>> >
>> >
>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>> > leadership’ 错导致任务重启。
>> >
>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>> > 2021-03-08 14:31:40
>> > org.apache.flink.runtime.io
>> .network.netty.exception.RemoteTransportException:
>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>> >at org.apache.flink.runtime.io.network.netty.
>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>> > CreditBasedPartitionRequestClientHandler.java:294)
>> >at org.apache.flink.runtime.io.network.netty.
>> > CreditBasedPartitionRequestClientHandler.channelRead(
>> > CreditBasedPartitionRequestClientHandler.java:183)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> > .java:357)
>> >at org.apache.flink.runtime.io.network.netty.
>> > NettyMessageClientDecoderDelegate.channelRead(
>> > NettyMessageClientDecoderDelegate.java:115)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> > .java:357)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>> > 1410)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:379)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > AbstractChannelHandlerContext.invokeChannelRead(
>> > AbstractChannelHandlerContext.java:365)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> > DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>> >at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
>> > AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
>> > AbstractEpollStreamChannel.java:792)
>> >at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> > .processReady(EpollEventLoop.java:475)
>> >at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
>> > .run(EpollEventLoop.java:378)
>> >at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> > SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>> >at org.apache.flink.shaded.netty4.io.netty.util.internal.
>> > ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> >at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.flink.runtime.io.network.partition.
>> > ProducerFailedException: org.apache.flink.util.FlinkException:
>> JobManager
>> > responsible for eb5d2893c4c6f4034995b9c8e180f01e lost the leadership.
>> >at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue
>> > .writeAndFlushNextMessageIfPossible(Partition

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
然后目前通过Flink的web-ui看了下gc情况。
发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。

(1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
(2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
   我目前5个TM的集群,单TM100G内存,跑任务大概10w
qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。


Michael Ran  于2021年3月9日周二 下午4:27写道:

> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
> >
> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
> >
> >yidan zhao  于2021年3月9日周二 下午2:56写道:
> >
> >> 好的,我会看下。
> >> 然后我今天发现我好多个集群GC collector不一样。
> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
> >> threads,还有一种是Mark Sweep Compact GC。
> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
> >>
> >>
> >>
> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
> >>
> >>
> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
> >>
> >>> Hi,
> >>>
> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
> >>>
> >>> Best,
> >>> jjiey
> >>>
> >>> > 2021年3月8日 14:37,yidan zhao  写道:
> >>> >
> >>> >
> >>>
> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
> >>> > leadership’ 错导致任务重启。
> >>> >
> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
> >>> > 2021-03-08 14:31:40
> >>> > org.apache.flink.runtime.io
> >>> .network.netty.exception.RemoteTransportException:
> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
> >>> > CreditBasedPartitionRequestClientHandler.java:294)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
> >>> > CreditBasedPartitionRequestClientHandler.java:183)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.runtime.io.network.netty.
> >>> > NettyMessageClientDecoderDelegate.channelRead(
> >>> > NettyMessageClientDecoderDelegate.java:115)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
> >>> > .java:357)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> >>>
> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
> >>> > 1410)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:379)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> > AbstractChannelHandlerContext.invokeChannelRead(
> >>> > AbstractChannelHandlerContext.java:365)
> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
> >>> >
> DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。

yidan zhao  于2021年3月9日周二 下午7:26写道:

> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
> 然后目前通过Flink的web-ui看了下gc情况。
> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>
> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>我目前5个TM的集群,单TM100G内存,跑任务大概10w
> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>
>
> Michael Ran  于2021年3月9日周二 下午4:27写道:
>
>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>> >
>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>> >
>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>> >
>> >> 好的,我会看下。
>> >> 然后我今天发现我好多个集群GC collector不一样。
>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>> >> threads,还有一种是Mark Sweep Compact GC。
>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>> >>
>> >>
>> >>
>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>> >>
>> >>
>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>> >>
>> >>> Hi,
>> >>>
>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>> >>>
>> >>> Best,
>> >>> jjiey
>> >>>
>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>> >>> >
>> >>> >
>> >>>
>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>> >>> > leadership’ 错导致任务重启。
>> >>> >
>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>> >>> > 2021-03-08 14:31:40
>> >>> > org.apache.flink.runtime.io
>> >>> .network.netty.exception.RemoteTransportException:
>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.runtime.io.network.netty.
>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>> >>> > NettyMessageClientDecoderDelegate.java:115)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:379)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>> >>> > AbstractChannelHandlerContext.java:365)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>> >>> > .java:357)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> >
>> >>>
>> DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:
>> >>> > 1410)
>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>> >>> > AbstractChannelHandlerContext.in

Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
今天对比了下G1和copy-MarkSweepCompact的效果。
运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。

yidan zhao  于2021年3月9日周二 下午7:30写道:

> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>
> yidan zhao  于2021年3月9日周二 下午7:26写道:
>
>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>> 然后目前通过Flink的web-ui看了下gc情况。
>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>
>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>
>>
>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>
>>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>>> >
>>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>>> >
>>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>>> >
>>> >> 好的,我会看下。
>>> >> 然后我今天发现我好多个集群GC collector不一样。
>>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>>> >> threads,还有一种是Mark Sweep Compact GC。
>>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>> >>
>>> >>
>>> >>
>>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>> >>
>>> >>
>>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>> >>
>>> >>> Hi,
>>> >>>
>>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>> >>>
>>> >>> Best,
>>> >>> jjiey
>>> >>>
>>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>>> >>> >
>>> >>> >
>>> >>>
>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e lost
>>> >>> > leadership’ 错导致任务重启。
>>> >>> >
>>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>> >>> > 2021-03-08 14:31:40
>>> >>> > org.apache.flink.runtime.io
>>> >>> .network.netty.exception.RemoteTransportException:
>>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>> >>> > .java:357)
>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>> >>> > NettyMessageClientDecoderDelegate.channelRead(
>>> >>> > NettyMessageClientDecoderDelegate.java:115)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:379)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>> >>> > AbstractChannelHandlerContext.java:365)
>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>> >>> >
>>> >>>
>>&

如何自定义web-ui上对taskmanager的标识符

2021-03-09 Thread yidan zhao
如题,目前有几个地方的需求。
(1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
(2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。


Re: 如何自定义web-ui上对taskmanager的标识符

2021-03-09 Thread yidan zhao
感觉可以在subtask和taskmanager的2个tab中新加上显示 id
的功能。如何taskmanager支持自定义自己的id(一个用户自定义的随意不重复的id)。

yidan zhao  于2021年3月10日周三 下午2:27写道:

> 如题,目前有几个地方的需求。
> (1)taskmanagers页面的path、id是啥参数,是否仅展示,可随意自定义。
>
> (2)任务点到task后展开的右侧页面中。taskmanagers子tab中的host、以及subtasks子tab中的host是否可以自定义。每次希望看比如subtask-n这个是哪台机器,默认看到host不足以定位机器(我们是容器,因为无法根据host直接登陆),单个host上可能虚拟出多个容器;如果这个host不作为绑定地址,仅作为展示的话是否有地方可以配置。
>
>
>


Re: Re: 频繁发生 'ResourceManager leader changed to new address null' 异常导致任务重启

2021-03-09 Thread yidan zhao
希望有大佬给下这些参数的区别。如果环境的网络不好,纠结要调整哪个参数?还是哪些参数。 我目前只提高了 ask.timeout 。
目前看配置,太多与timeout相关的参数了。

akka.ask.timeout
akka.lookup.timeout
akka.retry-gate-closed-for
akka.tcp.timeout
akka.startup-timeout

heartbeat.interval
heartbeat.timeout

high-availability.zookeeper.client.connection-timeout
high-availability.zookeeper.client.session-timeout


taskmanager.network.request-backoff.max

...

yidan zhao  于2021年3月10日周三 下午1:13写道:

> 今天对比了下G1和copy-MarkSweepCompact的效果。
> 运行相同时间, 相同任务。 G1的GC时长更长,但是次数更多,因为每次GC的时间更短。
> 1h15min时间,G1的gc 1100+次,平均每次1s左右。 后者gc 205次,平均每次1.9s左右。
>
> yidan zhao  于2021年3月9日周二 下午7:30写道:
>
>> 补充,还有就是GC收集器,是否无脑使用G1就可以呢?我之前一直是G1,只是最近修改了opts不小心换成其他了。本意不是为了换GC收集器的。
>>
>> yidan zhao  于2021年3月9日周二 下午7:26写道:
>>
>>> 观察了下。CPU什么的有尖刺,但是也算基本正常,因为我的任务就是5分钟一波。基本每5分钟都有个尖刺。
>>> 然后目前通过Flink的web-ui看了下gc情况。
>>> 发现部分集群的fgc的确有问题,fgc平均大概达到10-20s,当然只有平均值,不清楚是否有某些gc时间更长情况。总难题来说10-20s的确是比较长的,这个我之后会去看看改进下。
>>>
>>> (1)不过不清楚这个是否和这个问题直接相关,因为20s的卡顿是否足以引起该问题呢?
>>> (2)此外,大家推荐个内存设置,比如你们都多少TM,每个TM多少内存,跑的任务多大数据量大概。
>>>我目前5个TM的集群,单TM100G内存,跑任务大概10w
>>> qps的入口流量,但是很大部分呢会过滤掉,后续部分流量较少。此外,检查点大概达到3-4GB。
>>>
>>>
>>> Michael Ran  于2021年3月9日周二 下午4:27写道:
>>>
>>>> 看看当时的负载呢?有没有过高的情况,是什么原因。然后监控下网络和磁盘
>>>> 在 2021-03-09 14:57:43,"yidan zhao"  写道:
>>>> >而且大家推荐怎么设置呢,我可能默认就G1了。不清楚G1是否也需要精调。
>>>> >我目前设置的内存还是比较大的。(50G的,100G的TaskManager都有),这么大heap,是否需要特别设置啥呢?
>>>> >
>>>> >或者是否有必要拆小,比如设置10Gheap,然后把taskmanager数量提上去。
>>>> >
>>>> >yidan zhao  于2021年3月9日周二 下午2:56写道:
>>>> >
>>>> >> 好的,我会看下。
>>>> >> 然后我今天发现我好多个集群GC collector不一样。
>>>> >> 目前发现3种,默认的是G1。flink conf中配置了env.java.opts:
>>>> >> "-XX:-OmitStackTraceInFastThrow"的情况出现了2种,一种是Parallel GC with 83
>>>> >> threads,还有一种是Mark Sweep Compact GC。
>>>> >> 大佬们,Flink是根据内存大小有什么动态调整吗。
>>>> >>
>>>> >>
>>>> >>
>>>> 不使用G1我大概理解了,可能设置了java.opts这个是覆盖,不是追加。本身我只是希望设置下-XX:-OmitStackTraceInFastThrow而已。
>>>> >>
>>>> >>
>>>> >> 杨杰 <471419...@qq.com> 于2021年3月8日周一 下午3:09写道:
>>>> >>
>>>> >>> Hi,
>>>> >>>
>>>> >>>   可以排查下 GC 情况,频繁 FGC 也会导致这些情况。
>>>> >>>
>>>> >>> Best,
>>>> >>> jjiey
>>>> >>>
>>>> >>> > 2021年3月8日 14:37,yidan zhao  写道:
>>>> >>> >
>>>> >>> >
>>>> >>>
>>>> 如题,我有个任务频繁发生该异常然后重启。今天任务启动1h后,看了下WEB-UI的检查点也没,restored达到了8已经。然后Exception页面显示该错误,估计大多数都是因为该错误导致的restore。
>>>> >>> > 除此外,就是 ‘Job leader for job id eb5d2893c4c6f4034995b9c8e180f01e
>>>> lost
>>>> >>> > leadership’ 错导致任务重启。
>>>> >>> >
>>>> >>> > 下面给出刚刚的一个错误日志(环境flink1.12,standalone集群,5JM+5TM,JM和TM混部在相同机器):
>>>> >>> > 2021-03-08 14:31:40
>>>> >>> > org.apache.flink.runtime.io
>>>> >>> .network.netty.exception.RemoteTransportException:
>>>> >>> > Error at remote task manager '10.35.185.38/10.35.185.38:2016'.
>>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>>> >>> > CreditBasedPartitionRequestClientHandler.decodeMsg(
>>>> >>> > CreditBasedPartitionRequestClientHandler.java:294)
>>>> >>> >at org.apache.flink.runtime.io.network.netty.
>>>> >>> > CreditBasedPartitionRequestClientHandler.channelRead(
>>>> >>> > CreditBasedPartitionRequestClientHandler.java:183)
>>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>>> >>> > AbstractChannelHandlerContext.java:379)
>>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> >>> > AbstractChannelHandlerContext.invokeChannelRead(
>>>> >>> > AbstractChannelHandlerContext.java:365)
>>>> >>> >at org.apache.flink.shaded.netty4.io.netty.channel.
>>>> >>> >
>>>> >>>
>>>> AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext
>>>> >>> > .java:357)
>>>> >>> >at org.apache.flink.runtime.io.ne

Connection reset by peer

2021-03-15 Thread yidan zhao
任务异常自动重启,日志如下,伙伴们帮忙分析下问题。
2021-03-16 00:00:06
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection reset by peer (connection to '
10.35.100.171/10.35.100.171:2016')
at org.apache.flink.runtime.io.network.netty.
CreditBasedPartitionRequestClientHandler.exceptionCaught(
CreditBasedPartitionRequestClientHandler.java:173)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.fireExceptionCaught(
AbstractChannelHandlerContext.java:273)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline
.java:1377)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.
AbstractChannelHandlerContext.invokeExceptionCaught(
AbstractChannelHandlerContext.java:281)
at org.apache.flink.shaded.netty4.io.netty.channel.
DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(
AbstractEpollStreamChannel.java:728)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(
AbstractEpollStreamChannel.java:818)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.
AbstractEpollChannel$AbstractEpollUnsafe.epollRdHupReady(
AbstractEpollChannel.java:442)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.processReady(EpollEventLoop.java:482)
at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop
.run(EpollEventLoop.java:378)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.
ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.
Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


Re: Flink 1.12.0 隔几个小时Checkpoint就会失败

2021-03-17 Thread yidan zhao
设置下检查点失败不影响任务呀,你这貌似还导致任务重启了?

Frost Wong  于2021年3月18日周四 上午10:38写道:

> Hi 大家好
>
> 我用的Flink on yarn模式运行的一个任务,每隔几个小时就会出现一次错误
>
> 2021-03-18 08:52:37,019 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661818 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (562357 bytes in
> 4699 ms).
> 2021-03-18 08:52:37,637 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661819 (type=CHECKPOINT) @ 1616028757520 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 08:52:42,956 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 661819 for job 4fa72fc414f53e5ee062f9fbd5a2f4d5 (2233389 bytes
> in 4939 ms).
> 2021-03-18 08:52:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 661820 (type=CHECKPOINT) @ 1616028763457 for job
> 4fa72fc414f53e5ee062f9fbd5a2f4d5.
> 2021-03-18 09:12:43,528 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint 661820 of job 4fa72fc414f53e5ee062f9fbd5a2f4d5 expired before
> completing.
> 2021-03-18 09:12:43,615 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:90)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:65)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1760)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1733)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:93)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1870)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_231]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_231]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_231]
> 2021-03-18 09:12:43,618 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
> csmonitor_comment_strategy (4fa72fc414f53e5ee062f9fbd5a2f4d5) switched from
> state RUNNING to RESTARTING.
> 2021-03-18 09:12:43,619 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (43/256) (18dec1f23b95f741f5266594621971d5) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (44/256) (3f2ec60b2f3042ceea6e1d660c78d3d7) switched from RUNNING to
> CANCELING.
> 2021-03-18 09:12:43,622 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Flat Map
> (45/256) (66d411c2266ab025b69196dfec30d888) switched from RUNNING to
> CANCELING.
> 然后就自己恢复了。用的是Unaligned
> Checkpoint,rocksdb存储后端,在这个错误前后也没有什么其他报错信息。从Checkpoint的metrics看,总是剩最后一个无法完成,调整过parallelism也无法解决问题。
>
> 谢谢大家!
>


Re: flink 1.11.2 使用rocksdb时出现org.apache.flink.util.SerializedThrowable错误

2021-03-21 Thread yidan zhao
分享下原因呗。

hdxg1101300...@163.com  于2021年3月21日周日 上午12:36写道:

> 知道原因了
>
>
>
> hdxg1101300...@163.com
>
> 发件人: hdxg1101300...@163.com
> 发送时间: 2021-03-20 22:07
> 收件人: user-zh
> 主题: flink 1.11.2 使用rocksdb时出现org.apache.flink.util.SerializedThrowable错误
> 你好:
> 最近升级flink版本从flink 1.10.2 升级到flink.1.11.2;主要是考虑日志太大查看不方便的原因;
> 代码没有变动只是从1.10.2.编译为1.11.2 ,集群客户端版本升级到1.11.2;任务提交到yarn 使用per job方式;
> 之前时一个taskmanager一个slot,现在使用一个taskmanager 2个slot;程序运行一段时间(1个小时左右)后就会出现
> Caused by: org.apache.flink.util.SerializedThrowable
>
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> complete snapshot 53 for operator Sink: 发送短信 (5/8). Failure reason:
> Checkpoint was declined.
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
> Caused by: org.apache.flink.util.SerializedThrowable
> at com.com.functions.Transaction.firstPhase(Transaction.java:193)
> ~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
> at com.com.functions.TransactionData.flush(TransactionData.java:37)
> ~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
> at com.com.utils.TwoPhaseHttpSink.preCommit(TwoPhaseHttpSink.java:105)
> ~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
> at com.com.utils.TwoPhaseHttpSink.preCommit(TwoPhaseHttpSink.java:39)
> ~[dc_cbssbroadband-1.0.4.3.1-jar-with-dependencies.jar:?]
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:321)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> 

Re: flink-提交jar 隔断时间自己重启问题

2021-03-30 Thread yidan zhao
没看懂问题。任务自动重启?失败了自然就重启了,restart策略设置的吧。

valve <903689...@qq.com> 于2021年3月31日周三 上午11:31写道:

> 我也遇到这个问题 不知道为啥
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink启动后某个TM的某个slot不工作,看起来像是直接没任何通信。

2021-04-05 Thread yidan zhao
这个问题出现很多次了。目前还有一种case,不是启动的时候。
如果是启动的时候,则表现为watermark显示为没有,即无任何watermark。
另一种case是启动后正常运行,若干时间(可能几小时,也可能很多天)后,突然开始watermark无限停滞。导致无限反压。

yidan zhao  于2021年3月3日周三 下午2:47写道:

> 如题,日志:
> 2021-03-03 11:03:17,151 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:17,344 WARN org.apache.hadoop.util.NativeCodeLoader [] -
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable
>
> 2021-03-03 11:03:17,441 WARN org.apache.flink.runtime.util.HadoopUtils []
> - Could not find Hadoop configuration via any of the supported methods
> (Flink configuration, environment variables).
>
> 2021-03-03 11:03:18,226 WARN
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] -
> SASL configuration failed: javax.security.auth.login.LoginException: No
> JAAS configuration section named 'Client' was found in specified JAAS
> configuration file:
> '/home/work/antibotFlink/flink-1.12.0/tmp/jaas-1092430908919603833.conf'.
> Will continue connection to Zookeeper server without SASL authentication,
> if Zookeeper server allows it.
>
> 2021-03-03 11:03:18,227 ERROR
> org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] -
> Authentication failed
>
> 2021-03-03 11:03:18,957 ERROR akka.remote.transport.netty.NettyTransport
> [] - failed to bind to /0.0.0.0:2027, shutting down Netty transport
> 2021-03-03 11:03:18,973 ERROR akka.remote.Remoting [] - Remoting system
> has been terminated abrubtly. Attempting to shut down transports
>
> 如上,有个关于端口的报错,有人知道原因吗?
> 问题直接表现和影响是,我某个source的task无任何输出(此处的无输出包括任何数据,bytes
> sent为0)。导致后续结点无watermark。进而反压永久=1(进而出现了一种之前就觉得很奇怪的场景:即反压到不工作,CPU都不再利用了。。。)。
>


Re: Flink1.2对 key 进行分区,和 hash 分区有什么区别?

2021-04-06 Thread yidan zhao
首先,本身就是对key做的hash哈。只不过不是直接分配到并行的subtask,而是先分到maxParallelism,然后再分到subtask。加了一层主要是方便状态scala。

刘文  于2021年4月6日周二 上午9:33写道:

>
>
> Flink1.2对 key 进行分区,和 hash 分区有什么区别?
> 如: 分区数值 = key 的 hash值 % 并行度?
>
> 为什么不直接使用 hash 进行分区?
>
> KeyGroupStreamPartitioner.java
>
> @Override
> public int selectChannel(SerializationDelegate> record) {
>K key;
>try {
>   key = keySelector.getKey(record.getInstance().getValue());
>} catch (Exception e) {
>   throw new RuntimeException("Could not extract key from " + 
> record.getInstance().getValue(), e);
>}
>return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, 
> maxParallelism, numberOfChannels);
> }
>
>
>
>
>


Flink任务重启导致集群无限重启的问题

2021-04-07 Thread yidan zhao
如题,这个问题之前遇到过,昨天晚上又来一次。
首先我是flink1.12的standalone集群,通过公司的pass平台保证单机上的jm和tm失败后自动重启。
问题表现是,我停止任务,然后基于保存点重启任务。然后WEB-UI出现卡顿转圈,过一会报警显示某个容器的JM/TM失败自动重启。此期间WEB-UI转圈,我一直刷新,会出现突然一瞬间显示(应该是自动pass重启了),然后再刷新就继续转圈了(是某容器又失败,目前来看不是同一台容器一直失败,而是每个容器都轮着来失败)。

现象如上,说下我的分析。基本上大概可以知道原因是某容器失败,然后集群自动重选leader,然后自动《恢复原先任务》,这个步骤可能就会导致JM或TM失败,具体JM还是TM我不好确定了。因此会无限循环(恢复任务导致失败,然后pass重启,然后重复)。

至于为什么恢复任务会导致失败,这和之前我提到过的另一个问题也可能有关。即恢复任务的说话,web
ui卡顿,估计是JM瞬间压力高。但这种情况并不总是导致失败,我也遇到过卡顿后成功的(以前经历)。但有个现象是:我的任务,比如job1,会存在多个job1在web-ui界面出现,都处于initialize阶段。按照原先经历,过几十秒后其中一个会变为running状态继续正常
,其他任务会消失掉(不清楚原因,可能是client提交端重复提交?然后其中几个任务因为重复自动消失;也可能是否可能会出现瞬间的网络分区?比如5台机器各自leader,都恢复了任务?都是瞎猜哈)。


希望懂flink JM/TM通信以及底层原理的大佬们分析下。


Re: 回复: 回复:关于watermark和window

2021-05-18 Thread yidan zhao
没什么传不传的。processWatermark中应该就会直接将2窗口都处理掉了。

曲洋  于2021年5月10日周一 上午10:31写道:

>
>
>
> 对的,就是两个窗口同时存在,(3,1,2)(6,5,4)这就是两个窗口,然后watermark(7)来了,但是我不知道这个watermark是同时触发
>
> 两个窗口都计算,还是触发第一个,因为watermark也是一种特殊的数据,我看源码也没有找到当一个operater进行processWatermark之后
> 会不会把这个watermark进一步传递给并行的同时存在的窗口,也来触发它。
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-05-08 19:12:59,"明启 孙" <374060...@qq.com> 写道:
> >这个并行的窗口怎么理解,我理解的应该是两个窗口同时存在,窗口触发计算就是水印时间大于窗口endtime,那也就是两个窗口都会触发计算
> >
> >smq
> >
> >发件人: 曲洋
> >发送时间: 2021年5月8日 16:46
> >收件人: user-zh@flink.apache.org
> >主题: Re:回复:关于watermark和window
> >
> >
> >
> >
> >
> >
> >
> >现在是这两点满足的,因为乱序原因存在两个窗口,watermark时间同时大于两个窗口的end_time
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-05-08 16:40:00,"飞翔"  写道:
> >>window促发的条件:
> >>1、watermark时间 >= window_end_time
> 2、在[window_start_time,window_end_time)中有数据存在
> >>
> >>
> >>-- 原始邮件 --
> >>发件人:
> "user-zh"
>   <
> quyanghao...@126.com>;
> >>发送时间: 2021年5月8日(星期六) 下午3:16
> >>收件人: "user-zh" >>
> >>主题: Re:回复:关于watermark和window
> >>
> >>非常感谢你的回答,那是不是也就是说如果窗口过小,periodic watermark
> >>产生周期也比较长的话,会出现大量窗口延迟?
> >>
> >>在 2021-05-08 15:09:06,"Anlen" <2968969...@qq.com> 写道:
> >>>这两个窗口会按照window 结束时间从小到大触发窗口计算
> >>>
> >>>
> >>>
> >>>---原始邮件---
> >>>发件人: "曲洋" >>>发送时间: 2021年5月8日(周六) 下午2:10
> >>>收件人: "user-zh" >>>主题: 关于watermark和window
> >>>
> >>>
> >>>各位好,
> >>>我最近遇到了一个场景,就是当我使用window和periodic watermark的时候。
> >>>如果我的window时间设置的比较短,比如3秒,然后数据流假设为(6,5,3,1,2,4)。
> >>>此时watermark还没有产生,那么应该会有两个并行的window被创建(3,1,2)(6,5,4)。
> >>>这时watermark产生了,时间戳是7,同时超过了这两个窗口的边界.
> >>>那么此时会发生什么,两个窗口同时触发计算,或者第一窗口触发计算?
> >
>


Re: 如何统计n小时内,flink成功从kafka消费的数据量?

2021-05-18 Thread yidan zhao
步长多少?随时随刻的最近24小时?

zzzyw  于2021年5月18日周二 下午9:33写道:

> Hi 各位,
>   我需要统计出flink最近 n小时(例如24小时?) 成功从kafka中消费的数据量,有什么比较好的方案吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点。

2021-05-23 Thread yidan zhao
如题,Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点,或者基于检查点重启但忽略kafkaSource的状态。
目前Flink部分我自己覆盖了部分实现,可以实现基于检查点重启但忽略KafkaSource的offset状态。
现在是FlinkSQL部分,我目前都是设置很大的重启次数,但是自动重启后经常还是慢等导致继续ckpt失败,这个是恶性循环的。所以我目前希望是自动重启后忽略堆积的数据。
有个方法是不开启检查点,并设置自动重启。可以实现效果,不过还有个麻烦点是:如果不开启检查点,目前就没办法从 flink 的 web-ui
上看出任务是否经历过重启,比如ckpt_restored这个指标。我之前都是基于这个指标知道任务已经重启多少次的,我虽然希望任务能自动重启,并忽略堆积的数据,但偶尔人工看的时候还是需要知道任务是什么运行情况的。


关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 Thread yidan zhao
如题,按照官方文档的kafka source部分,有如下配置说明。

scan.startup.mode : optionalgroup-offsetsStringStartup mode for Kafka
consumer, valid values are 'earliest-offset', 'latest-offset',
'group-offsets', 'timestamp' and 'specific-offsets'. See the following
Start Reading Position for more details.

其中Reading Positions部分说明如下:

The config option scan.startup.mode specifies the startup mode for
Kafka consumer. The valid enumerations are:

`group-offsets`: start from committed offsets in ZK / Kafka brokers of
a specific consumer group.
`earliest-offset`: start from the earliest offset possible.
`latest-offset`: start from the latest offset.
`timestamp`: start from user-supplied timestamp for each partition.
`specific-offsets`: start from user-supplied specific offsets for each
partition.

可见,latest-offset和group-offsets是2个配置,所以我配置latest-offset肯定是从最新部分开始消费的,而不管使用的说明group
id,以及这个group id已提交的offset,这个估计没问题。

然后我想知道的是:带有latest-offset这个配置的情况下,sql任务自动重启基于检查点的情况呢?是否从最新消费,还是基于检查点的offset消费。

对于flink stream中实现,我知道是从检查点offset的,为此我还覆盖过实现。  现在想知道下sql部分实现是否也类似,不想去查sql部分,不熟悉。


Re: Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点。

2021-06-03 Thread yidan zhao
这个问题拾起来,还有人回答下吗。

yidan zhao  于2021年5月24日周一 上午10:25写道:
>
> 如题,Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点,或者基于检查点重启但忽略kafkaSource的状态。
> 目前Flink部分我自己覆盖了部分实现,可以实现基于检查点重启但忽略KafkaSource的offset状态。
> 现在是FlinkSQL部分,我目前都是设置很大的重启次数,但是自动重启后经常还是慢等导致继续ckpt失败,这个是恶性循环的。所以我目前希望是自动重启后忽略堆积的数据。
> 有个方法是不开启检查点,并设置自动重启。可以实现效果,不过还有个麻烦点是:如果不开启检查点,目前就没办法从 flink 的 web-ui 
> 上看出任务是否经历过重启,比如ckpt_restored这个指标。我之前都是基于这个指标知道任务已经重启多少次的,我虽然希望任务能自动重启,并忽略堆积的数据,但偶尔人工看的时候还是需要知道任务是什么运行情况的。


Re: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?

2021-06-03 Thread yidan zhao
这个问题比较复杂,具体最后糊里糊涂的半解决了。大概就是考虑用hashMap,以及最好不要继承,通过组合方式用。比如hashMap作为内层成员,最外边一层不要做成Map。这样可能会解决一定问题。

Lin Hou  于2021年4月1日周四 下午1:55写道:
>
> 你好,请问一下,这个问题是怎么解决的啊?
>
> 赵一旦  于2021年2月3日周三 下午1:59写道:
>
> > 我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。
> >
> > ℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:
> >
> > > 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。
> > >
> > >
> > >
> > >
> > > -- 原始邮件 --
> > > 发件人: "赵一旦" > > 发送时间: 2021年2月3日(星期三) 中午1:24
> > > 收件人: "user-zh" > > 主题: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?
> > >
> > >
> > >
> > > 如题,按照flink对POJO的定义,感觉还是比较严格的。
> > >
> > >
> > 我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。
> >


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 Thread yidan zhao
那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web
UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。

JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道:
>
> hi
>
> sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 Thread yidan zhao
本质需求是我一个转发任务,本身检查点失败以及任务失败一般都是压力高,所以我希望重启是忽略堆积的数据从最新数据开始消费。我希望任务失败了就自动重启从最新开始继续转发。

yidan zhao  于2021年6月4日周五 上午11:51写道:
>
> 那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web
> UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。
>
> JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道:
> >
> > hi
> >
> > sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 Thread yidan zhao
你任务A中的redis和hbase是异步还是同步访问,同步是肯定不行的。ckpt小是因为没啥状态,自然就小,时间长可能是数据对齐时间长,你估计用的是对齐检查点是吧?
 如果换成非对齐检查点,时间应该能降下来,但是状态会变得很大,你可以试试。
最佳做法是,改造成异步的,不能同步。

JasonLee <17610775...@163.com> 于2021年6月4日周五 上午10:57写道:
>
> hi
>
> source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
> 空跑,浪费资源,你只需要把 map 的并行度调大即可.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-04 Thread yidan zhao
我懂你意思,每个输入数据,经过redis、hbase等访问,以及相关调整(比如字段设置等),然后这个记录需要继续作为此算子的输出是吧。

我表达的是指你需要用异步访问redis、hbase方式,这个配合flink的异步算子去实现。所以你说的那个需求基于异步的是可以满足的。

Jacob <17691150...@163.com> 于2021年6月4日周五 下午3:21写道:
>
> @nobleyd 谢谢回复
>
> 你任务A中的redis和hbase是异步还是同步访问,--- 同步
>
> 你估计用的是对齐检查点是吧? ---是的
>
>
> 同步访问,是因为我们要及时生成新数据,换做异步就无法即时拿到最新的结果数据了
>
> 检查点我刚调整为非对齐方式了,从做完的十个checkpoint来看,state大小确实增加了,但速度尚未变快
>
>
> 消息量确实比较大,处理逻辑也较为复杂,处理逻辑算子的并行度我给了100,source并行度等于topic分区数
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-04 Thread yidan zhao
官方就有文档。其实本质就是一个异步操作假设1ms,那么同步操作的1s也就能1000个操作,qps太低了。异步的话可以大大提高qps。

Jacob <17691150...@163.com> 于2021年6月4日周五 下午5:58写道:
>
> 嗯嗯 你的描述是对的,job的执行过程大致就是如此
>
>
> 我明白你意思了
>
> 谢谢你提供的思路,我需要学习一下这个异步算子,之前从未接触过,不太清楚这具体是一个怎样的流程,请问你那边有相关的demo吗,或者该去具体去看哪部分的内容?
>
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-06 Thread yidan zhao
可以的,本身异步操作的本质就是线程池。 至于是你自己提供线程池,去执行某个同步操作。还是直接使用client/sdk等封装的异步方法内部默认的线程池这个无所谓。

Jacob <17691150...@163.com> 于2021年6月5日周六 下午1:15写道:
>
> thanks,
>
> 我查看了相关文档[1] 由于redis以及hbase的交互地方比较多,比较零散,不光是查询,还有回写redis
>
> 我打算把之前map算子的整段逻辑以线程池的形式丢在asyncInvoke()方法内部,不知道合适与否,这样数据的顺序性就无法得到保障了吧?
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/datastream/operators/asyncio/
> 
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


flink web ui 卡顿不出结果

2021-06-07 Thread yidan zhao
如题,flink web ui经常卡顿,有时候是慢,有时候一直出不来。
今天具体观察了某一次卡顿的case,发现至少有几次是 /config 接口一直 pengding 状态。请问有人清楚 /config
后端实现是什么情况吗?瓶颈在哪呢?


Re: Flink checkpoint 速度很慢 问题排查

2021-06-07 Thread yidan zhao
不是的哈,那个方法本身还是同步调用的。就是需要你自己保证逻辑的异步。

Jacob <17691150...@163.com> 于2021年6月8日周二 上午9:31写道:
>
> @nobleyd
> 谢谢大神指导,前两天休息没看邮件,才回复,抱歉
>
> 我后面把代码大概改成如下样子,checkpoint时间确实得到了改善,job运行几天正常
>
> 我提供的线程池在asyncInvoke方法内部跑,这样是不是不合适呀,asyncInvoke方法本身是不是就是封装好的异步方法,就不用单独启线程池了吧?直接在asyncInvoke方法内部写处理逻辑就好。
>
> public class AsyncProcessFunction extends RichAsyncFunction String>, List> {
>
>  private transient ExecutorService executorpool;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>   executorpool= Executors.newFixedThreadPool(80);
> }
>
> @Override
> public void asyncInvoke(Map message,
> ResultFuture> resultFuture){
> executorpool.submit(()->{
>   // 处理逻辑
>   ..
>   resultFuture.complete(Collections.singletonList(...));
> });
> }
> }
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


【问题分析】Fink任务无限反压

2021-06-07 Thread yidan zhao
该任务有时候正常,偶尔反压。
最近观察发现,反压时,kafkaSouce节点100%反压到停滞,后续算子什么也收不到,任务整体停滞。

这类错误遇到过很多次了,目前我生产中flink有个很大问题就是这些稳定性,压力大不是需要时间去追赶,而是压力一大就整体处于停滞状态。


Re: 【问题分析】Fink任务无限反压

2021-06-07 Thread yidan zhao
正常的反压遇到过很多,也解决过很多。  但我现在这个问题类似的也是很多次了,很奇怪。就是反压到CPU利用率反而成0,就是不工作了。

LakeShen  于2021年6月8日周二 上午10:37写道:
>
> 你可以先结合你的任务逻辑,以及 Flink  Web UI 反压监控,看看到底是什么地方引起反压。
> 一般 source 反压,是下游的算子一直反压到 source。可以看看那个算子引起
>
> Best,
> LakeShen
>
>
> yidan zhao  于2021年6月8日周二 上午10:28写道:
>
> > 该任务有时候正常,偶尔反压。
> > 最近观察发现,反压时,kafkaSouce节点100%反压到停滞,后续算子什么也收不到,任务整体停滞。
> >
> > 这类错误遇到过很多次了,目前我生产中flink有个很大问题就是这些稳定性,压力大不是需要时间去追赶,而是压力一大就整体处于停滞状态。
> >


Re: 【问题分析】Fink任务无限反压

2021-06-07 Thread yidan zhao
怎么说的,这个应该不是代码层面的问题,所以感觉贴代码意义不大。
我是想看有没有类似的什么idea,什么情况可能呈现类似现象。 就是直接停滞了,我发现的时候所有算子都非反压,只要source处于反压状态。
然后source到下一个算子直接的records sent和records received数量已经彻底不变(眼看好几分钟也不变)。

HunterXHunter <1356469...@qq.com> 于2021年6月8日周二 上午11:13写道:
>
> 掐头去尾的提问,完全不知道是什么问题,没法回答你,最好是贴出代码,贴出图片等大家才能帮忙分析
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 【问题分析】Fink任务无限反压

2021-06-08 Thread yidan zhao
目前遇到个报错,这个有人看得懂不。
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.213.143/10.35.213.143:2008')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection timed out

13631283359 <13631283...@163.com> 于2021年6月8日周二 下午3:41写道:
>
>
>
>
>
>
>
>
>
>
> 1.不知道这边有没有flink数据链路监控
> 2.flink反压可能有以下几种原因:
> 数据倾斜
> TaskManager配置内存太小,导致full gc
> checkpoint太慢
> 状态太大
>
>
>
>
>
>
>
>
> 在 2021-06-08 10:36:49,"LakeShen"  写道:
> >你可以先结合你的任务逻辑,以及 Flink  Web UI 反压监控,看看到底是什么地方引起反压。
> >一般 source 反压,是下游的算子一直反压到 source。可以看看那个算子引起
> >
> >Best,
> >LakeShen
> >
> >
> >yidan zhao  于2021年6月8日周二 上午10:28写道:
> >
> >> 该任务有时候正常,偶尔反压。
> >> 最近观察发现,反压时,kafkaSouce节点100%反压到停滞,后续算子什么也收不到,任务整体停滞。
> >>
> >> 这类错误遇到过很多次了,目前我生产中flink有个很大问题就是这些稳定性,压力大不是需要时间去追赶,而是压力一大就整体处于停滞状态。
> >>


报错分析,readAddress, connection time out。

2021-06-09 Thread yidan zhao
报错日志片段

2021-06-09 17:42:53,873 ERROR
org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] -
Encountered error while consuming partitions

org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection timed out

,如上是某个taskmanager的报错,30个taskmanager不好都去看。想知道这个异常指什么超时?需要做什么调整能改善呢。


flink1.13.1启动时候taskmanager异常日志

2021-06-09 Thread yidan zhao
目前来看不影响运行可能,但是是ERROR的日志。如下:
2021-06-09 20:04:41,194 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Pausing and re-attempting registration in 1 ms
2021-06-09 20:04:51,214 ERROR
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
Registration at ResourceManager failed due to an error
java.util.concurrent.CompletionException:
java.lang.ClassCastException: org.apache.flink.util.SerializedValue
cannot be cast to
org.apache.flink.runtime.registration.RegistrationResponse
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:673)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
~[?:1.8.0_251]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
[?:1.8.0_251]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.13.1.jar:1.13.1]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.13.1.jar:1.13.1]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.13.1.jar:1.13.1]
Caused by: java.lang.ClassCastException:
org.apache.flink.util.SerializedValue cannot be cast to
org.apache.flink.runtime.registration.RegistrationResponse
at 
java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
~[?:1.8.0_251]
... 8 more


Re: flink1.13.1启动时候taskmanager异常日志

2021-06-09 Thread yidan zhao
忽略吧。升级的时候多个版本混合在一起导致的应该是,我全部升级完后没这个问题了。

yidan zhao  于2021年6月9日周三 下午8:05写道:
>
> 目前来看不影响运行可能,但是是ERROR的日志。如下:
> 2021-06-09 20:04:41,194 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Pausing and re-attempting registration in 1 ms
> 2021-06-09 20:04:51,214 ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] -
> Registration at ResourceManager failed due to an error
> java.util.concurrent.CompletionException:
> java.lang.ClassCastException: org.apache.flink.util.SerializedValue
> cannot be cast to
> org.apache.flink.runtime.registration.RegistrationResponse
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:673)
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
> ~[?:1.8.0_251]
> at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> [?:1.8.0_251]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> at 
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.13.1.jar:1.13.1]
> Caused by: java.lang.ClassCastException:
> org.apache.flink.util.SerializedValue cannot be cast to
> org.apache.flink.runtime.registration.RegistrationResponse
> at 
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
> ~[?:1.8.0_251]
> ... 8 more


Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-09 Thread yidan zhao
log4j还可以打印到kafka吗。

yujianbo <15205029...@163.com> 于2021年6月10日周四 上午11:47写道:
>
> 版本:1.12
> 框架:用默认的log4j2框架
> 问题:日志打到kafka,如何去区分jobmanager和taskmanger日志?我发现去改layout.pattern,还是没有能找到区分的好办法?
>
>
> appender.kafka.layout.type=PatternLayout
> appender.kafka.layout.pattern=%d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
> %m%n -- %t -- %F
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Flink1.13.1启动后web-ui查看taskmanager信息失败并报错,稳定复现。

2021-06-09 Thread yidan zhao
如题,今天从1.12升级到1.13.1,启动standalone集群后。找到web-ui,点taskmanagers,出现列表,然后点任意taskmanager进行查看信息。右上角弹提示Internal
server error。查看了该JM的日志,后面附,主要报错是 Caused by:
java.io.NotSerializableException:
org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots。

此外,经过排查,并不是访问任意JM的rest地址都会有此问题,目前初步规律是,直接访问leader的web-ui无问题,访问其他地址的web-ui有此问题。


报错时JM的错误日志:

2021-06-10 13:00:27,395 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2021-06-10 13:00:27,399 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Preconfiguration:
2021-06-10 13:00:27,400 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -


RESOURCE_PARAMS extraction logs:
jvm_params: -Xmx2093796552 -Xms2093796552 -XX:MaxMetaspaceSize=536870912
dynamic_configs: -D jobmanager.memory.off-heap.size=268435456b -D
jobmanager.memory.jvm-overhead.min=322122552b -D
jobmanager.memory.jvm-metaspace.size=536870912b -D
jobmanager.memory.heap.size=2093796552b -D
jobmanager.memory.jvm-overhead.max=322122552b
logs: INFO  [] - Loading configuration property:
taskmanager.numberOfTaskSlots, 20
INFO  [] - Loading configuration property: cluster.evenly-spread-out-slots, true
INFO  [] - Loading configuration property: parallelism.default, 1
INFO  [] - Loading configuration property: jobmanager.memory.process.size, 3gb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
jobmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property:
jobmanager.memory.off-heap.size, 256mb
INFO  [] - Loading configuration property: taskmanager.memory.process.size, 20gb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-metaspace.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.fraction, 0.1
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.min, 192mb
INFO  [] - Loading configuration property:
taskmanager.memory.jvm-overhead.max, 512mb
INFO  [] - Loading configuration property: taskmanager.memory.segment-size, 32kb
INFO  [] - Loading configuration property:
taskmanager.memory.managed.fraction, 0.4
INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 64mb
INFO  [] - Loading configuration property:
taskmanager.memory.network.fraction, 0.1
INFO  [] - Loading configuration property: taskmanager.memory.network.min, 1gb
INFO  [] - Loading configuration property: taskmanager.memory.network.max, 2gb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.off-heap.size, 256mb
INFO  [] - Loading configuration property:
taskmanager.memory.task.off-heap.size, 512mb
INFO  [] - Loading configuration property:
taskmanager.memory.framework.heap.size, 256mb
INFO  [] - Loading configuration property: high-availability, zookeeper
INFO  [] - Loading configuration property:
high-availability.storageDir, bos://flink-bucket/flink/ha
INFO  [] - Loading configuration property:
high-availability.zookeeper.quorum,
bjhw-aisecurity-cassandra01.bjhw:9681,bjhw-aisecurity-cassandra02.bjhw:9681,bjhw-aisecurity-cassandra03.bjhw:9681,bjhw-aisecurity-cassandra04.bjhw:9681,bjhw-aisecurity-cassandra05.bjhw:9681
INFO  [] - Loading configuration property:
high-availability.zookeeper.path.root, /flink
INFO  [] - Loading configuration property:
high-availability.cluster-id, opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property: web.checkpoints.history, 100
INFO  [] - Loading configuration property: state.checkpoints.num-retained, 100
INFO  [] - Loading configuration property: state.checkpoints.dir,
bos://flink-bucket/flink/default-checkpoints
INFO  [] - Loading configuration property: state.savepoints.dir,
bos://flink-bucket/flink/default-savepoints
INFO  [] - Loading configuration property:
jobmanager.execution.failover-strategy, region
INFO  [] - Loading configuration property: web.submit.enable, false
INFO  [] - Loading configuration property: jobmanager.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.dir,
bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
INFO  [] - Loading configuration property:
historyserver.archive.fs.refresh-interval, 1
INFO  [] - Loading configuration property: rest.port, 8600
INFO  [] - Loading configuration property: historyserver.web.port, 8700
INFO  [] - Loading configuration property:
high-availability.jobmanager.port, 9318
INFO  [] - Loading configuration property: blob.server.port, 9320
INFO  [] - Loading configuration property: taskmanager.rpc.port, 9319
INFO  [] - Loading configuration property: taskmanager.data.port, 9325
INFO  [] - Loading configuration property:
me

Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-09 Thread yidan zhao
我觉得还有个头疼的吧,你很多机器,怎么区分每个机器还得。哪个机器的JM/TM的日志。

yujianbo <15205029...@163.com> 于2021年6月10日周四 下午1:48写道:
>
> log4j可以,log4j2也可以,现在头疼已经实现打kafka,不知道怎么区分这两边的日志
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


关于1.13中通过颜色等反馈的反压体验

2021-06-10 Thread yidan zhao
目前初步体验下来,还可以。我明显可以看到窗口出发时机,窗口后续的算子瞬间进入红色,然后几秒后恢复。

此外,非窗口出发时期的话,目前发现跟着KafkaSource算子之后的那个节点相对是busy值最高的,这个我理解为我kafka数据不断进入,所以对于source后直接跟的算子数据比较不断,所以一直处于有输入可用状态。

——不过,我目前采用的是全部算子相同并行度的策略,因为为了让每个算子都平均分配到全部TaskManager机器上。目前任何一个算子并行度设置不一致,flink就无法保证分发task的时候每个task都在TM之间均衡。


检查点失败 Checkpoint Coordinator is suspending 。

2021-06-10 Thread yidan zhao
如题,Checkpoint Coordinator is suspending
这种检查点失败是什么情况,timeout那种我理解是检查点执行时间长,超时了。但是 Checkpoint Coordinator is
suspending 这个是什么含义呢?


Re: Flink1.13.1启动后web-ui查看taskmanager信息失败并报错,稳定复现。

2021-06-10 Thread yidan zhao
Hi。没人遇到过这个问题嘛。我感觉我都稳定复现的。

yidan zhao  于2021年6月10日周四 下午1:20写道:
>
> 如题,今天从1.12升级到1.13.1,启动standalone集群后。找到web-ui,点taskmanagers,出现列表,然后点任意taskmanager进行查看信息。右上角弹提示Internal
> server error。查看了该JM的日志,后面附,主要报错是 Caused by:
> java.io.NotSerializableException:
> org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots。
>
> 此外,经过排查,并不是访问任意JM的rest地址都会有此问题,目前初步规律是,直接访问leader的web-ui无问题,访问其他地址的web-ui有此问题。
>
>
> 报错时JM的错误日志:
>
> 2021-06-10 13:00:27,395 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> 
> 2021-06-10 13:00:27,399 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Preconfiguration:
> 2021-06-10 13:00:27,400 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
>
> RESOURCE_PARAMS extraction logs:
> jvm_params: -Xmx2093796552 -Xms2093796552 -XX:MaxMetaspaceSize=536870912
> dynamic_configs: -D jobmanager.memory.off-heap.size=268435456b -D
> jobmanager.memory.jvm-overhead.min=322122552b -D
> jobmanager.memory.jvm-metaspace.size=536870912b -D
> jobmanager.memory.heap.size=2093796552b -D
> jobmanager.memory.jvm-overhead.max=322122552b
> logs: INFO  [] - Loading configuration property:
> taskmanager.numberOfTaskSlots, 20
> INFO  [] - Loading configuration property: cluster.evenly-spread-out-slots, 
> true
> INFO  [] - Loading configuration property: parallelism.default, 1
> INFO  [] - Loading configuration property: jobmanager.memory.process.size, 3gb
> INFO  [] - Loading configuration property:
> jobmanager.memory.jvm-metaspace.size, 512mb
> INFO  [] - Loading configuration property:
> jobmanager.memory.jvm-overhead.fraction, 0.1
> INFO  [] - Loading configuration property:
> jobmanager.memory.jvm-overhead.min, 192mb
> INFO  [] - Loading configuration property:
> jobmanager.memory.jvm-overhead.max, 512mb
> INFO  [] - Loading configuration property:
> jobmanager.memory.off-heap.size, 256mb
> INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
> 20gb
> INFO  [] - Loading configuration property:
> taskmanager.memory.jvm-metaspace.size, 512mb
> INFO  [] - Loading configuration property:
> taskmanager.memory.jvm-overhead.fraction, 0.1
> INFO  [] - Loading configuration property:
> taskmanager.memory.jvm-overhead.min, 192mb
> INFO  [] - Loading configuration property:
> taskmanager.memory.jvm-overhead.max, 512mb
> INFO  [] - Loading configuration property: taskmanager.memory.segment-size, 
> 32kb
> INFO  [] - Loading configuration property:
> taskmanager.memory.managed.fraction, 0.4
> INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 
> 64mb
> INFO  [] - Loading configuration property:
> taskmanager.memory.network.fraction, 0.1
> INFO  [] - Loading configuration property: taskmanager.memory.network.min, 1gb
> INFO  [] - Loading configuration property: taskmanager.memory.network.max, 2gb
> INFO  [] - Loading configuration property:
> taskmanager.memory.framework.off-heap.size, 256mb
> INFO  [] - Loading configuration property:
> taskmanager.memory.task.off-heap.size, 512mb
> INFO  [] - Loading configuration property:
> taskmanager.memory.framework.heap.size, 256mb
> INFO  [] - Loading configuration property: high-availability, zookeeper
> INFO  [] - Loading configuration property:
> high-availability.storageDir, bos://flink-bucket/flink/ha
> INFO  [] - Loading configuration property:
> high-availability.zookeeper.quorum,
> bjhw-aisecurity-cassandra01.bjhw:9681,bjhw-aisecurity-cassandra02.bjhw:9681,bjhw-aisecurity-cassandra03.bjhw:9681,bjhw-aisecurity-cassandra04.bjhw:9681,bjhw-aisecurity-cassandra05.bjhw:9681
> INFO  [] - Loading configuration property:
> high-availability.zookeeper.path.root, /flink
> INFO  [] - Loading configuration property:
> high-availability.cluster-id, opera_upd_FlinkTestJob3
> INFO  [] - Loading configuration property: web.checkpoints.history, 100
> INFO  [] - Loading configuration property: state.checkpoints.num-retained, 100
> INFO  [] - Loading configuration property: state.checkpoints.dir,
> bos://flink-bucket/flink/default-checkpoints
> INFO  [] - Loading configuration property: state.savepoints.dir,
> bos://flink-bucket/flink/default-savepoints
> INFO  [] - Loading configuration property:
> jobmanager.execution.failover-strategy, region
> INFO  [] - Loading configuration property: web.submit.enable, false
> INFO  [] - Loading configuration property: jobmanager.archive.fs.dir,
> bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3
> INFO  [] - Loading configuration property:
> historyserver.archive.fs.dir,
> bos://flink-bucket/flink/completed-jobs/opera_upd_FlinkTestJob3

Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-10 Thread yidan zhao
@yujianbo hi。可以把你log4j的配置发出来嘛,我也参考参考。

yujianbo <15205029...@163.com> 于2021年6月10日周四 下午3:31写道:
>
> 大佬,能告知一下吗?我目前知道lay out有这么多的参数可以配置,哪个参数能区分jm或者tm的日志呢:
>
> 具体的格式化说明:
>   %p:输出日志信息的优先级,即DEBUG,INFO,WARN,ERROR,FATAL。
>   %d:输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,如:%d{/MM/dd
> HH:mm:ss,SSS}。
>   %r:输出自应用程序启动到输出该log信息耗费的毫秒数。
>   %t:输出产生该日志事件的线程名。
>
> %l:输出日志事件的发生位置,相当于%c.%M(%F:%L)的组合,包括类全名、方法、文件名以及在代码中的行数。例如:test.TestLog4j.main(TestLog4j.java:10)。
>   %c:输出日志信息所属的日志对象,也就是getLogger()中的内容。
>   %C:输出日志信息所属的类目;
>   %logger:log4j中没有此格式;
>   %M:输出产生日志信息的方法名。
>   %F:输出日志消息产生时所在的文件名称。
>   %L::输出代码中的行号。
>   %m::输出代码中指定的具体日志信息。
>   %n:输出一个回车换行符,Windows平台为"rn",Unix平台为"n"。
>   %x:输出和当前线程相关联的NDC(嵌套诊断环境),尤其用到像java servlets这样的多客户多线程的应用中。
>   %%:输出一个"%"字符。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-10 Thread yidan zhao
 我也尝试了一波,不过比较奇怪的是,我程序测试log4可以写kafka OK。但是flink就是写不进去。

yidan zhao  于2021年6月10日周四 下午4:18写道:
>
> @yujianbo hi。可以把你log4j的配置发出来嘛,我也参考参考。
>
> yujianbo <15205029...@163.com> 于2021年6月10日周四 下午3:31写道:
> >
> > 大佬,能告知一下吗?我目前知道lay out有这么多的参数可以配置,哪个参数能区分jm或者tm的日志呢:
> >
> > 具体的格式化说明:
> >   %p:输出日志信息的优先级,即DEBUG,INFO,WARN,ERROR,FATAL。
> >   %d:输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,如:%d{/MM/dd
> > HH:mm:ss,SSS}。
> >   %r:输出自应用程序启动到输出该log信息耗费的毫秒数。
> >   %t:输出产生该日志事件的线程名。
> >
> > %l:输出日志事件的发生位置,相当于%c.%M(%F:%L)的组合,包括类全名、方法、文件名以及在代码中的行数。例如:test.TestLog4j.main(TestLog4j.java:10)。
> >   %c:输出日志信息所属的日志对象,也就是getLogger()中的内容。
> >   %C:输出日志信息所属的类目;
> >   %logger:log4j中没有此格式;
> >   %M:输出产生日志信息的方法名。
> >   %F:输出日志消息产生时所在的文件名称。
> >   %L::输出代码中的行号。
> >   %m::输出代码中指定的具体日志信息。
> >   %n:输出一个回车换行符,Windows平台为"rn",Unix平台为"n"。
> >   %x:输出和当前线程相关联的NDC(嵌套诊断环境),尤其用到像java servlets这样的多客户多线程的应用中。
> >   %%:输出一个"%"字符。
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink1.13.1启动后web-ui查看taskmanager信息失败并报错,稳定复现。

2021-06-10 Thread yidan zhao
目前除了此问题外,还发现不少升级1.13.1后的问题。都是些比较奇怪的。具体遇到一个提交任务后,一直处于created状态的问题。

此外最麻烦的还是如上说的那个,后端一直报错,我集群30容器,每次都得想办法找到leader,否则都没办法看具体日志。

yidan zhao  于2021年6月10日周四 下午3:49写道:
>
> Hi。没人遇到过这个问题嘛。我感觉我都稳定复现的。
>
> yidan zhao  于2021年6月10日周四 下午1:20写道:
> >
> > 如题,今天从1.12升级到1.13.1,启动standalone集群后。找到web-ui,点taskmanagers,出现列表,然后点任意taskmanager进行查看信息。右上角弹提示Internal
> > server error。查看了该JM的日志,后面附,主要报错是 Caused by:
> > java.io.NotSerializableException:
> > org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots。
> >
> > 此外,经过排查,并不是访问任意JM的rest地址都会有此问题,目前初步规律是,直接访问leader的web-ui无问题,访问其他地址的web-ui有此问题。
> >
> >
> > 报错时JM的错误日志:
> >
> > 2021-06-10 13:00:27,395 INFO
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> > 
> > 2021-06-10 13:00:27,399 INFO
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> > Preconfiguration:
> > 2021-06-10 13:00:27,400 INFO
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> >
> >
> > RESOURCE_PARAMS extraction logs:
> > jvm_params: -Xmx2093796552 -Xms2093796552 -XX:MaxMetaspaceSize=536870912
> > dynamic_configs: -D jobmanager.memory.off-heap.size=268435456b -D
> > jobmanager.memory.jvm-overhead.min=322122552b -D
> > jobmanager.memory.jvm-metaspace.size=536870912b -D
> > jobmanager.memory.heap.size=2093796552b -D
> > jobmanager.memory.jvm-overhead.max=322122552b
> > logs: INFO  [] - Loading configuration property:
> > taskmanager.numberOfTaskSlots, 20
> > INFO  [] - Loading configuration property: cluster.evenly-spread-out-slots, 
> > true
> > INFO  [] - Loading configuration property: parallelism.default, 1
> > INFO  [] - Loading configuration property: jobmanager.memory.process.size, 
> > 3gb
> > INFO  [] - Loading configuration property:
> > jobmanager.memory.jvm-metaspace.size, 512mb
> > INFO  [] - Loading configuration property:
> > jobmanager.memory.jvm-overhead.fraction, 0.1
> > INFO  [] - Loading configuration property:
> > jobmanager.memory.jvm-overhead.min, 192mb
> > INFO  [] - Loading configuration property:
> > jobmanager.memory.jvm-overhead.max, 512mb
> > INFO  [] - Loading configuration property:
> > jobmanager.memory.off-heap.size, 256mb
> > INFO  [] - Loading configuration property: taskmanager.memory.process.size, 
> > 20gb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.jvm-metaspace.size, 512mb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.jvm-overhead.fraction, 0.1
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.jvm-overhead.min, 192mb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.jvm-overhead.max, 512mb
> > INFO  [] - Loading configuration property: taskmanager.memory.segment-size, 
> > 32kb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.managed.fraction, 0.4
> > INFO  [] - Loading configuration property: taskmanager.memory.managed.size, 
> > 64mb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.network.fraction, 0.1
> > INFO  [] - Loading configuration property: taskmanager.memory.network.min, 
> > 1gb
> > INFO  [] - Loading configuration property: taskmanager.memory.network.max, 
> > 2gb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.framework.off-heap.size, 256mb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.task.off-heap.size, 512mb
> > INFO  [] - Loading configuration property:
> > taskmanager.memory.framework.heap.size, 256mb
> > INFO  [] - Loading configuration property: high-availability, zookeeper
> > INFO  [] - Loading configuration property:
> > high-availability.storageDir, bos://flink-bucket/flink/ha
> > INFO  [] - Loading configuration property:
> > high-availability.zookeeper.quorum,
> > bjhw-aisecurity-cassandra01.bjhw:9681,bjhw-aisecurity-cassandra02.bjhw:9681,bjhw-aisecurity-cassandra03.bjhw:9681,bjhw-aisecurity-cassandra04.bjhw:9681,bjhw-aisecurity-cassandra05.bjhw:9681
> > INFO  [] - Loading configuration property:
> > high-availability.zookeeper.path.root, /flink
> > INFO  [] - Loading configuration property:
> > high-availability.cluster-id, opera_upd_FlinkTestJob3
> > INFO  [] - Loading configuration property: web.checkpoints.history, 100
> > INFO  [] - Loading configuration property: state.checkpoints.num-retained, 
> > 100
> > INFO  [] - Loading configuration property: state.checkpoints.dir,
> > bos://flink-b

Flink的web-ui卡顿问题

2021-06-10 Thread yidan zhao
工作中遇到的各种web-ui现象:
0 web-ui正常。
1 web-ui卡顿,进入页面时候白色,一直转圈。看控制台发现network部分对应请求一直pending,请求一直不返回。
2 web-ui卡顿,转圈很久突然一瞬间展示,如何刷新就继续转圈进入白色页面了。
3 web-uI基本正常使用,但是很卡顿。
4 任务提交、取消、主动触发保存点等case下,web-ui卡顿严重,过会恢复。
5 如上几个case中,有时候伴随卡顿之后进入选举,然后重新进入循环(可能正常也可能继续卡顿继续导致JM进程失败导致重启拉起进入选举等)。


如上总结,目前我部署集群,比如30个容器。每个容器都部署Jm和Tm进程。JM分配3G内存,TM分配20G内存。我自认为web-ui的卡顿部分问题在于JM的性能假设,那么我想知道的是:
(1)是否需要降低JM进程的数量,比如30个JM是不是会导致选举性能下降,那么JM一般搞多少合适呢?比如搞3个(在TM为30个的规模背景下)?
(2)考虑30个JM,每个JM3G内存,那就是90G内存。是否改造成3个JM,每个30G内存,这样leader的JM进程就性能好一些,这个想法正确吗?


Re: Flink1.12 用官方默认的log4j2打日志到kafka,如何区分jobmanager和TaskManager的日志?怎么加参数?

2021-06-11 Thread yidan zhao
OK. done。  成功了。 至于怎么区分机器的话可以使用env或sys变量。
JM和TM的话可以自己改一改flink-daemon.sh脚本的启动部分,新增 "-Dxxx=${DAEMON}-${id}"
系统启动属性。然后log4使用${sys:xxx}

yidan zhao  于2021年6月10日周四 下午8:08写道:
>
>  我也尝试了一波,不过比较奇怪的是,我程序测试log4可以写kafka OK。但是flink就是写不进去。
>
> yidan zhao  于2021年6月10日周四 下午4:18写道:
> >
> > @yujianbo hi。可以把你log4j的配置发出来嘛,我也参考参考。
> >
> > yujianbo <15205029...@163.com> 于2021年6月10日周四 下午3:31写道:
> > >
> > > 大佬,能告知一下吗?我目前知道lay out有这么多的参数可以配置,哪个参数能区分jm或者tm的日志呢:
> > >
> > > 具体的格式化说明:
> > >   %p:输出日志信息的优先级,即DEBUG,INFO,WARN,ERROR,FATAL。
> > >   %d:输出日志时间点的日期或时间,默认格式为ISO8601,也可以在其后指定格式,如:%d{/MM/dd
> > > HH:mm:ss,SSS}。
> > >   %r:输出自应用程序启动到输出该log信息耗费的毫秒数。
> > >   %t:输出产生该日志事件的线程名。
> > >
> > > %l:输出日志事件的发生位置,相当于%c.%M(%F:%L)的组合,包括类全名、方法、文件名以及在代码中的行数。例如:test.TestLog4j.main(TestLog4j.java:10)。
> > >   %c:输出日志信息所属的日志对象,也就是getLogger()中的内容。
> > >   %C:输出日志信息所属的类目;
> > >   %logger:log4j中没有此格式;
> > >   %M:输出产生日志信息的方法名。
> > >   %F:输出日志消息产生时所在的文件名称。
> > >   %L::输出代码中的行号。
> > >   %m::输出代码中指定的具体日志信息。
> > >   %n:输出一个回车换行符,Windows平台为"rn",Unix平台为"n"。
> > >   %x:输出和当前线程相关联的NDC(嵌套诊断环境),尤其用到像java servlets这样的多客户多线程的应用中。
> > >   %%:输出一个"%"字符。
> > >
> > >
> > >
> > > --
> > > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JobManager使用zk做高可用的时候,如何得到当前活跃的JobManager地址

2021-06-14 Thread yidan zhao
指定随意一个都可以估计。

cuicle  于2021年6月15日周二 上午11:34写道:
>
> 当我创建一个StreamExecutionEnvironment的时候,需要指定JobManager的地址
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8080);
> 当使用zk做高可用的时候,如何获得当前活跃的JobManager地址?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JobManager使用zk做高可用的时候,如何得到当前活跃的JobManager地址

2021-06-14 Thread yidan zhao
你这个方式不是用于测试嘛。线上情况不应该使用flink run命令嘛。使用flink run的情况下本身就是基于zk自动获取jm地址提交的。

cuicle  于2021年6月15日周二 下午2:04写道:
>
> 别估计啊。。。理论上只有一个active
> jobManager,其它的都是standby。就算你说的可以,那客户端再维护所有的JobManager的地址也是很丑的呀。
>
>
> 我是想是否能从zk里面直接获取当前的active jobManager
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: JobManager使用zk做高可用的时候,如何得到当前活跃的JobManager地址

2021-06-14 Thread yidan zhao
代码里不需要remoteEnv创建,你这种removeEnv是通过本地ide提交任务的情况。这种情况很少见,一般线上网络环境和办公网络是隔离的吧。

yidan zhao  于2021年6月15日周二 下午2:49写道:
>
> 你这个方式不是用于测试嘛。线上情况不应该使用flink run命令嘛。使用flink run的情况下本身就是基于zk自动获取jm地址提交的。
>
> cuicle  于2021年6月15日周二 下午2:04写道:
> >
> > 别估计啊。。。理论上只有一个active
> > jobManager,其它的都是standby。就算你说的可以,那客户端再维护所有的JobManager的地址也是很丑的呀。
> >
> >
> > 我是想是否能从zk里面直接获取当前的active jobManager
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


关于反压的问题

2021-06-15 Thread yidan zhao
假设任务逻辑为 A => B => C 这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。

A是Kafka数据源,数据qps很平滑。
B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。

总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)

对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
C task busy,进而导致B被反压,即B的backpress值很高。

此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。

想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Re: 关于反压的问题

2021-06-15 Thread yidan zhao
flink 的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
buffer满,然后A被反压。
如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。

东东  于2021年6月15日周二 下午7:07写道:
>
> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive buffer满了,上游的send 
> buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
>
> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> >假设任务逻辑为 A => B => C 这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> >
> >A是Kafka数据源,数据qps很平滑。
> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> >
> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> >
> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> >C task busy,进而导致B被反压,即B的backpress值很高。
> >
> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> >
> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Flink1.13.1提交任务一直处于created状态

2021-06-15 Thread yidan zhao
如题,提交任务后,全部处于created状态。slot资源是充足的。直到失败。看日志,失败原因是触发了检查点,但由于任务不处于running状态,所以失败。
因此,本质问题还是为什么一直处于created状态呢?看了JM的日志,最后一部分日志也没异常,显示的是:
INFO 2021-06-15 20:06:48,297
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager -
Registered job manager
a8f816a36a786bea34a5ae44a5fd4...@akka.tcp://flink@10.194.70.28:9518/user/rpc/jobmanager_10
for job 426f13e61d032a5922044bed776f5ba0.
这条日志打印了很多很多次(估计几十上百次)。


Re: Re: 关于反压的问题

2021-06-15 Thread yidan zhao
我不计划做分开的尝试,目前只是分析。
但我还是认为,按照flink单个sub-task只是单线程模型的话,如果这个线程正在请求输出buffer,结果请求不到,挂起等待输出buffer,即被反压。这种情况下肯定是不会处理输入的。
但实际上对于window这类算子来说,处理输入并不会导致马上更多输出,而只是更新window算子的状态而已(比如pv累加),这么考虑的话,我更希望B被反压时候,仍然继续处理A的输出,而不是不处理进而导致A被反压。

东东  于2021年6月16日周三 上午10:59写道:
>
>
>
> B到C如何分割成2个任务,你是打算把B里面每个Window的数据都Sink出来,然后在另外一个任务里读进来再用C处理么?
>
>
> 这样做的意义何在呢?仅仅从监控层面看不见反压而已(反压实际上转移到连接B和C的存储上来了),整体的系统开销是上升的,对吞吐和延迟恐怕并没有正面的影响。
>
>
> 对性能影响大的肯定是瓶颈部分,不提高C的处理能力,优化其他部分没有多大意义。
>
>
> 如果只是希望反压的影响更平滑,可以尝试增大buffer,整体开销恐怕还是比增加一个任务要小。
>
>
> 在 2021-06-15 20:03:32,"yidan zhao"  写道:
> >flink 的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
> >buffer满,然后A被反压。
> >如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。
> >
> >东东  于2021年6月15日周二 下午7:07写道:
> >>
> >> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive 
> >> buffer满了,上游的send buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> >> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
> >>
> >> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> >> >假设任务逻辑为 A => B => C 这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> >> >
> >> >A是Kafka数据源,数据qps很平滑。
> >> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> >> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> >> >
> >> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> >> >
> >> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> >> >C task busy,进而导致B被反压,即B的backpress值很高。
> >> >
> >> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> >> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> >> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> >> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> >> >
> >> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> >> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Re: Re: Re: 关于反压的问题

2021-06-15 Thread yidan zhao
不是呀,对于window来说,我每个key假设统计的是pv,pv=1和pv=2的存储空间是一样的,如果上游不发数据过来,这个pv就停滞在某个值不变了,发数据过来也不会导致window的数据空间变大,只是状态值变。
我考虑的是window这个sub-taskd单线程模型问题。不是buffer的问题,对于非window类算子这个问题不存在,但是window的触发是基于时间的。这意味着更多的上游数据的到位,加重的主要是cpu压力,而不是内存等状态存储压力。

东东  于2021年6月16日周三 上午11:36写道:
>
>
>
> 这中间有buffer的存在啊,并不是只要下游处理慢,buffer就一定慢,要看你究竟有多慢,还有上游究竟有多快。
>
>
>
> 目前flink是通允许你控制buffer的大小,来调整这个容忍程度的,但这个容忍肯定是有限度。
>
>
> 另外,你似乎在意的是数据buffer在哪里,buffer在window里和buffer在上游,从全局看,肯定是有持久化能力的上游更靠谱。如果buffer在window算子里,那么:
> 1、你的checkpoint会很慢,因为aligned checkpoint会等对齐之后才能完成。
> 2、如果启用unaligned checkpoint的话,checkpoint会变大,且如果没有反压上游,让上游减速的机制的话,这个增大会无法控制。
>
>
> 目前的机制提供了一种有兜底的灵活调整能力,其实是简单合理的设计。
>
>
>
> 在 2021-06-16 11:04:53,"yidan zhao"  写道:
> >我不计划做分开的尝试,目前只是分析。
> >但我还是认为,按照flink单个sub-task只是单线程模型的话,如果这个线程正在请求输出buffer,结果请求不到,挂起等待输出buffer,即被反压。这种情况下肯定是不会处理输入的。
> >但实际上对于window这类算子来说,处理输入并不会导致马上更多输出,而只是更新window算子的状态而已(比如pv累加),这么考虑的话,我更希望B被反压时候,仍然继续处理A的输出,而不是不处理进而导致A被反压。
> >
> >东东  于2021年6月16日周三 上午10:59写道:
> >>
> >>
> >>
> >> B到C如何分割成2个任务,你是打算把B里面每个Window的数据都Sink出来,然后在另外一个任务里读进来再用C处理么?
> >>
> >>
> >> 这样做的意义何在呢?仅仅从监控层面看不见反压而已(反压实际上转移到连接B和C的存储上来了),整体的系统开销是上升的,对吞吐和延迟恐怕并没有正面的影响。
> >>
> >>
> >> 对性能影响大的肯定是瓶颈部分,不提高C的处理能力,优化其他部分没有多大意义。
> >>
> >>
> >> 如果只是希望反压的影响更平滑,可以尝试增大buffer,整体开销恐怕还是比增加一个任务要小。
> >>
> >>
> >> 在 2021-06-15 20:03:32,"yidan zhao"  写道:
> >> >flink 的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
> >> >buffer满,然后A被反压。
> >> >如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。
> >> >
> >> >东东  于2021年6月15日周二 下午7:07写道:
> >> >>
> >> >> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive 
> >> >> buffer满了,上游的send buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> >> >> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
> >> >>
> >> >> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> >> >> >假设任务逻辑为 A => B => C 这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> >> >> >
> >> >> >A是Kafka数据源,数据qps很平滑。
> >> >> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> >> >> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> >> >> >
> >> >> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> >> >> >
> >> >> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> >> >> >C task busy,进而导致B被反压,即B的backpress值很高。
> >> >> >
> >> >> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> >> >> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> >> >> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> >> >> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> >> >> >
> >> >> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> >> >> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Re: Re: Re: Re: 关于反压的问题

2021-06-15 Thread yidan zhao
下游并不是“很慢”,只是由于下游是接受B输出的,而B的输出是集中窗口出发的,0-5分数据都是在6min触发输出到C。假设C需要1min的时间处理完这波数据。那么6-7min的时间段内,B都处于反压状态,进而A也反压,等到7min后,B才开始继续处理A输出的6-10min的数据,这样就浪费了至少1min的时间呀。B其实从6min就可以继续处理上游数据。

东东  于2021年6月16日周三 下午12:32写道:
>
>
>
>
> 如果B处理完的数据结构很小,比如你就是一个简单的count,那么buffer里就可以放很多个window,如果这样buffer都满了,说明上下游处理能力不匹配非常严重啊,也许下游永远也别想跟上了,这个时候,你是希望上游慢一点,还是上游保持现在的速度,一直到把内存撑爆(也就是不可控),还是咋的?
>
>
>
>
>
>
> 在 2021-06-16 11:41:08,"yidan zhao"  写道:
> >不是呀,对于window来说,我每个key假设统计的是pv,pv=1和pv=2的存储空间是一样的,如果上游不发数据过来,这个pv就停滞在某个值不变了,发数据过来也不会导致window的数据空间变大,只是状态值变。
> >我考虑的是window这个sub-taskd单线程模型问题。不是buffer的问题,对于非window类算子这个问题不存在,但是window的触发是基于时间的。这意味着更多的上游数据的到位,加重的主要是cpu压力,而不是内存等状态存储压力。
> >
> >东东  于2021年6月16日周三 上午11:36写道:
> >>
> >>
> >>
> >> 这中间有buffer的存在啊,并不是只要下游处理慢,buffer就一定慢,要看你究竟有多慢,还有上游究竟有多快。
> >>
> >>
> >>
> >> 目前flink是通允许你控制buffer的大小,来调整这个容忍程度的,但这个容忍肯定是有限度。
> >>
> >>
> >> 另外,你似乎在意的是数据buffer在哪里,buffer在window里和buffer在上游,从全局看,肯定是有持久化能力的上游更靠谱。如果buffer在window算子里,那么:
> >> 1、你的checkpoint会很慢,因为aligned checkpoint会等对齐之后才能完成。
> >> 2、如果启用unaligned checkpoint的话,checkpoint会变大,且如果没有反压上游,让上游减速的机制的话,这个增大会无法控制。
> >>
> >>
> >> 目前的机制提供了一种有兜底的灵活调整能力,其实是简单合理的设计。
> >>
> >>
> >>
> >> 在 2021-06-16 11:04:53,"yidan zhao"  写道:
> >> >我不计划做分开的尝试,目前只是分析。
> >> >但我还是认为,按照flink单个sub-task只是单线程模型的话,如果这个线程正在请求输出buffer,结果请求不到,挂起等待输出buffer,即被反压。这种情况下肯定是不会处理输入的。
> >> >但实际上对于window这类算子来说,处理输入并不会导致马上更多输出,而只是更新window算子的状态而已(比如pv累加),这么考虑的话,我更希望B被反压时候,仍然继续处理A的输出,而不是不处理进而导致A被反压。
> >> >
> >> >东东  于2021年6月16日周三 上午10:59写道:
> >> >>
> >> >>
> >> >>
> >> >> B到C如何分割成2个任务,你是打算把B里面每个Window的数据都Sink出来,然后在另外一个任务里读进来再用C处理么?
> >> >>
> >> >>
> >> >> 这样做的意义何在呢?仅仅从监控层面看不见反压而已(反压实际上转移到连接B和C的存储上来了),整体的系统开销是上升的,对吞吐和延迟恐怕并没有正面的影响。
> >> >>
> >> >>
> >> >> 对性能影响大的肯定是瓶颈部分,不提高C的处理能力,优化其他部分没有多大意义。
> >> >>
> >> >>
> >> >> 如果只是希望反压的影响更平滑,可以尝试增大buffer,整体开销恐怕还是比增加一个任务要小。
> >> >>
> >> >>
> >> >> 在 2021-06-15 20:03:32,"yidan zhao"  写道:
> >> >> >flink 
> >> >> >的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
> >> >> >buffer满,然后A被反压。
> >> >> >如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。
> >> >> >
> >> >> >东东  于2021年6月15日周二 下午7:07写道:
> >> >> >>
> >> >> >> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive 
> >> >> >> buffer满了,上游的send buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> >> >> >> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
> >> >> >>
> >> >> >> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> >> >> >> >假设任务逻辑为 A => B => C 这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> >> >> >> >
> >> >> >> >A是Kafka数据源,数据qps很平滑。
> >> >> >> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> >> >> >> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> >> >> >> >
> >> >> >> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> >> >> >> >
> >> >> >> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> >> >> >> >C task busy,进而导致B被反压,即B的backpress值很高。
> >> >> >> >
> >> >> >> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> >> >> >> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> >> >> >> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> >> >> >> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> >> >> >> >
> >> >> >> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> >> >> >> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Re: Re: Re: Re: Re: 关于反压的问题

2021-06-16 Thread yidan zhao
嗯,你这个说法我同意。

东东  于2021年6月16日周三 下午2:34写道:
>
> 就你这个例子,只要buffer足够大,B在6min产生的数据都能放进buffer里,B就感受不到反压的影响,可以继续处理上游的数据啊,只要下一个窗口触发之前,C能处理完buffer中的数据,那么B全程都不会被限制。buffer在send和receive两端都是有的,B只关心自己的send
>  buffer还能不能写进去。
>
>
> 在 2021-06-16 13:32:52,"yidan zhao"  写道:
> >下游并不是“很慢”,只是由于下游是接受B输出的,而B的输出是集中窗口出发的,0-5分数据都是在6min触发输出到C。假设C需要1min的时间处理完这波数据。那么6-7min的时间段内,B都处于反压状态,进而A也反压,等到7min后,B才开始继续处理A输出的6-10min的数据,这样就浪费了至少1min的时间呀。B其实从6min就可以继续处理上游数据。
> >
> >东东  于2021年6月16日周三 下午12:32写道:
> >>
> >>
> >>
> >>
> >> 如果B处理完的数据结构很小,比如你就是一个简单的count,那么buffer里就可以放很多个window,如果这样buffer都满了,说明上下游处理能力不匹配非常严重啊,也许下游永远也别想跟上了,这个时候,你是希望上游慢一点,还是上游保持现在的速度,一直到把内存撑爆(也就是不可控),还是咋的?
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2021-06-16 11:41:08,"yidan zhao"  写道:
> >> >不是呀,对于window来说,我每个key假设统计的是pv,pv=1和pv=2的存储空间是一样的,如果上游不发数据过来,这个pv就停滞在某个值不变了,发数据过来也不会导致window的数据空间变大,只是状态值变。
> >> >我考虑的是window这个sub-taskd单线程模型问题。不是buffer的问题,对于非window类算子这个问题不存在,但是window的触发是基于时间的。这意味着更多的上游数据的到位,加重的主要是cpu压力,而不是内存等状态存储压力。
> >> >
> >> >东东  于2021年6月16日周三 上午11:36写道:
> >> >>
> >> >>
> >> >>
> >> >> 这中间有buffer的存在啊,并不是只要下游处理慢,buffer就一定慢,要看你究竟有多慢,还有上游究竟有多快。
> >> >>
> >> >>
> >> >>
> >> >> 目前flink是通允许你控制buffer的大小,来调整这个容忍程度的,但这个容忍肯定是有限度。
> >> >>
> >> >>
> >> >> 另外,你似乎在意的是数据buffer在哪里,buffer在window里和buffer在上游,从全局看,肯定是有持久化能力的上游更靠谱。如果buffer在window算子里,那么:
> >> >> 1、你的checkpoint会很慢,因为aligned checkpoint会等对齐之后才能完成。
> >> >> 2、如果启用unaligned 
> >> >> checkpoint的话,checkpoint会变大,且如果没有反压上游,让上游减速的机制的话,这个增大会无法控制。
> >> >>
> >> >>
> >> >> 目前的机制提供了一种有兜底的灵活调整能力,其实是简单合理的设计。
> >> >>
> >> >>
> >> >>
> >> >> 在 2021-06-16 11:04:53,"yidan zhao"  写道:
> >> >> >我不计划做分开的尝试,目前只是分析。
> >> >> >但我还是认为,按照flink单个sub-task只是单线程模型的话,如果这个线程正在请求输出buffer,结果请求不到,挂起等待输出buffer,即被反压。这种情况下肯定是不会处理输入的。
> >> >> >但实际上对于window这类算子来说,处理输入并不会导致马上更多输出,而只是更新window算子的状态而已(比如pv累加),这么考虑的话,我更希望B被反压时候,仍然继续处理A的输出,而不是不处理进而导致A被反压。
> >> >> >
> >> >> >东东  于2021年6月16日周三 上午10:59写道:
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> B到C如何分割成2个任务,你是打算把B里面每个Window的数据都Sink出来,然后在另外一个任务里读进来再用C处理么?
> >> >> >>
> >> >> >>
> >> >> >> 这样做的意义何在呢?仅仅从监控层面看不见反压而已(反压实际上转移到连接B和C的存储上来了),整体的系统开销是上升的,对吞吐和延迟恐怕并没有正面的影响。
> >> >> >>
> >> >> >>
> >> >> >> 对性能影响大的肯定是瓶颈部分,不提高C的处理能力,优化其他部分没有多大意义。
> >> >> >>
> >> >> >>
> >> >> >> 如果只是希望反压的影响更平滑,可以尝试增大buffer,整体开销恐怕还是比增加一个任务要小。
> >> >> >>
> >> >> >>
> >> >> >> 在 2021-06-15 20:03:32,"yidan zhao"  写道:
> >> >> >> >flink 
> >> >> >> >的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
> >> >> >> >buffer满,然后A被反压。
> >> >> >> >如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。
> >> >> >> >
> >> >> >> >东东  于2021年6月15日周二 下午7:07写道:
> >> >> >> >>
> >> >> >> >> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive 
> >> >> >> >> buffer满了,上游的send buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> >> >> >> >> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
> >> >> >> >>
> >> >> >> >> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> >> >> >> >> >假设任务逻辑为 A => B => C 
> >> >> >> >> >这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> >> >> >> >> >
> >> >> >> >> >A是Kafka数据源,数据qps很平滑。
> >> >> >> >> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> >> >> >> >> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> >> >> >> >> >
> >> >> >> >> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> >> >> >> >> >
> >> >> >> >> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> >> >> >> >> >C task busy,进而导致B被反压,即B的backpress值很高。
> >> >> >> >> >
> >> >> >> >> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> >> >> >> >> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> >> >> >> >> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> >> >> >> >> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> >> >> >> >> >
> >> >> >> >> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> >> >> >> >> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


Re: Re: Re: Re: Re: 关于反压的问题

2021-06-16 Thread yidan zhao
@东东 帮忙分析个其他异常吧。异常如下图所示,我是standalone集群,每天一会一个报错,目前阶段是这个报错。

yidan zhao  于2021年6月16日周三 下午3:32写道:
>
> 嗯,你这个说法我同意。
>
> 东东  于2021年6月16日周三 下午2:34写道:
> >
> > 就你这个例子,只要buffer足够大,B在6min产生的数据都能放进buffer里,B就感受不到反压的影响,可以继续处理上游的数据啊,只要下一个窗口触发之前,C能处理完buffer中的数据,那么B全程都不会被限制。buffer在send和receive两端都是有的,B只关心自己的send
> >  buffer还能不能写进去。
> >
> >
> > 在 2021-06-16 13:32:52,"yidan zhao"  写道:
> > >下游并不是“很慢”,只是由于下游是接受B输出的,而B的输出是集中窗口出发的,0-5分数据都是在6min触发输出到C。假设C需要1min的时间处理完这波数据。那么6-7min的时间段内,B都处于反压状态,进而A也反压,等到7min后,B才开始继续处理A输出的6-10min的数据,这样就浪费了至少1min的时间呀。B其实从6min就可以继续处理上游数据。
> > >
> > >东东  于2021年6月16日周三 下午12:32写道:
> > >>
> > >>
> > >>
> > >>
> > >> 如果B处理完的数据结构很小,比如你就是一个简单的count,那么buffer里就可以放很多个window,如果这样buffer都满了,说明上下游处理能力不匹配非常严重啊,也许下游永远也别想跟上了,这个时候,你是希望上游慢一点,还是上游保持现在的速度,一直到把内存撑爆(也就是不可控),还是咋的?
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> 在 2021-06-16 11:41:08,"yidan zhao"  写道:
> > >> >不是呀,对于window来说,我每个key假设统计的是pv,pv=1和pv=2的存储空间是一样的,如果上游不发数据过来,这个pv就停滞在某个值不变了,发数据过来也不会导致window的数据空间变大,只是状态值变。
> > >> >我考虑的是window这个sub-taskd单线程模型问题。不是buffer的问题,对于非window类算子这个问题不存在,但是window的触发是基于时间的。这意味着更多的上游数据的到位,加重的主要是cpu压力,而不是内存等状态存储压力。
> > >> >
> > >> >东东  于2021年6月16日周三 上午11:36写道:
> > >> >>
> > >> >>
> > >> >>
> > >> >> 这中间有buffer的存在啊,并不是只要下游处理慢,buffer就一定慢,要看你究竟有多慢,还有上游究竟有多快。
> > >> >>
> > >> >>
> > >> >>
> > >> >> 目前flink是通允许你控制buffer的大小,来调整这个容忍程度的,但这个容忍肯定是有限度。
> > >> >>
> > >> >>
> > >> >> 另外,你似乎在意的是数据buffer在哪里,buffer在window里和buffer在上游,从全局看,肯定是有持久化能力的上游更靠谱。如果buffer在window算子里,那么:
> > >> >> 1、你的checkpoint会很慢,因为aligned checkpoint会等对齐之后才能完成。
> > >> >> 2、如果启用unaligned 
> > >> >> checkpoint的话,checkpoint会变大,且如果没有反压上游,让上游减速的机制的话,这个增大会无法控制。
> > >> >>
> > >> >>
> > >> >> 目前的机制提供了一种有兜底的灵活调整能力,其实是简单合理的设计。
> > >> >>
> > >> >>
> > >> >>
> > >> >> 在 2021-06-16 11:04:53,"yidan zhao"  写道:
> > >> >> >我不计划做分开的尝试,目前只是分析。
> > >> >> >但我还是认为,按照flink单个sub-task只是单线程模型的话,如果这个线程正在请求输出buffer,结果请求不到,挂起等待输出buffer,即被反压。这种情况下肯定是不会处理输入的。
> > >> >> >但实际上对于window这类算子来说,处理输入并不会导致马上更多输出,而只是更新window算子的状态而已(比如pv累加),这么考虑的话,我更希望B被反压时候,仍然继续处理A的输出,而不是不处理进而导致A被反压。
> > >> >> >
> > >> >> >东东  于2021年6月16日周三 上午10:59写道:
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> B到C如何分割成2个任务,你是打算把B里面每个Window的数据都Sink出来,然后在另外一个任务里读进来再用C处理么?
> > >> >> >>
> > >> >> >>
> > >> >> >> 这样做的意义何在呢?仅仅从监控层面看不见反压而已(反压实际上转移到连接B和C的存储上来了),整体的系统开销是上升的,对吞吐和延迟恐怕并没有正面的影响。
> > >> >> >>
> > >> >> >>
> > >> >> >> 对性能影响大的肯定是瓶颈部分,不提高C的处理能力,优化其他部分没有多大意义。
> > >> >> >>
> > >> >> >>
> > >> >> >> 如果只是希望反压的影响更平滑,可以尝试增大buffer,整体开销恐怕还是比增加一个任务要小。
> > >> >> >>
> > >> >> >>
> > >> >> >> 在 2021-06-15 20:03:32,"yidan zhao"  写道:
> > >> >> >> >flink 
> > >> >> >> >的task单线程,我那个问题中,很可能当B被反压时候,线程被卡在向C发送的过程中。这样B本身就无法处理A的输入。进而导致receive
> > >> >> >> >buffer满,然后A被反压。
> > >> >> >> >如上为我的想法,这个也比较坑。如果是这样,那对于我这种类似的case,如果C属于计算复杂的task,B属于window的话。可能需要从B到C之间分割成2个任务。否则性能影响还是比较大的。
> > >> >> >> >
> > >> >> >> >东东  于2021年6月15日周二 下午7:07写道:
> > >> >> >> >>
> > >> >> >> >> flink处理反压,我理解就是靠task之间的send/receive buffer,所以如果仅仅是下游的receive 
> > >> >> >> >> buffer满了,上游的send buffer没满,就不会影响上游,否则就会影响,且影响发生在上游输出结果的时候。
> > >> >> >> >> 所以总体来说就是看你buffer的大小,越大就约能碾平反压的影响,否则就约容易受到影响。
> > >> >> >> >>
> > >> >> >> >> 在 2021-06-15 17:39:26,"yidan zhao"  写道:
> > >> >> >> >> >假设任务逻辑为 A => B => C 
> > >> >> >> >> >这样的task序列,其中B是window类型task,使用时间滚动窗口,假设滚动窗口为5min。
> > >> >> >> >> >
> > >> >> >> >> >A是Kafka数据源,数据qps很平滑。
> > >> >> >> >> >B是window类任务,5min滚动窗口,比如0-5分的数据会在6min触发窗口输出。即B的输入是平滑的,但B的输出是集中在6min的(此处以及后续都以0-5min窗口做示例说明)。
> > >> >> >> >> >C是基于B输出的5min特征的一个计算类task,且复杂性也比较高。
> > >> >> >> >> >
> > >> >> >> >> >总结即: A --平滑输出---> B ---窗口触发集中输出--> C(计算复杂)
> > >> >> >> >> >
> > >> >> >> >> >对于0-5min数据,由于B输出集中在6min,且C本身的计算也比较复杂。因此6min时,C task的busy值很容易升高。
> > >> >> >> >> >C task busy,进而导致B被反压,即B的backpress值很高。
> > >> >> >> >> >
> > >> >> >> >> >此时我想知道的是,对于B来说,如果B在6min时候输出很快输出到了C,那么没有残留,此时B虽然被反压,但本质不应该影响B继续处理A输出的数据。
> > >> >> >> >> > 即使B在6min时的数据无法很快输出到C,即输出一部分就开始处于反压状态。
> > >> >> >> >> >但B0-5分的窗口的触发以及触发后的输出可以卡着,我想知道B的输入是否也可能卡住。
> > >> >> >> >> >为什么需要考虑这个呢,是因为我认为这种情况下,B继续处理5-10分的数据只会影响B的窗口状态,并不会导致B新增更多输出。所以我更期望此时不影响B处理A的输出,即B的反压不传播到A。
> > >> >> >> >> >
> > >> >> >> >> >想问问了解的大佬们,这种情况B的反压是否会传播到A呢?当一个task被反压到100%,是无脑传播到上游,导致上游也反压呢?
> > >> >> >> >> >还是独立的逻辑,比如B的输入buffer本身还是可以及时处理掉的,只是B无法输出而已。因此,B并不会导致A反压呢?


flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
Attachment is the exception stack from flink's web-ui. Does anyone
have also met this problem?

Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
each 28G mem.


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
Hi, here is the text exception stack:

org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
readAddress(..) failed: Connection timed out (connection to
'10.35.215.18/10.35.215.18:2045')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:201)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1377)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:907)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:728)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:818)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
readAddress(..) failed: Connection timed out

Robert Metzger  于2021年6月16日周三 下午4:26写道:
>
> Hi Yidan,
> it seems that the attachment did not make it through the mailing list. Can
> you copy-paste the text of the exception here or upload the log somewhere?
>
>
>
> On Wed, Jun 16, 2021 at 9:36 AM yidan zhao  wrote:
>
> > Attachment is the exception stack from flink's web-ui. Does anyone
> > have also met this problem?
> >
> > Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> > each 28G mem.
> >


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
2: I use G1, and no full gc occurred, young gc count: 422, time:
142892, so it is not bad.
3: stream job.
4: I will try to config taskmanager.network.retries which is default
0, and taskmanager.network.netty.client.connectTimeoutSec 's default
is 120s。
5: I checked the net fd number of the taskmanager, it is about 1000+,
so I think it is a reasonable value.

1: can not be sure.

Yingjie Cao  于2021年6月16日周三 下午4:34写道:
>
> Hi yidan,
>
> 1. Is the network stable?
> 2. Is there any GC problem?
> 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more 
> information.
> 4. You may try to config these two options: taskmanager.network.retries, 
> taskmanager.network.netty.client.connectTimeoutSec. More relevant options can 
> be found in 'Data Transport Network Stack' section of [2].
> 5. If it is not the above cases, it is may related to [3], you may need to 
> check the number of tcp connection per TM and node.
>
> Hope this helps.
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> [3] https://issues.apache.org/jira/browse/FLINK-22643
>
> Best,
> Yingjie
>
> yidan zhao  于2021年6月16日周三 下午3:36写道:
>>
>> Attachment is the exception stack from flink's web-ui. Does anyone
>> have also met this problem?
>>
>> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
>> each 28G mem.


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
Hi, yingjie.
If the network is not stable, which config parameter I should adjust.

yidan zhao  于2021年6月16日周三 下午6:56写道:
>
> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> 142892, so it is not bad.
> 3: stream job.
> 4: I will try to config taskmanager.network.retries which is default
> 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> is 120s。
> 5: I checked the net fd number of the taskmanager, it is about 1000+,
> so I think it is a reasonable value.
>
> 1: can not be sure.
>
> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >
> > Hi yidan,
> >
> > 1. Is the network stable?
> > 2. Is there any GC problem?
> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more 
> > information.
> > 4. You may try to config these two options: taskmanager.network.retries, 
> > taskmanager.network.netty.client.connectTimeoutSec. More relevant options 
> > can be found in 'Data Transport Network Stack' section of [2].
> > 5. If it is not the above cases, it is may related to [3], you may need to 
> > check the number of tcp connection per TM and node.
> >
> > Hope this helps.
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> > [2] 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >
> > Best,
> > Yingjie
> >
> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >>
> >> Attachment is the exception stack from flink's web-ui. Does anyone
> >> have also met this problem?
> >>
> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> >> each 28G mem.


Re: flink1.11.2集群出现了3种连接拒绝,导致任务失败

2021-06-16 Thread yidan zhao
mark. 我也是第一个问题,暂时无解。

chaiyi  于2021年3月22日周一 下午12:28写道:
>
> 你好:
> 最近建立一个3台机子的flink集群,版本是 zk-3.6.2 + hadoop-3.3.0 + 
> flink-1.11.2。3台机制是在同一个物理机上建立的虚拟机,应该来说不会出现网络波动导致的网络拒绝,但是为什么一直会出现网络拒绝
> 项目在运行一段时间以后,短则几个小时,长则3到5天,任务就会挂掉,一共出现了一下3种异常,全是网络连接方法的,请帮忙看看,是不是flink网络配置方面有问题。
> 1. 集群之间通信连接拒绝:
> 2021-03-03 08:50:42,851 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - 
> Window(ProcessingTimeSessionWindows(9), ProcessingTimeTrigger, 
> FlightTrackAggregate, FlightTrackSectorResult) -> Sink: Unnamed (4/4) 
> (3097c00c09b475b35c23782a3b4a8eaa) switched from RUNNING to FAILED on 
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4df09503.
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
> readAddress(..) failed: Connection reset by peer (connection to 
> '10.100.1.222/10.100.1.222:43156')
> at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1388)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:730)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:820)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:424)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:326)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>  ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_152]
> Caused by: 
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>  readAddress(..) failed: Connection reset by peer
>
>
> 2. 连接到ZK的请求失败,
> 2021-03-02 20:27:13,487 INFO  
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - 
> Unable to read additional data from server sessionid 0x30018710a580007, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect
> 2021-03-02 20:27:13,588 INFO  
> org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager
>  [] - State change: SUSPENDED
> 2021-03-02 20:27:13,590 WARN  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - 
> Connection to ZooKeeper suspended. The contender LeaderContender: 
> DefaultDispatcherRunner no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [] - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
> 2021-03-02 20:27:13,591 WARN  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - 
> Connection to ZooKeeper suspended. The contender LeaderContender: 
> StandaloneResourceManager no longer participates in the leader election.
> 2021-03-02 20:27:13,591 WARN  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [] - 
> Connection to ZooKeeper suspended. The contender http://flink-02:8081 no 
> longer participates in the leader election.
> 2021-03-02 20:27:

Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
I also searched many result in internet. There are some related
exception like 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException,
but in my case it is
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException.
It is different in 'LocalTransportException' or
'RemoteTransportException'.

yidan zhao  于2021年6月16日周三 下午7:10写道:
>
> Hi, yingjie.
> If the network is not stable, which config parameter I should adjust.
>
> yidan zhao  于2021年6月16日周三 下午6:56写道:
> >
> > 2: I use G1, and no full gc occurred, young gc count: 422, time:
> > 142892, so it is not bad.
> > 3: stream job.
> > 4: I will try to config taskmanager.network.retries which is default
> > 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> > is 120s。
> > 5: I checked the net fd number of the taskmanager, it is about 1000+,
> > so I think it is a reasonable value.
> >
> > 1: can not be sure.
> >
> > Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> > >
> > > Hi yidan,
> > >
> > > 1. Is the network stable?
> > > 2. Is there any GC problem?
> > > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more 
> > > information.
> > > 4. You may try to config these two options: taskmanager.network.retries, 
> > > taskmanager.network.netty.client.connectTimeoutSec. More relevant options 
> > > can be found in 'Data Transport Network Stack' section of [2].
> > > 5. If it is not the above cases, it is may related to [3], you may need 
> > > to check the number of tcp connection per TM and node.
> > >
> > > Hope this helps.
> > >
> > > [1] 
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> > > [2] 
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> > > [3] https://issues.apache.org/jira/browse/FLINK-22643
> > >
> > > Best,
> > > Yingjie
> > >
> > > yidan zhao  于2021年6月16日周三 下午3:36写道:
> > >>
> > >> Attachment is the exception stack from flink's web-ui. Does anyone
> > >> have also met this problem?
> > >>
> > >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> > >> each 28G mem.


Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
Ok, I will try.

Yingjie Cao  于2021年6月16日周三 下午8:00写道:
>
> Maybe you can try to increase taskmanager.network.retries, 
> taskmanager.network.netty.server.backlog and 
> taskmanager.network.netty.sendReceiveBufferSize. These options are useful for 
> our jobs.
>
> yidan zhao  于2021年6月16日周三 下午7:10写道:
>>
>> Hi, yingjie.
>> If the network is not stable, which config parameter I should adjust.
>>
>> yidan zhao  于2021年6月16日周三 下午6:56写道:
>> >
>> > 2: I use G1, and no full gc occurred, young gc count: 422, time:
>> > 142892, so it is not bad.
>> > 3: stream job.
>> > 4: I will try to config taskmanager.network.retries which is default
>> > 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
>> > is 120s。
>> > 5: I checked the net fd number of the taskmanager, it is about 1000+,
>> > so I think it is a reasonable value.
>> >
>> > 1: can not be sure.
>> >
>> > Yingjie Cao  于2021年6月16日周三 下午4:34写道:
>> > >
>> > > Hi yidan,
>> > >
>> > > 1. Is the network stable?
>> > > 2. Is there any GC problem?
>> > > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more 
>> > > information.
>> > > 4. You may try to config these two options: taskmanager.network.retries, 
>> > > taskmanager.network.netty.client.connectTimeoutSec. More relevant 
>> > > options can be found in 'Data Transport Network Stack' section of [2].
>> > > 5. If it is not the above cases, it is may related to [3], you may need 
>> > > to check the number of tcp connection per TM and node.
>> > >
>> > > Hope this helps.
>> > >
>> > > [1] 
>> > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
>> > > [2] 
>> > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
>> > > [3] https://issues.apache.org/jira/browse/FLINK-22643
>> > >
>> > > Best,
>> > > Yingjie
>> > >
>> > > yidan zhao  于2021年6月16日周三 下午3:36写道:
>> > >>
>> > >> Attachment is the exception stack from flink's web-ui. Does anyone
>> > >> have also met this problem?
>> > >>
>> > >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
>> > >> each 28G mem.


Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
@东东 standalone集群。 随机时间,一会一个的,没有固定规律。  和CPU、内存、网络的话有一定规律,但不确认,因为不是很明显。
我排查过几个exception,时间和网络尖刺对上了,但不全能对上,所以不好说是否有这个原因。

此外,有个点我不是很清楚,网上这个报错很少,类似的都是
RemoteTransportException,然后提示中说taskmager可能已丢失之类。但我的是
LocalTransportException,不清楚netty中这俩错误的含义是不是不一样。目前来看网络上关于这俩异常的资料也查不到什么。

东东  于2021年6月17日周四 上午11:19写道:
>
> 单机standalone,还是Docker/K8s ?
>
>
>
> 这个异常出现的时机,与周期性的,还是跟CPU、内存,乃至网络流量变化相关?
>
>
>
> 在 2021-06-16 19:10:24,"yidan zhao"  写道:
> >Hi, yingjie.
> >If the network is not stable, which config parameter I should adjust.
> >
> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >>
> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> >> 142892, so it is not bad.
> >> 3: stream job.
> >> 4: I will try to config taskmanager.network.retries which is default
> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> >> is 120s。
> >> 5: I checked the net fd number of the taskmanager, it is about 1000+,
> >> so I think it is a reasonable value.
> >>
> >> 1: can not be sure.
> >>
> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >> >
> >> > Hi yidan,
> >> >
> >> > 1. Is the network stable?
> >> > 2. Is there any GC problem?
> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for more 
> >> > information.
> >> > 4. You may try to config these two options: taskmanager.network.retries, 
> >> > taskmanager.network.netty.client.connectTimeoutSec. More relevant 
> >> > options can be found in 'Data Transport Network Stack' section of [2].
> >> > 5. If it is not the above cases, it is may related to [3], you may need 
> >> > to check the number of tcp connection per TM and node.
> >> >
> >> > Hope this helps.
> >> >
> >> > [1] 
> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> > [2] 
> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >
> >> > Best,
> >> > Yingjie
> >> >
> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >> >>
> >> >> Attachment is the exception stack from flink's web-ui. Does anyone
> >> >> have also met this problem?
> >> >>
> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> >> >> each 28G mem.


Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
是的,宿主机IP。

net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_timestamps = 1

东东  于2021年6月17日周四 下午12:52写道:
>
> 10.35.215.18是宿主机IP?
>
> 看一下  tcp_tw_recycle和net.ipv4.tcp_timestamps是什么值
> 实在不行就 tcpdump 吧
>
>
>
> 在 2021-06-17 12:41:58,"yidan zhao"  写道:
> >@东东 standalone集群。 随机时间,一会一个的,没有固定规律。  和CPU、内存、网络的话有一定规律,但不确认,因为不是很明显。
> >我排查过几个exception,时间和网络尖刺对上了,但不全能对上,所以不好说是否有这个原因。
> >
> >此外,有个点我不是很清楚,网上这个报错很少,类似的都是
> >RemoteTransportException,然后提示中说taskmager可能已丢失之类。但我的是
> >LocalTransportException,不清楚netty中这俩错误的含义是不是不一样。目前来看网络上关于这俩异常的资料也查不到什么。
> >
> >东东  于2021年6月17日周四 上午11:19写道:
> >>
> >> 单机standalone,还是Docker/K8s ?
> >>
> >>
> >>
> >> 这个异常出现的时机,与周期性的,还是跟CPU、内存,乃至网络流量变化相关?
> >>
> >>
> >>
> >> 在 2021-06-16 19:10:24,"yidan zhao"  写道:
> >> >Hi, yingjie.
> >> >If the network is not stable, which config parameter I should adjust.
> >> >
> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >> >>
> >> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> >> >> 142892, so it is not bad.
> >> >> 3: stream job.
> >> >> 4: I will try to config taskmanager.network.retries which is default
> >> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> >> >> is 120s。
> >> >> 5: I checked the net fd number of the taskmanager, it is about 1000+,
> >> >> so I think it is a reasonable value.
> >> >>
> >> >> 1: can not be sure.
> >> >>
> >> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >> >> >
> >> >> > Hi yidan,
> >> >> >
> >> >> > 1. Is the network stable?
> >> >> > 2. Is there any GC problem?
> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for 
> >> >> > more information.
> >> >> > 4. You may try to config these two options: 
> >> >> > taskmanager.network.retries, 
> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More relevant 
> >> >> > options can be found in 'Data Transport Network Stack' section of [2].
> >> >> > 5. If it is not the above cases, it is may related to [3], you may 
> >> >> > need to check the number of tcp connection per TM and node.
> >> >> >
> >> >> > Hope this helps.
> >> >> >
> >> >> > [1] 
> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> >> > [2] 
> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >> >
> >> >> > Best,
> >> >> > Yingjie
> >> >> >
> >> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >> >> >>
> >> >> >> Attachment is the exception stack from flink's web-ui. Does anyone
> >> >> >> have also met this problem?
> >> >> >>
> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 containers,
> >> >> >> each 28G mem.


Re: Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-16 Thread yidan zhao
这啥原理,这个改动我没办法直接改,需要申请。

东东  于2021年6月17日周四 下午1:36写道:
>
>
>
> 把其中一个改成0
>
>
> 在 2021-06-17 13:11:01,"yidan zhao"  写道:
> >是的,宿主机IP。
> >
> >net.ipv4.tcp_tw_reuse = 1
> >net.ipv4.tcp_timestamps = 1
> >
> >东东  于2021年6月17日周四 下午12:52写道:
> >>
> >> 10.35.215.18是宿主机IP?
> >>
> >> 看一下  tcp_tw_recycle和net.ipv4.tcp_timestamps是什么值
> >> 实在不行就 tcpdump 吧
> >>
> >>
> >>
> >> 在 2021-06-17 12:41:58,"yidan zhao"  写道:
> >> >@东东 standalone集群。 随机时间,一会一个的,没有固定规律。  和CPU、内存、网络的话有一定规律,但不确认,因为不是很明显。
> >> >我排查过几个exception,时间和网络尖刺对上了,但不全能对上,所以不好说是否有这个原因。
> >> >
> >> >此外,有个点我不是很清楚,网上这个报错很少,类似的都是
> >> >RemoteTransportException,然后提示中说taskmager可能已丢失之类。但我的是
> >> >LocalTransportException,不清楚netty中这俩错误的含义是不是不一样。目前来看网络上关于这俩异常的资料也查不到什么。
> >> >
> >> >东东  于2021年6月17日周四 上午11:19写道:
> >> >>
> >> >> 单机standalone,还是Docker/K8s ?
> >> >>
> >> >>
> >> >>
> >> >> 这个异常出现的时机,与周期性的,还是跟CPU、内存,乃至网络流量变化相关?
> >> >>
> >> >>
> >> >>
> >> >> 在 2021-06-16 19:10:24,"yidan zhao"  写道:
> >> >> >Hi, yingjie.
> >> >> >If the network is not stable, which config parameter I should adjust.
> >> >> >
> >> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >> >> >>
> >> >> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> >> >> >> 142892, so it is not bad.
> >> >> >> 3: stream job.
> >> >> >> 4: I will try to config taskmanager.network.retries which is default
> >> >> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's default
> >> >> >> is 120s。
> >> >> >> 5: I checked the net fd number of the taskmanager, it is about 1000+,
> >> >> >> so I think it is a reasonable value.
> >> >> >>
> >> >> >> 1: can not be sure.
> >> >> >>
> >> >> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >> >> >> >
> >> >> >> > Hi yidan,
> >> >> >> >
> >> >> >> > 1. Is the network stable?
> >> >> >> > 2. Is there any GC problem?
> >> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] for 
> >> >> >> > more information.
> >> >> >> > 4. You may try to config these two options: 
> >> >> >> > taskmanager.network.retries, 
> >> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More relevant 
> >> >> >> > options can be found in 'Data Transport Network Stack' section of 
> >> >> >> > [2].
> >> >> >> > 5. If it is not the above cases, it is may related to [3], you may 
> >> >> >> > need to check the number of tcp connection per TM and node.
> >> >> >> >
> >> >> >> > Hope this helps.
> >> >> >> >
> >> >> >> > [1] 
> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> >> >> > [2] 
> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >> >> >
> >> >> >> > Best,
> >> >> >> > Yingjie
> >> >> >> >
> >> >> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >> >> >> >>
> >> >> >> >> Attachment is the exception stack from flink's web-ui. Does anyone
> >> >> >> >> have also met this problem?
> >> >> >> >>
> >> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 
> >> >> >> >> containers,
> >> >> >> >> each 28G mem.


Re: Re: Re: Re: Re: flink job exception analysis (netty related, readAddress failed. connection timed out)

2021-06-17 Thread yidan zhao
我仔细想了想,我的集群是内网服务器上的容器,容器之间访问应该不算经过NAT。

当然和网络相关的监控来看,的确很多机器的time-wait状态的连接不少,在5w+个左右,但也不至于导致这个问题感觉。

东东  于2021年6月17日周四 下午2:48写道:
>
> 这俩都开启的话,就要求同一源ip的连接请求中的timstamp必须是递增的,否则(非递增)的连接请求被视为无效,数据包会被抛弃,给client端的感觉就是时不时的连接超时。
>
>
>
> 一般来说单机不会有这个问题,因为时钟应该是一个,在NAT后面才容易出现这个现象(因为多个主机时钟通常不完全一致),但不清楚你的具体架构,只能说试一试。
>
>
> 最后,可以跟运维讨论一下,除非确信不会有经过NAT过来的链接,否则这俩最好别都开。
>
>
> PS: kernel 4.1里面已经把 tcp_tw_reuse 这玩意废掉了,因为太多人掉这坑里了
>
>
> 在 2021-06-17 14:07:50,"yidan zhao"  写道:
> >这啥原理,这个改动我没办法直接改,需要申请。
> >
> >东东  于2021年6月17日周四 下午1:36写道:
> >>
> >>
> >>
> >> 把其中一个改成0
> >>
> >>
> >> 在 2021-06-17 13:11:01,"yidan zhao"  写道:
> >> >是的,宿主机IP。
> >> >
> >> >net.ipv4.tcp_tw_reuse = 1
> >> >net.ipv4.tcp_timestamps = 1
> >> >
> >> >东东  于2021年6月17日周四 下午12:52写道:
> >> >>
> >> >> 10.35.215.18是宿主机IP?
> >> >>
> >> >> 看一下  tcp_tw_recycle和net.ipv4.tcp_timestamps是什么值
> >> >> 实在不行就 tcpdump 吧
> >> >>
> >> >>
> >> >>
> >> >> 在 2021-06-17 12:41:58,"yidan zhao"  写道:
> >> >> >@东东 standalone集群。 随机时间,一会一个的,没有固定规律。  和CPU、内存、网络的话有一定规律,但不确认,因为不是很明显。
> >> >> >我排查过几个exception,时间和网络尖刺对上了,但不全能对上,所以不好说是否有这个原因。
> >> >> >
> >> >> >此外,有个点我不是很清楚,网上这个报错很少,类似的都是
> >> >> >RemoteTransportException,然后提示中说taskmager可能已丢失之类。但我的是
> >> >> >LocalTransportException,不清楚netty中这俩错误的含义是不是不一样。目前来看网络上关于这俩异常的资料也查不到什么。
> >> >> >
> >> >> >东东  于2021年6月17日周四 上午11:19写道:
> >> >> >>
> >> >> >> 单机standalone,还是Docker/K8s ?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 这个异常出现的时机,与周期性的,还是跟CPU、内存,乃至网络流量变化相关?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> 在 2021-06-16 19:10:24,"yidan zhao"  写道:
> >> >> >> >Hi, yingjie.
> >> >> >> >If the network is not stable, which config parameter I should 
> >> >> >> >adjust.
> >> >> >> >
> >> >> >> >yidan zhao  于2021年6月16日周三 下午6:56写道:
> >> >> >> >>
> >> >> >> >> 2: I use G1, and no full gc occurred, young gc count: 422, time:
> >> >> >> >> 142892, so it is not bad.
> >> >> >> >> 3: stream job.
> >> >> >> >> 4: I will try to config taskmanager.network.retries which is 
> >> >> >> >> default
> >> >> >> >> 0, and taskmanager.network.netty.client.connectTimeoutSec 's 
> >> >> >> >> default
> >> >> >> >> is 120s。
> >> >> >> >> 5: I checked the net fd number of the taskmanager, it is about 
> >> >> >> >> 1000+,
> >> >> >> >> so I think it is a reasonable value.
> >> >> >> >>
> >> >> >> >> 1: can not be sure.
> >> >> >> >>
> >> >> >> >> Yingjie Cao  于2021年6月16日周三 下午4:34写道:
> >> >> >> >> >
> >> >> >> >> > Hi yidan,
> >> >> >> >> >
> >> >> >> >> > 1. Is the network stable?
> >> >> >> >> > 2. Is there any GC problem?
> >> >> >> >> > 3. Is it a batch job? If so, please use sort-shuffle, see [1] 
> >> >> >> >> > for more information.
> >> >> >> >> > 4. You may try to config these two options: 
> >> >> >> >> > taskmanager.network.retries, 
> >> >> >> >> > taskmanager.network.netty.client.connectTimeoutSec. More 
> >> >> >> >> > relevant options can be found in 'Data Transport Network Stack' 
> >> >> >> >> > section of [2].
> >> >> >> >> > 5. If it is not the above cases, it is may related to [3], you 
> >> >> >> >> > may need to check the number of tcp connection per TM and node.
> >> >> >> >> >
> >> >> >> >> > Hope this helps.
> >> >> >> >> >
> >> >> >> >> > [1] 
> >> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/batch/blocking_shuffle/
> >> >> >> >> > [2] 
> >> >> >> >> > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/
> >> >> >> >> > [3] https://issues.apache.org/jira/browse/FLINK-22643
> >> >> >> >> >
> >> >> >> >> > Best,
> >> >> >> >> > Yingjie
> >> >> >> >> >
> >> >> >> >> > yidan zhao  于2021年6月16日周三 下午3:36写道:
> >> >> >> >> >>
> >> >> >> >> >> Attachment is the exception stack from flink's web-ui. Does 
> >> >> >> >> >> anyone
> >> >> >> >> >> have also met this problem?
> >> >> >> >> >>
> >> >> >> >> >> Flink1.12 - Flink1.13.1.  Standalone Cluster, include 30 
> >> >> >> >> >> containers,
> >> >> >> >> >> each 28G mem.


Re: flink 提交job后 task 一直是 schedule 状态

2021-06-17 Thread yidan zhao
mark。遇到过,但不清楚啥问题也。

Lei Wang  于2021年6月18日周五 下午1:43写道:
>
> flink-1.11.2
> ./bin/start-cluster.sh 启动然后
> ./bin/flink run examples/streaming/SocketWindowWordCount.jar  --hostname
> localhost --port 
>
> 但是 jobmanger 页面 task 一直是 scheduler 状态,过了一段那时间后输出错误
>
> 2021-06-18 13:34:26,683 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
> Socket Stream -> Flat Map (1/1) (7fc37b6f2e20170da9d95a9b2
> 28577f2) switched from SCHEDULED to FAILED on not deployed.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate the required slot within slot request timeout. Please
> make sure tha
> t the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.11-1.11.2.jar:1
> .11.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$assignResourceOrHandleError$6(DefaultScheduler.java:422)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2
>
> 但是 slot 资源是有的。我在其他的机器上执行这种操作是正常的。
>
> 有大神帮助解释一下吗?
>
> 谢谢,
> 王磊


Re: flink写es和hbase反压

2021-06-17 Thread yidan zhao
推荐使用批量+异步方式写。

田磊  于2021年6月18日周五 下午1:12写道:

>
> 现在用flink自定义source读取hbase的其中一张表的数据,表中这张表的总数据有三千万条,处理完之后的数据写入es和hbase,但是每次写的时候到一千多万条就出现反压,之前怀疑是es的问题,最后单独写hbase也出现相同的问题,出问题后就一条都不写了,大佬指点一下。日志也没有异常。详见附件。es和hbase都是批量写。source和sink的并行度都是1,中间map算子并行度16。
>
>
>
> *totorobabyfans*邮箱:totorobabyf...@163.com
>
> 签名由
> 网易邮箱大师  定制
>


Re: flink1.12.2 sql session窗口间隔时间段内没有新数据窗口不关闭

2021-06-17 Thread yidan zhao
感觉应该是水印时间没到的原因,和session window本身关系不大。

raofang <295070...@qq.com.invalid> 于2021年6月18日周五 下午12:54写道:
>
> hi,请教大家一个问题:
> flink1.12.2 sql BlinkPlanner
> 使用基于routime的session窗口时,在设置的间隔时间10分钟内没有接收到新数据,窗口没有关闭输出计算结果;但是接收到10分钟之后的新数据时上一次的session窗口才关闭输出结果。不知道是什么原因导致超过间隔时间没有新数据窗口没有关闭的问题呢?
>  谢谢~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink写es和hbase反压

2021-06-18 Thread yidan zhao
注意需要结合flink的异步算子哈,不是随意直接改造成异步请求。

田磊  于2021年6月18日周五 下午2:33写道:
>
> 好的,我试试,谢谢解答!
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年06月18日 14:16,yidan zhao 写道:
> 推荐使用批量+异步方式写。
>
> 田磊  于2021年6月18日周五 下午1:12写道:
>
> >
> > 现在用flink自定义source读取hbase的其中一张表的数据,表中这张表的总数据有三千万条,处理完之后的数据写入es和hbase,但是每次写的时候到一千多万条就出现反压,之前怀疑是es的问题,最后单独写hbase也出现相同的问题,出问题后就一条都不写了,大佬指点一下。日志也没有异常。详见附件。es和hbase都是批量写。source和sink的并行度都是1,中间map算子并行度16。
> >
> >
> >
> > *totorobabyfans*邮箱:totorobabyf...@163.com
> >
> > <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1&name=totorobabyfans&uid=totorobabyfans%40163.com&iconUrl=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png&items=%5B%22%E9%82%AE%E7%AE%B1%EF%BC%9Atotorobabyfans%40163.com%22%5D>签名由
> > 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail88>; 定制
> >


KafkaSource检查点的end to end duration较长(1min)原因

2021-06-20 Thread yidan zhao
如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。

(1)KafkaSouce的e2e时间达到1min+,正常xxx
ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x
ms的。   这个不是很明白什么情况会是这样呢?

(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed
data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。

(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。
 
这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的
0 
号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start
delay)。


Re: KafkaSource检查点的end to end duration较长(1min)原因

2021-06-21 Thread yidan zhao
存在间断性周期性反压。  因为有window算子,时间窗口触发。

熊云昆  于2021年6月21日周一 下午11:03写道:
>
> 我觉得问题是不是出在反压上面,你的job是不是有反压?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-06-21 10:38:36,"yidan zhao"  写道:
> >如题,我任务的检查点(对齐检查点)大多数时间成功,偶现失败。目前针对超时类失败做了分析,存在部分特点,希望大佬们分析下原因。
> >
> >(1)KafkaSouce的e2e时间达到1min+,正常xxx
> >ms就结束了。同时对应e2e达到1min+的情况下,sync、async、alignment、start delay等都为0,偶尔几个x
> >ms的。   这个不是很明白什么情况会是这样呢?
> >
> >(2)对于部分task,start delay假设为31s,alignment duration为43s,但是processed
> >data才xxxKB(几十到几百KB)。从我任务的正常处理情况对比来说,这个数据量几乎不需要时间就能处理完。
> >
> >(3)有个window算子(前边是hash进来,keyBy的),检查点时间1m14s。然后看了下subtask的检查点,大多数都是2s内完成,其中1个subtask耗时1m14s。这个subtask对应的start-delay为1m13s。
> > 这个就更奇怪了,首先前边是keyBy,所以是hash分区方式进入window算子。那么,对于正常subtask0,其start_delay为1s,那么subtask0收到第一个barrier耗时1s,假设这个barrier来自上游算子的
> >0 
> >号子任务(preTask0)。那么preTask0既然已经发送了barrier,对于window任务的异常subtask就应该也能很快收到barrier,可是实际却耗时1min14s(start
> >delay)。


rocksdb对比filestatebackend

2021-06-21 Thread yidan zhao
如题,我生产中目前一直都是使用的FileStateBackend,然后使用一个对象存储服务作为后端。

按照我的理解,这种方式下,状态的操作性能很高,都是在内存内部,只有检查点时候才会输出到对象存储中。但是,不支持增量检查点。

RocksDB支持增量检查点,但是缺点是每个状态的操作都是需要序列化/反序列化,至于是文件还是内存操作可能还和rocksdb的块大小,多久刷新等有关。
不过我现在在想,既然我的任务的状态当前使用内存存储,也就是内存存储是能够容纳我的全状态的。
那么是否我从FileStateBackend切换到RocksDB其实性能也不会降低很多呢? 就是牺牲很少性能,换来增量检查点。

此外,还有个点。RocksDB的增量检查点在每次检查点时候,输出到对象存储的部分也是增量?还是全量。
只是rocksdb自身使用增量状态,然后检查点存储的时候是全量到对象存储吗?
还是说,对象存储中存储的ckpt-1,ckpt-2,ckpt-3等也是增量的,即ckpt-3可能依赖ckpt-2等这样。


flink1.12某个jobmanager一直处于leader选举中

2021-06-23 Thread yidan zhao
出问题的Jobmanager日志如下,貌似是被隔离?? 然后重启该Jobmanager后就OK了。

2021-06-24 11:30:18,756 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

2021-06-24 11:30:18,826 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

2021-06-24 11:30:18,897 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

2021-06-24 11:30:18,967 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

2021-06-24 11:31:19,059 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]

2021-06-24 11:31:19,135 WARN  akka.remote.Remoting
[] - Tried to associate with unreachable remote
address [akka.tcp://fl...@bjhw-aisecurity-flink03.bjhw:13141]. Address
is now gated for 50 ms, all messages to this address will be delivered
to dead letters. Reason: [The remote system has quarantined this
system. No further associations to the remote system are possible
until this system is restarted.]


  1   2   3   4   >