Hi Vijay, Could you provide more information about your problem? For example - Which kind of window do you use? - What's the window size? - A relatively complete code is better :-)
As for the problem, it is probably the event time has not reached the end of the window. You can monitor the watermark in the web dashboard[1]. Also, changing even time to processing time is another way to verify if it is a watermark problem. Best, Hequn [1] https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html On Sat, Dec 15, 2018 at 12:59 AM Vijay Balakrishnan <bvija...@gmail.com> wrote: > Hi, > Observations on Watermarks: > Read this great article: > https://data-artisans.com/blog/watermarks-in-apache-flink-made-easy > > * Watermark means when for any event TS, when to stop waiting for arrival > of earlier events. > * Watermark t means all events with Timestamp < t have already arrived. > * When to push data out - When watermark with TS >= t arrives > > Only *using incrementing current time for watermark seems to be working > correctly* but not sure if it aligns up correctly with EventTime > processing. > *Using the incoming records intervalStart as the Watermark source for > EventTime causes data to not be pushed at all* in cases when i have just > 5 records in the Source. > > My source generation for intervalStart has intervalStart incrementing at a > regular interval. > I tried using the intervalStart for my Watermark with a out of order late > boundedness of 3 secs. > The *AggregateFunction* I am using calls the add() fine but *never calls > the getResult().* > My assumption was that the AggregateFunction I am using would push the > data to getResult > based on the Watermark based on intervalStart incrementing beyong the > previous watermark t. > But it doesn't -is it because I have limited number of input records and > once intervalStart gets to the end > of the input records too fast, it stops incrementing the watermar and > hence doesn't push data ? > > With System.currentTimeMillis, it happily keeps increasing and hence > pushes the data. > > Created this class: > public class MonitoringAssigner implements > AssignerWithPunctuatedWatermarks<Monitoring> { > private long bound = 3 * 1000;//3 secs out of order bound in millisecs > > public MonitoringAssigner(long bound) { > this.bound = bound; > } > public Watermark checkAndGetNextWatermark(Monitoring monitoring, long > extractedTimestamp) { > long nextWatermark = extractedTimestamp - bound; > //simply emit a Watermark with every event > return new Watermark(nextWatermark); > } > > @Override > public long extractTimestamp(Monitoring monitoring, long previousTS) { > /*LocalDateTime intervalStart = > Utils.getLocalDateTime(monitoring.getIntervalStart());//2012-07-12 > 02:21:06.057 > long extractedTS = > Utils.getLongFromLocalDateTime(intervalStart);//*using > this stopped pushing recs after a certain time* > return extractedTS;*/ > return *System.currentTimeMillis*();//incrementing current time > > } > >