背景

    部分算子需要redis查询,为了性能,需要做批量redis查询,通过redis
pipeline机制查询。所以对于输入流A,我会做一个BatchGenerator算子B用于将输入数据转位batch输出,然后算子C查询redis后输出,并且输出时拆分batch为多个单独元素,进入算子D。D算子如果是window统计,...。

问题

    算子D部分会发现收到的元素无timestamp、或timestamp错误,导致程序出错,或者窗口划分错误。

原因分析和解决等


无timestamp是因为BatchGenerator为了实现超时机制(避免输入元素过少,导致长时间无法累积够一个batch),使用到flink的
process time 定时机制(注意仅KeyedStream支持定时器),这导致 onTimer 输出的时候erase掉了event time。
这个问题稍微好解决,换 event time 定时器即可。
    为了避免KeyedStream的限制,也可以换用DataStream,然后超时则通过java.util.Timer()实现,等等。
    假设流A也是某个window的输出,对于2个A输出的元素a和b,带有ts1和ts2的event time(window 输出 event
time是window的maxtimestamp),a和b是不同窗口。  a 和
b进入batchGenerator算子被batch到一起,输出的batch元素event
time是ts2(假设batchSize=2)。这就导致后续window计算会出问题,因为b元素的ts仍然是ts2,但是a元素的ts从ts1变成了ts2,导致窗口划分错误。

如上分析后后,我改造了BatchGenerator,内部不使用一个buffer,改成使用一组buffer,通过Map组织,key为window_ts(可以用于标记window唯一性的值,比如5min窗口则采用yyyyMMddHHmm格式的日志作为window_ts),这样让不同window_ts的元素不会进入相同batch,避免被batch到一起导致ts错乱。这样看起来好像解决了,但实际没有彻底解决。原因1:超时情况,timer超时并无法输出batch,需要靠另一个进入的元素触法,会引入ts错乱。原因2:buffer通过ts组织之后,对于ts1假设输入了1个元素,为避免ts1再也不输入其他元素,因为输出的触发必须不区分ts,也就是说对于ts1对应的buffer虽然有独立的timer控制timeout,但其输出触发却必须允许其他ts的元素进入触发,否则会导致内存泄漏(元素永远无法输出)。这让一切回到原点。

如上,目前经过几个版本更改,发现徒劳了。貌似没有本质解决方法(当然后面讲的方法另外说)。无超时则不需要解决超时导致的问题。但即使不考虑超时机制,batch本身的问题也没办法。

其他解决思路1:考虑processFunc的collector实际是timestampedCollector,在函数部分强制转换collector位timestampedCollector,然后设置timestamp到合理时间(在redis查询完成,分拆输出的时候)?
解决思路2:仔细想想思路1,发现绕来绕去还是需要修正timestamp。这么来看,还不如直接redis算子后边加个
assignTimestamp重新设置ts和watermark生成。


如上,小伙伴们觉得有啥其他思路嘛。 还有就是目前flink貌似不支持仅重设ts而不重设watermark嘛?
之前我倒是实现过一个仅重设watermarkd的,主要通过在processWatermark部分加offset实现。

Reply via email to