[ 
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196280#comment-17196280
 ] 

tinny cat edited comment on FLINK-19167 at 9/15/20, 4:35 PM:
-------------------------------------------------------------

Thank you [~aljoscha] [~dwysakowicz] [~twalthr]. I tried data with multiple 
keys, and the example works correctly!

The conclusion is:

each key's timer fires when the next watermark is greater than the 
current watermark, right?
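
One way to picture that conclusion is a toy model of per-key event-time timers, sketched here in plain Java without any Flink dependency. The class name `KeyedTimerSketch` and its methods are hypothetical and only mimic the idea; the sketch assumes a timer fires once the watermark reaches or passes its timestamp, and it fires timers key by key, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model of per-key event-time timers; not Flink's implementation. */
public class KeyedTimerSketch {
    // key -> sorted timer timestamps registered for that key (duplicates collapse)
    private final Map<String, TreeSet<Long>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    public void registerEventTimeTimer(String key, long time) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(time);
    }

    /** Advances the watermark and returns "key@time" for every timer that fires. */
    public List<String> advanceWatermark(long newWatermark) {
        List<String> fired = new ArrayList<>();
        if (newWatermark <= watermark) {
            return fired; // the watermark did not advance, so nothing fires
        }
        watermark = newWatermark;
        for (Map.Entry<String, TreeSet<Long>> entry : timers.entrySet()) {
            TreeSet<Long> pending = entry.getValue();
            // fire every timer of this key that the watermark has reached or passed
            while (!pending.isEmpty() && pending.first() <= watermark) {
                fired.add(entry.getKey() + "@" + pending.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        KeyedTimerSketch sketch = new KeyedTimerSketch();
        sketch.registerEventTimeTimer("a", 61_000L);
        sketch.registerEventTimeTimer("b", 65_000L);
        System.out.println(sketch.advanceWatermark(60_000L)); // []
        System.out.println(sketch.advanceWatermark(62_000L)); // [a@61000]
        System.out.println(sketch.advanceWatermark(70_000L)); // [b@65000]
    }
}
```

In this model the watermark is shared, but each key keeps its own timers, so a key's timer fires only when the watermark moves past that timer's timestamp.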


was (Author: tinny):
thank you [~aljoscha] [~dwysakowicz] [~twalthr],  I tried the data of multiple 
keys, and the example works correctly! 

The conclusion is: different watermarks share the same timer, right?

> Proccess Function Example could not work
> ----------------------------------------
>
>                 Key: FLINK-19167
>                 URL: https://issues.apache.org/jira/browse/FLINK-19167
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.11.1
>            Reporter: tinny cat
>            Priority: Major
>
> Section "*Process Function Example*" of 
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
>  currently reads:
> {code:java}
> // Some comments here
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time timestamp
>         current.lastModified = ctx.timestamp();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         // this will never happen
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> However, it should be:
> {code:java}
> @Override
>     public void processElement(
>             Tuple2<String, String> value, 
>             Context ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // retrieve the current count
>         CountWithTimestamp current = state.value();
>         if (current == null) {
>             current = new CountWithTimestamp();
>             current.key = value.f0;
>         }
>         // update the state's count
>         current.count++;
>         // set the state's timestamp to the record's assigned event time timestamp
>         // it should be the previous watermark
>         current.lastModified = ctx.timerService().currentWatermark();
>         // write the state back
>         state.update(current);
>         // schedule the next timer 60 seconds from the current event time
>         ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
>     }
>     @Override
>     public void onTimer(
>             long timestamp, 
>             OnTimerContext ctx, 
>             Collector<Tuple2<String, Long>> out) throws Exception {
>         // get the state for the key that scheduled the timer
>         CountWithTimestamp result = state.value();
>         // check if this is an outdated timer or the latest timer
>         if (timestamp == result.lastModified + 60000) {
>             // emit the state on timeout
>             out.collect(new Tuple2<String, Long>(result.key, result.count));
>         }
>     }
> {code}
> `current.lastModified = ctx.timestamp();` should be 
> `current.lastModified = ctx.timerService().currentWatermark();`; otherwise, 
> `timestamp == result.lastModified + 60000` will never be true.
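
For reference, the check `timestamp == result.lastModified + 60000` can be traced for a single key in a toy model, sketched here in plain Java without Flink. The class `OutdatedTimerSketch` and its methods are hypothetical, and the sketch assumes a timer fires once the watermark reaches or passes its timestamp. It follows the documented variant, where `lastModified` is the element's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy single-key trace of the outdated-timer check; not Flink code. */
public class OutdatedTimerSketch {
    private static final long TIMEOUT_MS = 60_000L;

    private long count = 0;
    private long lastModified = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>();
    public final List<Long> emitted = new ArrayList<>();

    /** Mirrors processElement with lastModified set to the element timestamp. */
    public void processElement(long elementTimestamp) {
        count++;
        lastModified = elementTimestamp;
        timers.add(lastModified + TIMEOUT_MS);
    }

    /** Fires all timers at or below the watermark and applies the onTimer check. */
    public void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timestamp = timers.pollFirst();
            if (timestamp == lastModified + TIMEOUT_MS) {
                emitted.add(count); // latest timer for the key: emit the count
            }
            // any other timer was registered by an older element and is ignored
        }
    }

    public static void main(String[] args) {
        OutdatedTimerSketch sketch = new OutdatedTimerSketch();
        sketch.processElement(1_000L);   // registers a timer at 61000
        sketch.processElement(5_000L);   // registers a timer at 65000
        sketch.advanceWatermark(62_000L);
        System.out.println(sketch.emitted); // [] (61000 is outdated)
        sketch.advanceWatermark(66_000L);
        System.out.println(sketch.emitted); // [2]
    }
}
```

In this trace the equality holds for the timer registered by the most recent element of the key, and only after the watermark has passed that timer's timestamp.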



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
