[ 
https://issues.apache.org/jira/browse/FLINK-13383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892611#comment-16892611
 ] 

jinguishi edited comment on FLINK-13383 at 7/25/19 10:00 AM:
-------------------------------------------------------------

all right


was (Author: shijingui):
all rights

> Customize the problem in the trigger
> ------------------------------------
>
>                 Key: FLINK-13383
>                 URL: https://issues.apache.org/jira/browse/FLINK-13383
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.8.0
>         Environment: The development environment is IntelliJ IDEA; the Flink 
> version is 1.8
>            Reporter: jinguishi
>            Priority: Blocker
>         Attachments: WX20190723-174236.png, WechatIMG2.png
>
>
> Using a tumbling time window, I created a custom trigger based on both time 
> and element count. In the onElement method, calling 
> ctx.getCurrentWatermark() on the TriggerContext returns negative values. 
> Screenshots are in the attachments.
> The code is shown below:
> {code:java}
> public class CountTrigger extends Trigger<Object, TimeWindow> {
>     private static final long serialVersionUID = 1L;
>     private CountTrigger(int count) {
>         this.threshold = count;
>     }
>     private int count = 0;
>     private int threshold;
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow 
> window, TriggerContext ctx) {
>         long watermark = ctx.getCurrentWatermark();
>         ctx.registerEventTimeTimer(window.maxTimestamp());
>         if (count > threshold) {
>             count = 0;
>             return TriggerResult.FIRE;
>         } else {
>             count++;
>         }
>         System.out.println("onElement: " + element);
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, 
> Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.CONTINUE;
>     }
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, 
> Trigger.TriggerContext ctx) throws Exception {
>         return TriggerResult.FIRE;
>     }
>     @Override
>     public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws 
> Exception {
>         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>     }
>     @Override
>     public String toString() {
>         return "CountTrigger";
>     }
>     public static <W extends Window> CountTrigger of(int threshold) {
>         return new CountTrigger(threshold);
>     }
> }
> {code}
> {code:java}
>   public TriggerResult onElement(Object element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>         ReducingState<Long> fireTimestamp = 
> ctx.getPartitionedState(stateDesc);
>         timestamp = ctx.getCurrentWatermark();
>         long end = window.maxTimestamp();
>         System.out.println(element + "  " + timestamp + "  " + 
> window.maxTimestamp() + "  " + fireTimestamp.get());
>         if (fireTimestamp.get() == null) {
>             long start = timestamp - (timestamp % interval);
>             long nextFireTimestamp = start + interval;
>             ctx.registerEventTimeTimer(nextFireTimestamp);
>             fireTimestamp.add(nextFireTimestamp);
>         
>             return TriggerResult.CONTINUE;
>         }
>        
>         return TriggerResult.CONTINUE;
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to