我按建议修改了,但是每次触发时数量根本达不到 5000 条,而且同一时间点触发了多个时间戳相同的 trigger,看起来像是多线程在执行一样。Flink 版本是 1.17.1,感觉是我对定时器的理解有偏差。

2024-12-06 16:18:45,489 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:950,time:1733473124999
2024-12-06 16:18:45,691 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:962,time:1733473124999
2024-12-06 16:18:45,902 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:951,time:1733473124999
2024-12-06 16:18:46,083 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:929,time:1733473124999
2024-12-06 16:18:46,264 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:987,time:1733473124999
2024-12-06 16:18:46,465 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:948,time:1733473124999
2024-12-06 16:18:46,647 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:930,time:1733473124999
2024-12-06 16:18:46,842 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:956,time:1733473124999
2024-12-06 16:18:47,021 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:927,time:1733473124999
2024-12-06 16:18:50,000 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:1811,time:1733473129999
2024-12-06 16:18:50,234 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:1700,time:1733473129999
2024-12-06 16:18:50,458 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:1712,time:1733473129999
2024-12-06 16:18:50,694 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:1770,time:1733473129999
2024-12-06 16:18:50,916 INFO  
cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - 
base on timeout trigger,count:1765,time:1733473129999

> 2024年12月5日 17:32,18889827...@139.com 写道:
> 
> 达到5000条时,不要再注册新的定时器了,试试看。
> 
> 发件人: 丁浩
> 发送时间: 2024-12-03 21:11
> 收件人: user-zh
> 主题: 我实现了一个基于数量和处理时间的trigger,但是总是在同一时间触发多次
> 我写了一个基于时间和数量的trigger,我本来希望实现的是,比如:
> 1.在5秒内,如果数量达到了5000条,则提前触发,
> 2.如果到达5秒后,则不管有多少条都触发.
> 现在的情况是:同一个时间点会多次触发基于时间的 trigger。我记得连续注册定时器时,只有一个定时器会触发,但是实际好像不是这样?
> 请大佬们帮我看一下,以下的代码有什么问题吗?
> import lombok.extern.slf4j.Slf4j;
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.state.ReducingState;
> import org.apache.flink.api.common.state.ReducingStateDescriptor;
> import org.apache.flink.api.common.typeutils.base.LongSerializer;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.Window;
> 
> /**
> * Description:基于时间和数量的触发器
> *
> * @Author:dinghaohao
> * @Create:2024-09-09-10:30
> */
> @Slf4j
> public class CountWithTimeoutTrigger<T, W extends Window> extends Trigger<T, 
> W> {
>    private static final long serialVersionUID = 1L;
> 
>    private final long maxCount;
> 
>    private final ReducingStateDescriptor<Long> stateDesc = new 
> ReducingStateDescriptor<>("count", new Sum(),
>            LongSerializer.INSTANCE);
> 
>    private CountWithTimeoutTrigger(long maxCount) {
>        this.maxCount = maxCount;
>    }
> 
>    public static <T, W extends Window> CountWithTimeoutTrigger<T, W> of(long 
> maxCount) {
>        return new CountWithTimeoutTrigger<>(maxCount);
>    }
> 
>    @Override
>    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
>        return TriggerResult.CONTINUE;
>    }
> 
>    private void registerNextTimer(TriggerContext ctx, W window) {
>        // 在注册新定时器之前,先清除可能存在的旧定时器
>        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>        ctx.registerProcessingTimeTimer(window.maxTimestamp());
>    }
> 
>    @Override
>    public TriggerResult onElement(T element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>        count.add(1L);
> 
>        // 只在第一个元素到达时注册定时器
>        if (count.get() == 1L) {
>            registerNextTimer(ctx, window);
>        }
> 
>        if (count.get() >= maxCount) {
>            log.info("base on count trigger,count:{}", count.get());
>            count.clear();
>            // 清除旧的定时器
>            ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>            // 注册新的定时器
>            registerNextTimer(ctx, window);
>            return TriggerResult.FIRE_AND_PURGE;
>        }
>        return TriggerResult.CONTINUE;
>    }
> 
>    @Override
>    public TriggerResult onProcessingTime(long time, W window, TriggerContext 
> ctx) throws Exception {
>        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>        if (count.get() == null || count.get() == 0L) {
>            return TriggerResult.CONTINUE;
>        }
>        log.info("base on timeout trigger,count:{},time:{}", count.get(), 
> window.maxTimestamp());
>        count.clear();
>        return TriggerResult.FIRE_AND_PURGE;
>    }
> 
>    @Override
>    public void clear(W window, TriggerContext ctx) throws Exception {
>        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>        ctx.getPartitionedState(stateDesc).clear();
>    }
> 
>    @Override
>    public boolean canMerge() {
>        return true;
>    }
> 
>    @Override
>    public void onMerge(W window, OnMergeContext ctx) throws Exception {
>        ctx.mergePartitionedState(stateDesc);
>        // only register a timer if the time is not yet past the end of the 
> merged
>        // window
>        // this is in line with the logic in onElement(). If the time is past 
> the end of
>        // the window onElement() will fire and setting a timer here would 
> fire the
>        // window twice.
>        long windowMaxTimestamp = window.maxTimestamp();
>        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
>            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
>        }
>    }
> 
>    @Override
>    public String toString() {
>        return "CountTrigger(" + maxCount + ")";
>    }
> 
>    private static class Sum implements ReduceFunction<Long> {
>        private static final long serialVersionUID = 1L;
> 
>        @Override
>        public Long reduce(Long value1, Long value2) throws Exception {
>            return value1 + value2;
>        }
>    }
> }
> 

回复