TTL 好像不支持 TimeCharacteristic.EventTime 方式


在 2020-01-08 14:17:11,"USERNAME" <oracle...@126.com> 写道:
>我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。
>
>
>
>
>
>在 2020-01-07 19:51:57,"huoguo" <greemqq...@163.com> 写道:
>>
>>
>>过期数据能通过TTL 设置过期吗?
>>
>>> 在 2020年1月7日,17:54,USERNAME <oracle...@126.com> 写道:
>>> 
>>> 各位好!
>>> 祝大家新年快乐!
>>> 
>>> 
>>> 
>>> 
>>> --版本
>>> FLINK 1.9.1 ON YARN
>>> 
>>> 
>>> --过程
>>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>>> --问题
>>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>>> 这种计算场景有更好的计算方法吗?
>>> 
>>> 
>>> --部分代码
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>> 
>>> new ProcessWindowFunction{
>>> public void process(Tuple tuple, Context context, Iterable<StringBean> 
>>> elements, Collector<String> out) throws Exception {
>>> for (Iterator<StringBean> iter = elements.iterator(); iter.hasNext(); ) {
>>> ....
>>> iter.remove();
>>> }
>>> }
>>> ....
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>

回复