Hi Hung,
I see one thing that could explain the problem: the timestamp assigner should 
look like this:

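new AssignerWithPeriodicWatermarks<BizEvent>() {

                   // Highest event timestamp seen so far, in milliseconds.
                   long curTimeStamp;

                   @Override
                   public long extractTimestamp(BizEvent biz, long currentTimestamp) {
                       // Track the maximum so the watermark never moves backwards.
                       curTimeStamp = Math.max(curTimeStamp, biz.time.getMillis());
                       return biz.time.getMillis();
                   }

                   @Override
                   public long getCurrentWatermark() {
                       // The watermark trails the latest event time by maxEventDelay
                       // (assumed to be in seconds, hence the * 1000).
                       return (curTimeStamp - (maxEventDelay * 1000));
                   }
               }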

The currentTimestamp parameter is the internal timestamp that the element had 
before, which is most likely just “-1” because no timestamp was previously 
assigned.
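
To make the bookkeeping concrete, here is a small standalone sketch of the same logic. It does not use Flink at all: the class name WatermarkSketch, the sample event times and the 10 second delay are made up for illustration, and it only mirrors what the two methods above compute.

```java
// Standalone sketch (no Flink dependency): mirrors the assigner's bookkeeping.
// WatermarkSketch and the sample values below are hypothetical.
class WatermarkSketch {

    private final long maxEventDelayMillis;

    // Highest event timestamp seen so far, in milliseconds (starts at 0).
    private long curTimeStamp;

    WatermarkSketch(long maxEventDelaySeconds) {
        this.maxEventDelayMillis = maxEventDelaySeconds * 1000;
    }

    // previousTimestamp plays the role of currentTimestamp above and is ignored.
    long extractTimestamp(long eventMillis, long previousTimestamp) {
        curTimeStamp = Math.max(curTimeStamp, eventMillis);
        return eventMillis;
    }

    long getCurrentWatermark() {
        return curTimeStamp - maxEventDelayMillis;
    }

    public static void main(String[] args) {
        WatermarkSketch assigner = new WatermarkSketch(10);
        // The third event is out of order (older than the second one).
        long[] eventTimes = {25_000L, 40_000L, 30_000L, 50_000L};
        for (long eventTime : eventTimes) {
            long ts = assigner.extractTimestamp(eventTime, -1L);
            System.out.println("event=" + ts
                    + " watermark=" + assigner.getCurrentWatermark());
        }
    }
}
```

With these values the watermark goes 15000, 30000, 30000, 40000: the out-of-order event still gets its own timestamp, but it does not pull the watermark back.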

Does it work with that fix?

Cheers,
Aljoscha

> On 25 Feb 2016, at 17:26, HungChang <unicorn.bana...@gmail.com> wrote:
> 
> An update. The following situation works as expected: the data arrives after
> the Flink job starts to execute.
> 1> (2016-02-25T17:46:25.00,13)
> 2> (2016-02-25T17:46:40.00,16)
> 3> (2016-02-25T17:46:50.00,11)
> 4> (2016-02-25T17:47:10.00,12)
> 
> But for data that arrived a long time before, strange behavior appears. Does it
> mean we cannot replay the computation?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5156.html
