Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 yidan zhao
你任务A中的redis和hbase是异步还是同步访问,同步是肯定不行的。ckpt小是因为没啥状态,自然就小,时间长可能是数据对齐时间长,你估计用的是对齐检查点是吧?
 如果换成非对齐检查点,时间应该能降下来,但是状态会变得很大,你可以试试。
最佳做法是,改造成异步的,不能同步。

JasonLee <17610775...@163.com> 于2021年6月4日周五 上午10:57写道:
>
> hi
>
> source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
> 空跑,浪费资源,你只需要把 map 的并行度调大即可.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 yidan zhao
本质需求是我一个转发任务,本身检查点失败以及任务失败一般都是压力高,所以我希望重启是忽略堆积的数据从最新数据开始消费。我希望任务失败了就自动重启从最新开始继续转发。

yidan zhao  于2021年6月4日周五 上午11:51写道:
>
> 那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web
> UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。
>
> JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道:
> >
> > hi
> >
> > sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 yidan zhao
那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web
UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。

JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道:
>
> hi
>
> sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.
>
>
>
> -
> Best Wishes
> JasonLee
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 JasonLee
hi

sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?

2021-06-03 文章 yidan zhao
这个问题比较复杂,具体最后糊里糊涂的半解决了。大概就是考虑用hashMap,以及最好不要继承,通过组合方式用。比如hashMap作为内层成员,最外边一层不要做成Map。这样可能会解决一定问题。

Lin Hou  于2021年4月1日周四 下午1:55写道:
>
> 你好,请问一下,这个问题是怎么解决的啊?
>
> 赵一旦  于2021年2月3日周三 下午1:59写道:
>
> > 我看Flink的要求是public,每个属性要么public,要么有getter/setter。估计内嵌的属性也会递归检查的。
> >
> > ℡小新的蜡笔不见嘞、 <1515827...@qq.com> 于2021年2月3日周三 下午1:52写道:
> >
> > > 你好,我们是否可以通过对该类LinkedHashMap进行包装来实现当前功能呢?如果你需要PojoSerializer来序列化数据的话。
> > >
> > >
> > >
> > >
> > > -- 原始邮件 --
> > > 发件人: "赵一旦" > > 发送时间: 2021年2月3日(星期三) 中午1:24
> > > 收件人: "user-zh" > > 主题: 关于无法被看作POJO的类型,怎么转变可以让其不当作GenericType呢?
> > >
> > >
> > >
> > > 如题,按照flink对POJO的定义,感觉还是比较严格的。
> > >
> > >
> > 我有个类是继承了LinkedHashMap的,就被当作GenericType了。像这种情况,我没办法去修改LinkedHashMap实现,同时也不好不继承。因为我一个实体是动态扩展,不清楚有多少属性的,需要json方式反序列化到Map类型上。
> >


Re: Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点。

2021-06-03 文章 yidan zhao
这个问题拾起来,还有人回答下吗。

yidan zhao  于2021年5月24日周一 上午10:25写道:
>
> 如题,Flink/FlinkSQL如何开启检查点,但自动重启不基于检查点,或者基于检查点重启但忽略kafkaSource的状态。
> 目前Flink部分我自己覆盖了部分实现,可以实现基于检查点重启但忽略KafkaSource的offset状态。
> 现在是FlinkSQL部分,我目前都是设置很大的重启次数,但是自动重启后经常还是慢等导致继续ckpt失败,这个是恶性循环的。所以我目前希望是自动重启后忽略堆积的数据。
> 有个方法是不开启检查点,并设置自动重启。可以实现效果,不过还有个麻烦点是:如果不开启检查点,目前就没办法从 flink 的 web-ui 
> 上看出任务是否经历过重启,比如ckpt_restored这个指标。我之前都是基于这个指标知道任务已经重启多少次的,我虽然希望任务能自动重启,并忽略堆积的数据,但偶尔人工看的时候还是需要知道任务是什么运行情况的。


关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 yidan zhao
如题,按照官方文档的kafka source部分,有如下配置说明。

scan.startup.mode : optionalgroup-offsetsStringStartup mode for Kafka
consumer, valid values are 'earliest-offset', 'latest-offset',
'group-offsets', 'timestamp' and 'specific-offsets'. See the following
Start Reading Position for more details.

其中Reading Positions部分说明如下:

The config option scan.startup.mode specifies the startup mode for
Kafka consumer. The valid enumerations are:

`group-offsets`: start from committed offsets in ZK / Kafka brokers of
a specific consumer group.
`earliest-offset`: start from the earliest offset possible.
`latest-offset`: start from the latest offset.
`timestamp`: start from user-supplied timestamp for each partition.
`specific-offsets`: start from user-supplied specific offsets for each
partition.

可见,latest-offset和group-offsets是2个配置,所以我配置latest-offset肯定是从最新部分开始消费的,而不管使用的说明group
id,以及这个group id已提交的offset,这个估计没问题。

然后我想知道的是:带有latest-offset这个配置的情况下,sql任务自动重启基于检查点的情况呢?是否从最新消费,还是基于检查点的offset消费。

对于flink stream中实现,我知道是从检查点offset的,为此我还覆盖过实现。  现在想知道下sql部分实现是否也类似,不想去查sql部分,不熟悉。


Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 JasonLee
hi

source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
空跑,浪费资源,你只需要把 map 的并行度调大即可.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


开启checkpoint,kafka事务OutOfOrderSequenceException

2021-06-03 文章 SmileSmile
Dear all:
  flink版本是1.12.4,kafka版本是1.1.1。作业topology很简单,source-->flatmap--->sink 
,在开启checkpoint,作业运行几个小时后会报错。报错内容如下


Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: 
Failed to send data to Kafka: The broker received an out of order sequence 
number.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
 ~[LogAnalyseFlink-1.0.jar:?]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
 ~[LogAnalyseFlink-1.0.jar:?]
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
 




product没有特殊的配置, 默认使用的是AT_LEAST_ONCE semantic
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5");

properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 
"1");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "2000");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "524288");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
"6");


如果报错了,就得把作业停掉,等事务15分钟后过期启动程序才可以恢复。 目前没有排查问题的思路,需要帮助。

Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 Jacob
@JasonLee 谢谢回复

A job 的背压情况如下图

 

我清楚job处理数据速度的确赶不上消息的生产速度这一事实,但暂时想不到一个合理的解决办法,并行度都已经设置的比较大了(从等于topic分区数量已经调整为大于partition数量了)。

我把各个task的并行度设置是一样的,让他们链在一个task上,从而优化线程切换的性能
其中 Map算子是最耗时的,所有的逻辑和数据加工都在这个Map算子,前后两个Flat Map
都是简单的将List数据扁平化而已,没有什么耗时操作。开始它们的并行度我设25(topic01
partition数量、消息速率是:1000~2000条/s),后面直接改成80,但并没有明显效果。





-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

mysql主从切换导致通过flink mysql-cdc消费binlog 点位出错

2021-06-03 文章 董建
由于各种原因,dba进行了数据库主从切换。
目前我采用flink mysql-cdc采集binlog,但是数据库主从切换后,导致binlog的pos不一致。
flink 程序会自动重启,在经过配置的重启策略后就会挂掉,日志打印
org.apache.kafka.connect.errors.ConnectException: The connector is trying to 
read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 
and binlog file 'mysql-bin.000650', pos=521310219, skipping 2 events plus 1 
rows, but this is no longer available on the server. Reconfigure the connector 
to use a snapshot when needed.


由于pos=521310219在新的数据库服务器上位置不对,flink最后一次自动保存的checkpoint已经存储了pos=521310219,导致通过flink
 -s  的方式无法接着继续消费,并且job无法成功启动。
不知道大家有什么好的办法解决这个问题?





退订

2021-06-03 文章 李朋辉
退订


| |
李朋辉
|
|
邮箱:lipengh...@126.com
|

签名由 网易邮箱大师 定制

退订

2021-06-03 文章 李朋辉





| |
李朋辉
|
|
邮箱:lipengh...@126.com
|

签名由 网易邮箱大师 定制

- 转发的邮件 -

发件人: Fighting
发送日期: 2021年06月02日 11:00
收件人: user-zh
抄送人:
主题: 退订
退订

Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 HunterXHunter
一般是 Job A出现背压了,checkpoint的时候是要等背压的数据都处理完了才会处理barrier。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 JasonLee
hi

你理解的可能有点偏差,应该是因为任务出现了反压或者数据倾斜的问题导致了cp时间长,01消息堆积说明已经反压到source端了,需要先定位反压的位置,看是具体什么原因导致的,然后再根据情况解决.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 fileSystem source table 读取 fileSystem sink table 分区错误问题

2021-06-03 文章 范未太
1.问题描述

基于flink filesystem connect 创建create table  source_test(id string,name string 
dayno sring,`hour` string)  partitioned (dayno ,`hour`) 
with('connector'='filesystm',path='x/data/')
报错堆栈如下:
|
ava.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.CommandLineWrapper.main(CommandLineWrapper.java:66)
Caused by: java.util.NoSuchElementException: key not found: hour
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:155)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow$1.apply(PartitionPruner.scala:153)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$.org$apache$flink$table$planner$plan$utils$PartitionPruner$$convertPartitionToRow(PartitionPruner.scala:153)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:130)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$$anonfun$prunePartitions$1.apply(PartitionPruner.scala:129)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
org.apache.flink.table.planner.plan.utils.PartitionPruner$.prunePartitions(PartitionPruner.scala:129)
at 
org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.internalPartitionPrune$1(PushPartitionIntoLegacyTableSourceScanRule.scala:134)
at 
org.apache.flink.table.planner.plan.rules.logical.PushPartitionIntoLegacyTableSourceScanRule.onMatch(PushPartitionIntoLegacyTableSourceScanRule.scala:144)
at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427)
at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264)
at 
org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at 
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
at 

回复: flink 1.13.0 中cumulate window 使用

2021-06-03 文章 邹云鹤
hello 大佬, 
我现在 使用 cumulate 的SQL 如下:insert into `test_out` select a.uid, 'dt', max(a.age) 
from 
TABLE(
CUMULATE(TABLE test_in, DESCRIPTOR(proctime), INTERVAL '1' MINUTES, INTERVAL 
'1' hours)) as a group by uid, window_start, window_end;


是可以运行了,但是发现每次窗口触发, 通过JDBC Sink 写入到数据库执行的都是insert 操作, 如果这个地方需要根据key 
在数据库里面进行update 操作,使用CUMULATE WINDOW 可以实现吗?该怎么用这个SQL?
| |
邹云鹤
|
|
kevinyu...@163.com
|
签名由网易邮箱大师定制
在2021年5月28日 11:51,Leonard Xu 写道:
Hi,

Cumulate window 是基于 window TVF 语法的,和之前的 groupWindowFunction 不一样, 你可以参考 [1]
Window TVF 也支持了 tumble window, hop window, 并且性能更优,session window 预计会在1.14支持, 
session window 有需要可以使用老的语法。

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-agg/#windowing-tvfs


在 2021年5月28日,11:43,邹云鹤  写道:

insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a group 
by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);


hello, 各位大佬, flink 1.13.0 中流式作业中该怎么使用cumulate window呢?我用上面的SQL 貌似是不行, 
有没有使用过的大佬给点建议?




| |
邹云鹤
|
|
kevinyu...@163.com
|
签名由网易邮箱大师定制


scala2.12错误:Error: scala/Product$class

2021-06-03 文章 maker_d...@foxmail.com
我使用maven构建了一个scala2.12的flink项目
我希望使用flink消费kafka,但项目运行时报错:

scalac: Error: scala/Product$class
java.lang.NoClassDefFoundError: scala/Product$class
at 
org.apache.flink.api.scala.codegen.TypeDescriptors$RecursiveDescriptor.(TypeDescriptors.scala:155)
at 
org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:479)
at 
org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:478)
at scala.Option.getOrElse(Option.scala:138)
...

pom文件相关依赖:
1.12.3
2.12
2.12.8
org.apache.flink
flink-scala_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
provided


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}
provided




org.scala-lang
scala-library
${scala.version}
provided






org.apache.flink
flink-connector-kafka_${scala.binary.version}
${flink.version}

flink消费kafka代码:...val env: StreamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xx:9092")
val kafkaConsumer = new FlinkKafkaConsumer[String]("xxx", new 
SimpleStringSchema(), properties)
kafkaConsumer.setStartFromGroupOffsets()
val kafkaDataStream: DataStream[String] = env.addSource(kafkaConsumer)
...在网上搜索基本都是说依赖版本问题,但我检查了lib,与scala相关的都是2.12版本的。找不到问题在哪儿,请各位指教,谢谢!



maker_d...@foxmail.com


回复: flink 1.13.0 中cumulate window 使用

2021-06-03 文章 邹云鹤
大佬, 你好


| |
邹云鹤
|
|
kevinyu...@163.com
|
签名由网易邮箱大师定制
在2021年5月28日 11:52,邹云鹤 写道:
好的,我再研究下。


| |
邹云鹤
|
|
kevinyu...@163.com
|
签名由网易邮箱大师定制
在2021年5月28日 11:51,Leonard Xu 写道:
Hi,

Cumulate window 是基于 window TVF 语法的,和之前的 groupWindowFunction 不一样, 你可以参考 [1]
Window TVF 也支持了 tumble window, hop window, 并且性能更优,session window 预计会在1.14支持, 
session window 有需要可以使用老的语法。

Best,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-agg/#windowing-tvfs


在 2021年5月28日,11:43,邹云鹤  写道:

insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a group 
by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);


hello, 各位大佬, flink 1.13.0 中流式作业中该怎么使用cumulate window呢?我用上面的SQL 貌似是不行, 
有没有使用过的大佬给点建议?




| |
邹云鹤
|
|
kevinyu...@163.com
|
签名由网易邮箱大师定制


Re: 回复:Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 Jacob
@lian 谢谢回复

我通过webui查询,没有背压的情况。
hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。
谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 Jacob
@lian 谢谢回复

我通过webui查询,没有背压的情况。
hbase性能这块确实存在一定问题,公司的hbase性能一直不佳,但我的程序中,是先从缓存redis中取,大部分数据都能查到,只有少部分会查hbase。
谢谢你提供的思路,我将会从代码级别考虑,看是否能优化相关逻辑



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 lian
排查一下任务在执行过程中,是否有背压,以及在ck过程中,buffer积压了多少数据量。
很可能是在访问hbase的过程,性能不是很好。


在2021年06月03日 15:27,Jacob 写道:
Dear all,

我有一个两个Flink Job A和B

A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02

其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。

B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc
file,state大小几百兆,但耗时是秒级别。

我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。

请各位指教



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 Jacob
Dear all,

我有一个两个Flink Job A和B

A job任务是消费kafka topic01数据,经过一系列逻辑加工,最终将数据sink到topic02

其中加工大致过程是:消费到topic01消息后,根据数据相关字段查询redis、查询hbase,然后组装业务数据(过程比较复杂),然后将业务数据写到另一个topic02,30s做一次checkpoint,state大小只有几十kb,但做一次checkpoint平均需要两分钟,导致topic01消息产生堆积,实时性降低。

B job任务简单,消费上一步的的业务数据topic02,开一个半个小时的窗口将数据进行聚合(keyby、max)之后写到orc
file,state大小几百兆,但耗时是秒级别。

我比较疑惑的是为什么A job的state那么小,但checkpoint却很耗时,不知道从哪个角度去优化该问题。

请各位指教



-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/