是要在.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16))) 
后面加一个自定义Trigger,对每一个元素触发。我自定义的Trigger如下:
public class WindowTrigger extends Trigger
@Override
public TriggerResult onElement(final T element, final long timestamp, final 
TimeWindow window, final TriggerContext ctx) {
return TriggerResult.FIRE;
}

@Override
public TriggerResult onProcessingTime(final long time, final TimeWindow window, 
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(final long time, final TimeWindow window, 
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public void clear(final TimeWindow window, final TriggerContext ctx) {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}

然后可以这样用:

… ...
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
.tirgger(new WindowTrigger<>())
… ...

*********************
在 2019年3月6日 +0800 11:52,王涛@深瞳云 <taow...@deepglint.com>,写道:
> 你好,如果是这样的需求:“按一天统计某一个key上有多少条数据,统计结果每五分钟输出更新一次”的话,
> 我认为可以这样:
>  
> 在一个一天的windows中做Tupel2数据的reduce,然后在下游接一个五分钟的ProcessTimeWindow,在这个五分钟的windwos中做evictor(CountEvictor.of(1)),然后输出。
>  比如这样:
> streamOperator
>   .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() 
> {
>   @Override
>   public long extractAscendingTimestamp(EventItem eventItem) {
>    return eventItem.getWindowEnd();
>   }
>   })
>   .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
>   .keyBy(1)
>    // 东八区零点到23:59:59:999的滑动事件时间窗口
>   .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
>   // 在window中key上的消息条数
>   .reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))
>
>   // 在5分钟的ProcessTime滑动窗口里,只取最后一条输出
>   .keyBy(1)
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
>   .evictor(CountEvictor.of(1))
>   .reduce((ReduceFunction) (value1, value2) -> value2)
>
>   .addSink(textLongSink);
>
>
>
> 这是我在使用过程中实时刷新每天统计数据的方法。

Reply via email to