关于“批”处理元素导致batch的event timestamp变化,后接window统计错乱的问题。

2022-02-18 文章 yidan zhao
背景

部分算子需要redis查询,为了性能,需要做批量redis查询,通过redis
pipeline机制查询。所以对于输入流A,我会做一个BatchGenerator算子B用于将输入数据转位batch输出,然后算子C查询redis后输出,并且输出时拆分batch为多个单独元素,进入算子D。D算子如果是window统计,...。

问题

算子D部分会发现收到的元素无timestamp、或timestamp错误,导致程序出错,或者窗口划分错误。

原因分析和解决等


无timestamp是因为BatchGenerator为了实现超时机制(避免输入元素过少,导致长时间无法累积够一个batch),使用到flink的
process time 定时机制(注意仅KeyedStream支持定时器),这导致 onTimer 输出的时候erase掉了event time。
这个问题稍微好解决,换 event time 定时器即可。
为了避免KeyedStream的限制,也可以换用DataStream,然后超时则通过java.util.Timer()实现,等等。
假设流A也是某个window的输出,对于2个A输出的元素a和b,带有ts1和ts2的event time(window 输出 event
time是window的maxtimestamp),a和b是不同窗口。  a 和
b进入batchGenerator算子被batch到一起,输出的batch元素event
time是ts2(假设batchSize=2)。这就导致后续window计算会出问题,因为b元素的ts仍然是ts2,但是a元素的ts从ts1变成了ts2,导致窗口划分错误。

如上分析后后,我改造了BatchGenerator,内部不使用一个buffer,改成使用一组buffer,通过Map组织,key为window_ts(可以用于标记window唯一性的值,比如5min窗口则采用MMddHHmm格式的日志作为window_ts),这样让不同window_ts的元素不会进入相同batch,避免被batch到一起导致ts错乱。这样看起来好像解决了,但实际没有彻底解决。原因1:超时情况,timer超时并无法输出batch,需要靠另一个进入的元素触法,会引入ts错乱。原因2:buffer通过ts组织之后,对于ts1假设输入了1个元素,为避免ts1再也不输入其他元素,因为输出的触发必须不区分ts,也就是说对于ts1对应的buffer虽然有独立的timer控制timeout,但其输出触发却必须允许其他ts的元素进入触发,否则会导致内存泄漏(元素永远无法输出)。这让一切回到原点。

如上,目前经过几个版本更改,发现徒劳了。貌似没有本质解决方法(当然后面讲的方法另外说)。无超时则不需要解决超时导致的问题。但即使不考虑超时机制,batch本身的问题也没办法。

其他解决思路1:考虑processFunc的collector实际是timestampedCollector,在函数部分强制转换collector位timestampedCollector,然后设置timestamp到合理时间(在redis查询完成,分拆输出的时候)?
解决思路2:仔细想想思路1,发现绕来绕去还是需要修正timestamp。这么来看,还不如直接redis算子后边加个
assignTimestamp重新设置ts和watermark生成。


如上,小伙伴们觉得有啥其他思路嘛。 还有就是目前flink貌似不支持仅重设ts而不重设watermark嘛?
之前我倒是实现过一个仅重设watermarkd的,主要通过在processWatermark部分加offset实现。


[ANNOUNCE] Apache Flink 1.13.6 released

2022-02-18 文章 Konstantin Knauf
The Apache Flink community is very happy to announce the release of Apache
Flink 1.13.6, which is the fifth bugfix release for the Apache Flink 1.13
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:

https://flink.apache.org/news/2022/02/09/release-1.13.6.html

The full release notes are available in Jira:

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12351074

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,

Konstantin

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re:退订

2022-02-18 文章 但宝平
退订


Re:flink 不触发checkpoint

2022-02-18 文章 RS
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?










在 2022-02-17 11:40:04,"董少杰"  写道:

flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!






| |
董少杰
|
|
eric21...@163.com
|