[ https://issues.apache.org/jira/browse/FLINK-13383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892611#comment-16892611 ]
jinguishi edited comment on FLINK-13383 at 7/25/19 10:00 AM:
-------------------------------------------------------------

all right

was (Author: shijingui):
all rights

> Customize the problem in the trigger
> ------------------------------------
>
>                 Key: FLINK-13383
>                 URL: https://issues.apache.org/jira/browse/FLINK-13383
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.8.0
>         Environment: The development environment is IntelliJ IDEA; the Flink version is 1.8
>            Reporter: jinguishi
>            Priority: Blocker
>         Attachments: WX20190723-174236.png, WechatIMG2.png
>
>
> Using a tumbling time window, I created a trigger that combines a time condition with a counter.
> In the onElement method, ctx.getCurrentWatermark() on the TriggerContext parameter returns negative values.
> Screenshots are in the attachments.
> The code is shown below:
> {code:java}
> public class CountTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>
>     private CountTrigger(int count) {
>         this.threshold = count;
>     }
>
>     private int count = 0;
>     private int threshold;
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
>         long watermark = ctx.getCurrentWatermark();
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         if (count > threshold) {
>             count = 0;
>             return TriggerResult.FIRE;
>         } else {
>             count++;
>         }
>         System.out.println("onElement: " + element);
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE;
>     }
>
>     @Override
>     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
>         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public String toString() {
>         return "CountTrigger";
>     }
>
>     public static <W extends Window> CountTrigger of(int threshold) {
>         return new CountTrigger(threshold);
>     }
> }
> {code}
> {code:java}
> public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
>     ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
>     timestamp = ctx.getCurrentWatermark();
>     long end = window.maxTimestamp();
>     System.out.println(element + " " + timestamp + " " + window.maxTimestamp() + " " + fireTimestamp.get());
>     if (fireTimestamp.get() == null) {
>         long start = timestamp - (timestamp % interval);
>         long nextFireTimestamp = start + interval;
>         ctx.registerEventTimeTimer(nextFireTimestamp);
>         fireTimestamp.add(nextFireTimestamp);
>
>         return TriggerResult.CONTINUE;
>     }
>
>     return TriggerResult.CONTINUE;
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
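
A possible explanation for the negative values, offered as a sketch rather than a confirmed diagnosis: in Flink the current watermark of an operator is Long.MIN_VALUE until the first watermark reaches it, so ctx.getCurrentWatermark() can be a very large negative number. The plain-Java example below has no Flink dependency. The class name WatermarkAlignment, the constant NO_WATERMARK_YET and the 5000 ms interval are illustrative assumptions, not part of the report. It replays the alignment arithmetic from the second quoted snippet and shows that the computed timer timestamp stays negative in that case.

```java
// Sketch only: plain Java that mimics the arithmetic of the quoted onElement method.
final class WatermarkAlignment {

    // Assumption: stands in for what ctx.getCurrentWatermark() reports
    // before any watermark has reached the operator.
    static final long NO_WATERMARK_YET = Long.MIN_VALUE;

    // Same computation as in the quoted snippet:
    // round the timestamp down to a multiple of interval, then add one interval.
    static long nextFireTimestamp(long timestamp, long interval) {
        long start = timestamp - (timestamp % interval);
        return start + interval;
    }

    public static void main(String[] args) {
        long interval = 5_000L; // hypothetical interval in milliseconds

        // With a real event-time value the result is the next interval boundary.
        System.out.println(nextFireTimestamp(12_345L, interval));

        // With no watermark yet, the result is still a huge negative number,
        // because Java's % keeps the sign of the dividend.
        System.out.println(nextFireTimestamp(NO_WATERMARK_YET, interval));
    }
}
```

Under these assumptions, one option is to align on the element timestamp passed to onElement rather than on the watermark, or to skip timer registration while the watermark is still Long.MIN_VALUE.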