I think there is an error in the code snippet describing the ProcessFunction 
time out example :  

    public void onTimer(long timestamp, OnTimerContext ctx, 
Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
If, as stated in the example, the CountWithTimeoutFunction should emit a 
key/count if no further update occurred during the  minute elapsed since last 
update, the test should be : 

if (timestamp == result.lastModified + 60000) { 
        // emit the state on timeout 
        out.collect(new Tuple2<String, Long>(result.key, result.count)); 

As stated in the javadoc of the ProcessFunction : the timestamp arg of on timer 
method is the timestamp of the firing timer.

Reply via email to