Hi marble, 看到你是在 window 内一直使用 agg 累加的,所以可以使用 filesystem backend 加速,但是可能内存会相对耗的比较多。因为rocksdb backend的话,每一条数据都会有一次put 和 get 的 IO 操作,故会比较慢些。 至于你提到的为什么 24h size,2s slide 的窗口没有延迟,5 min,1s 的连续 trigger 缺延迟了。这两者的行为不一样,其实没有什么可比的。 对于第二种,trigger 是依靠 timer 注册触发的,这样的话每秒都需要进行触发(如果是 process time),这样可能会太密集了。
Best, Hailong Wang 在 2020-10-28 16:21:24,"marble.zh...@coinflex.com.INVALID" <marble.zh...@coinflex.com.INVALID> 写道: >大家好。 > >我用的tumbling window, >ds.keyBy(CandleView::getMarketCode) > .timeWindow(Time.minutes(5L)) > >.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) > .aggregate(new OhlcAggregateFunction(), new >OhlcWindowFunction()) > .addSink(new PgSink(jdbcUrl, userName, password, >candle_table_5m)) > .name(candle_table_5m); > >Sliding Window: > >ds.keyBy(CandleView::getMarketCode) > .timeWindow(Time.hours(24L), Time.seconds(2)) > .aggregate(new OhlcAggregateFunction(), new >TickerWindowFunction()) > .addSink(new PgSink(jdbcUrl, userName, password, >candle_table_24h)) > .name(candle_table_24h); > >一个是基于5分钟的窗口,一个是基于24小时的sliding窗口,24小时的窗口都已经update到了最新时间,但5分钟的滞后了越来越长时间,job运行不到2小时,已经滞后快20分钟,即将近4个窗口。 >基于的是同一个dataStream > >有没有什么建议,或者哪个地方用错了? 谢谢 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/