Hi Hung,

I see one thing that could explain the problem. The timestamp assigner should look like this:

    new AssignerWithPeriodicWatermarks<BizEvent>() {
        // Highest event time seen so far; the watermark is derived from it.
        long curTimeStamp;

        @Override
        public long extractTimestamp(BizEvent biz, long currentTimestamp) {
            curTimeStamp = Math.max(curTimeStamp, biz.time.getMillis());
            // Return the element's own event time, not the running maximum.
            return biz.time.getMillis();
        }

        @Override
        public long getCurrentWatermark() {
            // maxEventDelay is in seconds, so convert it to milliseconds.
            return (curTimeStamp - (maxEventDelay * 1000));
        }
    }

The currentTimestamp parameter is the internal timestamp that the element had before this assigner ran, which is most likely just “-1” because no timestamp was previously assigned.

Does it work with that fix?

Cheers,
Aljoscha

> On 25 Feb 2016, at 17:26, HungChang <unicorn.bana...@gmail.com> wrote:
>
> An update. The following situation works as expected: the data arrives after
> the Flink job starts to execute.
> 1> (2016-02-25T17:46:25.00,13)
> 2> (2016-02-25T17:46:40.00,16)
> 3> (2016-02-25T17:46:50.00,11)
> 4> (2016-02-25T17:47:10.00,12)
>
> But for data that arrived a long time before the job started, strange
> behavior appears. Does it mean we cannot replay the computation?
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/The-way-to-itearte-instances-in-AllWindowFunction-in-current-Master-branch-tp5137p5156.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
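
P.S. If it helps to check the watermark arithmetic outside of Flink, here is a minimal standalone sketch of the same bookkeeping as the assigner above. It does not use any Flink classes: WatermarkSketch and BoundedDelayTracker are hypothetical names made up for illustration, the delay is passed in milliseconds directly, and starting from Long.MIN_VALUE until the first element arrives is an assumption of this sketch (the assigner above starts from 0 instead).

```java
public class WatermarkSketch {

    /** Hypothetical stand-in for the assigner's bookkeeping; not a Flink class. */
    public static final class BoundedDelayTracker {
        private final long maxDelayMillis;
        // Assumption of this sketch: Long.MIN_VALUE means "no element seen yet".
        private long maxSeenMillis = Long.MIN_VALUE;

        public BoundedDelayTracker(long maxDelayMillis) {
            this.maxDelayMillis = maxDelayMillis;
        }

        /** Remembers the highest event time seen, but returns the element's own time. */
        public long extractTimestamp(long eventTimeMillis) {
            maxSeenMillis = Math.max(maxSeenMillis, eventTimeMillis);
            return eventTimeMillis;
        }

        /** Watermark = highest event time seen so far minus the allowed delay. */
        public long currentWatermark() {
            if (maxSeenMillis == Long.MIN_VALUE) {
                return Long.MIN_VALUE; // avoid underflow before the first element
            }
            return maxSeenMillis - maxDelayMillis;
        }
    }

    public static void main(String[] args) {
        BoundedDelayTracker tracker = new BoundedDelayTracker(10_000L);
        // The second event is out of order: it is older than the first one.
        long[] eventTimes = {50_000L, 40_000L, 65_000L};
        for (long t : eventTimes) {
            long assigned = tracker.extractTimestamp(t);
            System.out.println("event=" + assigned
                    + " watermark=" + tracker.currentWatermark());
        }
    }
}
```

The out-of-order element keeps its own timestamp (40000) while the watermark stays at 40000 rather than moving backwards, which is the reason for tracking the maximum separately from the returned value.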