????????????????????????????????????????????

/**
 * Window trigger for {@link TimeWindow}s that emits early results while a window
 * is still open and a final result when the window ends.
 *
 * <p>An early firing happens when an element's timestamp is more than the
 * configured interval past the last early firing (initially the window start).
 * The time of the last early firing is kept in partitioned state. A final firing
 * happens when an event-time timer set to the window's max timestamp goes off.
 *
 * @param <T> type of the elements this trigger inspects
 */
public class ImmediatelyTrigger<T> extends Trigger<T, TimeWindow> {

    /** Minimum distance, in milliseconds, between two early firings. */
    private final long milliseconds;

    /** Descriptor of the state that holds the timestamp of the last early firing. */
    private final ValueStateDescriptor<Long> triggerStateDescriptor =
            new ValueStateDescriptor<>("last-trigger-time", Long.class);

    /**
     * Creates a trigger that fires early at most once per given interval.
     *
     * @param time minimum interval between two early firings
     */
    public ImmediatelyTrigger(Time time) {
        milliseconds = time.toMilliseconds();
    }

    /**
     * Decides, from the stored last-firing time, whether this element causes an
     * early firing, and makes sure the end-of-window timer is registered.
     *
     * @param element   the element that arrived (not inspected)
     * @param timestamp timestamp of the element
     * @param window    window the element was assigned to
     * @param ctx       context giving access to state, timers and the watermark
     * @return {@code FIRE} if the window should be evaluated now, otherwise {@code CONTINUE}
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        final long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp <= ctx.getCurrentWatermark()) {
            // The watermark is already past the end of the window, so there is
            // nothing left to wait for: evaluate right away.
            return TriggerResult.FIRE;
        }
        // Without this timer the final firing in onEventTime would depend on some
        // other component scheduling a timer at exactly window.maxTimestamp().
        ctx.registerEventTimeTimer(windowMaxTimestamp);

        ValueState<Long> lastTriggerTimeState = ctx.getPartitionedState(triggerStateDescriptor);
        Long lastTriggerTime = lastTriggerTimeState.value();
        if (lastTriggerTime == null) {
            // First element seen for this window: measure the interval from the window start.
            lastTriggerTime = window.getStart();
            lastTriggerTimeState.update(lastTriggerTime);
        }

        if (timestamp > lastTriggerTime + milliseconds) {
            lastTriggerTimeState.update(timestamp);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires when the timer time equals the window's max timestamp.
     * This class itself registers no processing-time timers.
     */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
                                          TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    /**
     * Fires the final result when the event-time timer for the window's max
     * timestamp goes off; any other timer time is ignored.
     */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window,
                                     TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
    }

    /**
     * Releases everything this trigger holds for the window: the end-of-window
     * timer registered in {@code onElement} and the last-firing state.
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(triggerStateDescriptor).clear();
    }

    /**
     * Re-registers the end-of-window timer for a merged window if the watermark
     * has not yet passed it.
     *
     * <p>NOTE(review): this class does not override {@code canMerge()} and the
     * last-firing state is not merged here, so this is presumably only meant for
     * non-merging window assigners - confirm before using with session windows.
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }
}

streamOperator
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
    @Override
    public long extractAscendingTimestamp(EventItem eventItem) {
        // The event's window-end value is used as its event-time timestamp.
        return eventItem.getWindowEnd();
    }
})
// Pair every event with a count of one: f0 = item id, f1 = 1L.
.map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
// Key by the item id (tuple position 0). Position 1 is the constant count,
// so keying on it would put every item into the same group.
.keyBy(0)
// One-day tumbling event-time windows with a 16-hour offset.
// NOTE(review): the original comment here was lost to encoding damage; it
// mentioned 23:59:59:999, presumably the last millisecond of the day - confirm.
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
// Emit early results at most once per 5 seconds of event time.
.trigger(new ImmediatelyTrigger<>(Time.seconds(5)))
// Sum the counts; the item id is taken from the first tuple.
.reduce((ReduceFunction<Tuple2<String, Long>>) (t0, t1) -> Tuple2.of(t0.f0, t0.f1 + t1.f1))
.addSink(textLongSink);








------------------ Original ------------------
From:  "taowang"<taow...@deepglint.com>;
Date:  Wed, Mar 6, 2019 03:20 PM
To:  "user-zh"<user-zh@flink.apache.org>;"??????"<m...@zhangzuofeng.cn>;

Subject:  Re:????????????????????????????????????????



??????.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16))) 
????????????????Trigger??????????????????????????????Trigger??????
public class WindowTrigger extends Trigger
@Override
public TriggerResult onElement(final T element, final long timestamp, final 
TimeWindow window, final TriggerContext ctx) {
return TriggerResult.FIRE;
}

@Override
public TriggerResult onProcessingTime(final long time, final TimeWindow window, 
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(final long time, final TimeWindow window, 
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public void clear(final TimeWindow window, final TriggerContext ctx) {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}

????????????????

?? ...
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
// Attach the custom trigger to the window defined on the line above.
.trigger(new WindowTrigger<>())
?? ...

*********************
?? 2019??3??6?? +0800 11:52??????@?????? <taow...@deepglint.com>????????
> ??????????????????????????????????????????key????????????????????????????????????????????????????
> ????????????????
> ?6?9????????????windows????Tupel2??????reduce??????????????????????????ProcessTimeWindow????????????????windwos????evictor(CountEvictor.of(1))????????????
> ?6?9??????????
> streamOperator
> ?6?9?6?9.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor<EventItem>() {
> ?6?9?6?9@Override
> ?6?9?6?9public long extractAscendingTimestamp(EventItem eventItem) {
> ?6?9?6?9?6?9return eventItem.getWindowEnd();
> ?6?9?6?9}
> ?6?9?6?9})
> ?6?9?6?9.map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
> ?6?9?6?9.keyBy(1)
>  ?6?9?6?9// ????????????23:59:59:999??????????????????
> ?6?9?6?9.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
> ?6?9?6?9// ??window??key????????????
> ?6?9?6?9.reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))
>
> ?6?9?6?9// ??5??????ProcessTime????????????????????????????
> ?6?9?6?9.keyBy(1)
> ?6?9?6?9.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
> ?6?9?6?9.evictor(CountEvictor.of(1))
> ?6?9?6?9.reduce((ReduceFunction) (value1, value2) -> value2)
>
> ?6?9?6?9.addSink(textLongSink);
>
>
>
> ??????????????????????????????????????????????

Reply via email to