我修改了,但是根本执行不到5000条然后同时触发了多个时间一样的trigger,像是多线程执行的一样,flink版本是1.17.1.感觉我理解的定时器有偏差
2024-12-06 16:18:45,489 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:950,time:1733473124999 2024-12-06 16:18:45,691 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:962,time:1733473124999 2024-12-06 16:18:45,902 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:951,time:1733473124999 2024-12-06 16:18:46,083 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:929,time:1733473124999 2024-12-06 16:18:46,264 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:987,time:1733473124999 2024-12-06 16:18:46,465 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:948,time:1733473124999 2024-12-06 16:18:46,647 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:930,time:1733473124999 2024-12-06 16:18:46,842 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:956,time:1733473124999 2024-12-06 16:18:47,021 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:927,time:1733473124999 2024-12-06 16:18:50,000 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:1811,time:1733473129999 2024-12-06 16:18:50,234 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:1700,time:1733473129999 2024-12-06 16:18:50,458 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:1712,time:1733473129999 2024-12-06 16:18:50,694 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:1770,time:1733473129999 2024-12-06 16:18:50,916 INFO cn.com.chinaventure.dc.stream.etl.window.trigger.CountWithTimeoutTrigger [] - base on timeout trigger,count:1765,time:1733473129999 > 2024年12月5日 17:32,18889827...@139.com 写道: > > 达到5000条时,不要再注册新的定时器了,试试看。 > > 发件人: 丁浩 > 发送时间: 2024-12-03 21:11 > 收件人: user-zh > 主题: 我实现了一个基于数量和处理时间的trigger,但是总是在同一时间触发多次 > 我写了一个基于时间和数量的trigger,我本来希望实现的是,比如: > 1.在5秒内,如果数量达到了5000条,则提前触发, > 2.如果到达5秒后,则不管有多少条都触发. > 现在的情况是同一个时间点会触发多个基于时间的,我记得注册定时器,连续注册定时器,只有一个定时器才会触发,但是实际好像不是这样? > 请大佬们帮我看下一下的代码有什么问题吗? > import lombok.extern.slf4j.Slf4j; > import org.apache.flink.api.common.functions.ReduceFunction; > import org.apache.flink.api.common.state.ReducingState; > import org.apache.flink.api.common.state.ReducingStateDescriptor; > import org.apache.flink.api.common.typeutils.base.LongSerializer; > import org.apache.flink.streaming.api.windowing.triggers.Trigger; > import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; > import org.apache.flink.streaming.api.windowing.windows.Window; > > /** > * Description:基于时间和数量的触发器 > * > * @Author:dinghaohao > * @Create:2024-09-09-10:30 > */ > @Slf4j > public class CountWithTimeoutTrigger<T, W extends Window> extends Trigger<T, > W> { > private static final long serialVersionUID = 1L; > > private final long maxCount; > > private final ReducingStateDescriptor<Long> stateDesc = new > ReducingStateDescriptor<>("count", new Sum(), > LongSerializer.INSTANCE); > > private CountWithTimeoutTrigger(long maxCount) { > this.maxCount = maxCount; > } > > public static <T, W extends Window> CountWithTimeoutTrigger<T, W> of(long > maxCount) { > return new CountWithTimeoutTrigger<>(maxCount); > } > > @Override > public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { > return TriggerResult.CONTINUE; > } > > private void registerNextTimer(TriggerContext ctx, W window) { > // 在注册新定时器之前,先清除可能存在的旧定时器 > ctx.deleteProcessingTimeTimer(window.maxTimestamp()); > ctx.registerProcessingTimeTimer(window.maxTimestamp()); > } > > @Override > public TriggerResult onElement(T element, long timestamp, W window, > TriggerContext ctx) throws Exception { > ReducingState<Long> count = ctx.getPartitionedState(stateDesc); > count.add(1L); > > // 只在第一个元素到达时注册定时器 > if (count.get() == 1L) { > registerNextTimer(ctx, window); > } > > if (count.get() >= maxCount) { > log.info("base on count trigger,count:{}", count.get()); > count.clear(); > // 清除旧的定时器 > ctx.deleteProcessingTimeTimer(window.maxTimestamp()); > // 注册新的定时器 > registerNextTimer(ctx, window); > return TriggerResult.FIRE_AND_PURGE; > } > return TriggerResult.CONTINUE; > } > > @Override > public TriggerResult onProcessingTime(long time, W window, TriggerContext > ctx) throws Exception { > ReducingState<Long> count = ctx.getPartitionedState(stateDesc); > if (count.get() == null || count.get() == 0L) { > return TriggerResult.CONTINUE; > } > log.info("base on timeout trigger,count:{},time:{}", count.get(), > window.maxTimestamp()); > count.clear(); > return TriggerResult.FIRE_AND_PURGE; > } > > @Override > public void clear(W window, TriggerContext ctx) throws Exception { > ctx.deleteProcessingTimeTimer(window.maxTimestamp()); > ctx.getPartitionedState(stateDesc).clear(); > } > > @Override > public boolean canMerge() { > return true; > } > > @Override > public void onMerge(W window, OnMergeContext ctx) throws Exception { > ctx.mergePartitionedState(stateDesc); > // only register a timer if the time is not yet past the end of the > merged > // window > // this is in line with the logic in onElement(). If the time is past > the end of > // the window onElement() will fire and setting a timer here would > fire the > // window twice. > long windowMaxTimestamp = window.maxTimestamp(); > if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { > ctx.registerProcessingTimeTimer(windowMaxTimestamp); > } > } > > @Override > public String toString() { > return "CountTrigger(" + maxCount + ")"; > } > > private static class Sum implements ReduceFunction<Long> { > private static final long serialVersionUID = 1L; > > @Override > public Long reduce(Long value1, Long value2) throws Exception { > return value1 + value2; > } > } > } >