Re: checkpoint stage size的问题

2019-06-26 文章 Yun Tang
你好

从附件的web监控看,其实你的整体checkpoint state其实很小(只有20几MB),所以对于这个问题其实有些过度关注了。
关于checkpoint state的变化,需要观察不同operator的情况,可以点开详细页看每个并发的情况。对比operator 
state和window所使用的keyed state的变化情况。我估计keyed state部分会有些许波动,主要是因为你使用的是RocksDB state 
backend,其实上传的是rocksDB的sst文件,当register timer时,window 
state会进行存储,当onTimer时,相关state会取出并更新或者删除,这里涉及到一个写放大和compaction的问题,rocksDB对某个key的删除不会直接对应物理上的存储的立刻减少。

祝好
唐云

From: ReignsDYL <1945627...@qq.com>
Sent: Wednesday, June 26, 2019 17:38
To: user-zh@flink.apache.org
Subject: Re: checkpoint stage size的问题

这是web ui的监控




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Flink如何实现Job间的协同联系?

2019-06-26 文章 徐涛
Hi 军长,
谢谢您的回复。

对于问题“如果上游逻辑更改,重新跑数据,那么可能会存在最开始的那天数据不完整,导致污染下游数据”。我觉得这个问题对于想拿Flink做一些规模稍大的系统(或者说基于Kappa架构的设计)可能都会遇到相同的问题。如果对于跑批的场景,因为上游重跑可以覆盖中间结果,下游可以拿到更新后的数据并进行计算;但是流式计算Job的中间结果落盘于Kafka,而且下游的Job已经累积了一些状态,这个时候上游的计算逻辑如果发生了更改,如果还是写到同一个Kafka
 topic,那么很难保证下游数据的正确性;如果写到不同的Kafka topic,那么下游的实时任务可能都需要重启,这样操作的负担会很重。 
不知道一般对于实时数仓来说是怎样解决这样的问题的?


谢谢!
徐涛






在 2019-06-21 21:21:34,"Hequn Cheng"  写道:
>Hi 徐涛,
>
>最后四个问题,3和4和业务逻辑关系大些,我大致说下1和2的想法,
>
>1. 因为每个变化都要发一个message,Kafka中的消息数可能很大。
>这个问题,可以借助一些攒批的策略来减少数据量,比如在自定义的sink里加些cache,每1000条刷一次,刷的时候对这1000条按key去重。需要注意的是,攒批需要结合checkpoint一起来考虑,防止丢数据,可以参考下JDBCOutputFormat的实现(做checkpoint的时候需要flush)。此外,blink里面aggregate也有miniBatch的逻辑,也会减少输出的数据量(不过,flink里面还没有miniBatch)。
>
>2. 下游做幂等的时候,因为要对每个Key做Group by,因此消耗的资源也很大。
>这个是的,目前应该没有很好的办法。后期,如果支持了RetractSource,下游job可以不用再做groupBy+last。只需要上游job用RetractSink输出存一份数据到Kafka。
>
>一些其他问题:
>> ① 需要实现一个retract kafka sink
>这里应该是需要实现一个upsert kafka sink,目前flink还没法输入retract message。
>
>Best,Hequn
>
>On Wed, Jun 19, 2019 at 1:18 PM 徐涛  wrote:
>
>> 大家好,
>>
>> 我这边想做一个实时数仓项目。但随着Flink间Job的数量越来越多,发现很多Job之间的代码存在大量的重复,且迫切需要一个类似于Batch的中间层解决方案来减少冗余,增加整体的清晰度和层次感。
>> 我这边能想到的解决方案是:采用Kafka作为Job之间的联系纽带,例如有两个Job
>> Job_1:   从原始kafka topic TOPIC_ORIG获取数据, 进行一定的业务逻辑处理后,写到另一个kafka topic,
>> TOPIC_JOB_1_SINK 。注意
>>① 需要实现一个retract kafka sink
>>② 没有使用kafka exactly-once sink
>>③ TOPIC_JOB_1_SINK 中的每条记录应该有一个 unique key.
>>④ 每个Key相同的记录应该被发往相同的kafka partition.
>> Job_2:从TOPIC_JOB_1_SINK读取数据, 接着做幂等(先对唯一Key做group by取最新),
>> 然后运行Job_2的逻辑 , 最后把数据写道最终Sink中(例如es, hbase, mysql)。 之所以要对unique
>> key做一轮幂等处理,因为Job_1可能会失败重试,TOPIC_JOB_1_SINK中可能会有一些重试脏数据。
>>
>>
>> 从整体上看,结构大概如下图所示:
>> Job_1Job_2
>> -
>>
>>  
>> ---
>> |  TOPIC_ORIG -> Logic_Job_1 -> TOPIC_JOB_1_SINK  |   ——>
>>| TOPIC_JOB_1_SINK -> GROUP_BY_UNIQUE_KEY_GET_LATEST -> Logic_Job_2
>> -> FINAL_JOB_2_SINK|
>> -
>>
>>  
>> ---
>>
>>
>> 即:每个Job往下游发送的数据整体有唯一Key;每个下游需要对上游发来的数据做幂等处理。
>> 但是,可能存在的问题有:
>> 1. 因为每个变化都要发一个message,Kafka中的消息数可能很大。
>> 2. 下游做幂等的时候,因为要对每个Key做Group by,因此消耗的资源也很大。
>> 3. 如果上游逻辑更改,重新跑数据,那么可能会存在最开始的那天数据不完整,导致污染下游数据
>> 4. 如果上游逻辑更改,重新跑数据,但是某条数据这个时候已经不应该出现了,下游的数据得不到更新
>>
>>
>> 请问大家是如何管理Flink Job之间的依赖或者说血缘关系的?有没有比较好的方案?
>> 谢谢大家!
>>
>>
>> 谢谢
>> 徐涛


Re: checkpoint stage size的问题

2019-06-26 文章 ReignsDYL
这是web ui的监控
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: checkpoint stage size的问题

2019-06-26 文章 ReignsDYL
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: checkpoint stage size的问题

2019-06-26 文章 ReignsDYL
老师你好,首先感谢你在百忙之中回复我。
我这面观察到的现象是,当有数据流入时,每个checkpoint的stage
size比上一个checkpoint多几百k左右,只要数据持续流入,这个stage
size就一直增长,当没有数据流入时,checkpoint的stage size就维持不变了,再有数据流入时,stage
size就在原来基础上继续增长。

数据流:
SingleOutputStreamOperator studentSubjectStream =
dataStream
.filter(new Question2SubjectFilter())
.keyBy(new TaskStudentSubjectKeySelector())
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new StudentSubjectScoreAgg());
   
studentSubjectStream.addSink(getKafkaProducer(KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(),
StudentAggResult.class));

聚合函数:
public abstract class BaseAgg implements
AggregateFunction {

public abstract R create(T input);
public abstract void merge(R aggResult, T t);

@Override
public R createAccumulator() {
return null;
}

@Override
public R add(T t, R aggResult) {
if (aggResult == null) {
aggResult = create(t);
}
merge(aggResult, t);
return aggResult;
}

@Override
public R getResult(R aggResult) {
return aggResult;
}

@Override
public R merge(R aggResult, R acc1) {
if (acc1 == null) {
return aggResult;
}
if (aggResult == null) {
return acc1;
}
aggResult.merge(acc1);
return aggResult;
}
}

checkpoint配置:
env.enableCheckpointing(5000);
   
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

状态存储通过rocksdb。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: checkpoint stage size的问题

2019-06-26 文章 Yun Tang
你好

这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state 
size变大有几个原因:

  1.  上游数据量增大。
  2.  window设置时间较长,尚未触发,导致window内积攒的数据比较大。
  3.  window的类型决定了所需要存储的state size较大。

可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state 
size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽泛的建议。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations

祝好
唐云

From: ReignsDYL <1945627...@qq.com>
Sent: Wednesday, June 26, 2019 14:22
To: user-zh@flink.apache.org
Subject: checkpoint stage size的问题

各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: checkpoint stage size的问题

2019-06-26 文章 ReignsDYL
我发现窗口的trigger只进行了fire,并没有进行purge,我不清楚是不是这个原因,或者还是有其他的原因。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

来自小乐的邮件

2019-06-26 文章 小乐


checkpoint stage size的问题

2019-06-26 文章 ReignsDYL
各位好,我的项目的流计算模型source(kafka)->filter->keyby->window->aggregate->sink(hbase),现在发现window的subtask的checkpoint的stage
size越来越大,请问是什么原因啊?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


关于使用Flink建设基于CDC方式的OGG数据湖

2019-06-26 文章 唐门小师兄
一:背景描述现有数据中心基于GP做数仓,但是在OGG数据到GP做贴源ODS过程耗费集群太多资源,导致集群性能瓶颈,故想考虑基于GP + 
HADOOP的架构来重构数据仓库。
二:思路将入库贴源ODS的工作交由Hadoop生态来完成,数据从OGG实时流入kafka时,都是类似DDL中的"I, U ,D"类型数据, 
然后Flink采用动态表方式找出每条主键的最新记录,内存中的动态表将按天写入HDFS, 
然后GP通过外部表的方式,将今天的数据加载到GP中与存量做meger更新,这样既完成性能瓶颈的调优, 数据流转图简要如下:
三:问题1、现采用新技术路线,在kafka数据处理后转为toRetractStream后,过滤出tuple中f0为true的记录, 
按天写入BucketingSink, 
本意是想将一天中的每个主键的最新记录取出,结果所有曾经标记为TRUE的记录都取出了,想请教是否有解决办法?(PS:幂等存储如HBase, 
ElasticSearch已测试过,是可以更新替换,然后每个主键只保留一条最新的操作,但是数据链路变长,增加了开发维护成本)
(1)输入:{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 
16:21:56.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"0524730317232030","DLBS":"1302","RYBS":"10200709","DLLX":"2","DLZH":"wucl1...@lzlz.gx.csg.cn","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09
 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 
15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 
15:33:48","ZHFWSJ":"2019-05-08 
15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08
 
16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"0524730317232031","DLBS":"1302","RYBS":"10200709","DLLX":"2","DLZH":"wucl1...@lzlz.gx.csg.cn","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09
 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 
15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 
15:33:48","ZHFWSJ":"2019-05-08 
15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}
(2)处理代码如下:---
输入数据会在toRetractStream处理,产生3条记录,依次为:
true,   false,...   true,
(3)输出如下:{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 
16:21:56.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"0524730317232030","DLBS":"1302","RYBS":"10200709","DLLX":"2","DLZH":"wucl1...@lzlz.gx.csg.cn","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09
 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 
15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 
15:33:48","ZHFWSJ":"2019-05-08 
15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08
 
16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"0524730317232031","DLBS":"1302","RYBS":"10200709","DLLX":"2","DLZH":"wucl1...@lzlz.gx.csg.cn","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09
 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 
15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 
15:33:48","ZHFWSJ":"2019-05-08 
15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}
(4)期望输出:{"table":"04.PMS_GXGL.XT_DLRZ","op_type":"U","op_ts":"2019-05-08 
16:21:57.081124","current_ts":"2019-05-09T11:19:43.103001","pos":"0524730317232031","DLBS":"1302","RYBS":"10200709","DLLX":"2","DLZH":"wucl1...@lzlz.gx.csg.cn","YHXM":"小明","DLIP":"10.103.158.98","FWQIP":"10.100.80.120","FWQMC":null,"ZXSJ":"2019-05-09
 00:21:48","ZZBM":"0402031001","DQBM":"信息","CJSJ":"2019-05-08 
15:33:48","CZSJ":"2019-05-09 00:21:48","DLSJ":"2019-05-08 
15:33:48","ZHFWSJ":"2019-05-08 
15:33:48","JLRZSJ":"2019-06-18T11:11:38.122Z","SBCS":"2","XYSC":"240","JDMC":"yxscapp03_02"}