The role of the watermarks in your job is to trigger the closing of the sliding event-time windows. To play that role properly, they should be based on the timestamps in the events, rather than on a counter that starts from an arbitrary constant (9999L). The same object is responsible for both extracting timestamps and supplying watermarks precisely so that it can base the watermarks it creates on its observations of the timestamps in the event stream. So unless your event timestamps are also based on incrementing a similar counter, this may explain some of the behavior you are seeing.
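A minimal sketch of a periodic assigner whose watermark follows the timestamps it has actually seen, written in Kotlin against the Flink 1.8 Java API like the code in the question. `SeqEvent` is a hypothetical stand-in for `FASTAstring` (its `timestamp` field plays the role of `element.f1`), and `maxOutOfOrdernessMs` is an assumed bound on how far out of order events may arrive.

```kotlin
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical stand-in for the FASTAstring type in the question:
// `timestamp` is the event time in milliseconds (the role of element.f1).
data class SeqEvent(val seq: String, val timestamp: Long)

/**
 * Periodic assigner whose watermark trails the largest timestamp seen so far
 * by maxOutOfOrdernessMs, instead of advancing an unrelated counter.
 */
class SeqTimestampExtractor(private val maxOutOfOrdernessMs: Long) :
    AssignerWithPeriodicWatermarks<SeqEvent> {

    // Start low enough that the subtraction in getCurrentWatermark cannot underflow.
    private var maxSeenTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs

    // Called once per element: record the timestamp and remember the maximum.
    override fun extractTimestamp(element: SeqEvent, previousElementTimestamp: Long): Long {
        val ts = element.timestamp
        if (ts > maxSeenTimestamp) maxSeenTimestamp = ts
        return ts
    }

    // Called periodically by Flink (every 200 ms by default), not once per element.
    // The maximum never decreases, so the watermark never goes backwards.
    override fun getCurrentWatermark(): Watermark =
        Watermark(maxSeenTimestamp - maxOutOfOrdernessMs)
}
```

It would replace the counter-based extractor, e.g. `.assignTimestampsAndWatermarks(SeqTimestampExtractor(0L))` if the source emits events in timestamp order. With a bound of 5 ms, events at 100 and then 90 give a watermark of 95. Flink also ships `BoundedOutOfOrdernessTimestampExtractor` and `AscendingTimestampExtractor` (in `org.apache.flink.streaming.api.functions.timestamps`), which implement the same idea and may be preferable to a hand-written class.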
Another issue is that while extractTimestamp is called for every event, in a periodic watermark assigner the getCurrentWatermark method is called in a separate thread once every 200 msec (by default; see ExecutionConfig#setAutoWatermarkInterval). If you want a watermark after every event you'll need to use an AssignerWithPunctuatedWatermarks, though doing so is something of an anti-pattern, because emitting that many watermarks adds overhead. If your timestamps are completely artificial, you might find a sliding count window a more natural fit for what you're doing.

On Wed, Aug 21, 2019 at 6:20 PM Eric Isling <out.code....@gmail.com> wrote:
>
> Dear list-members,
>
> I have a question regarding window-firing and element accumulation for a
> sliding window on a DataStream (Flink 1.8.1-2.12).
>
> My DataStream is derived from a custom SourceFunction, which emits
> string sequences of WINDOW size, in a deterministic sequence.
> The aim is to create sliding windows over the keyed stream for processing on
> the accumulated strings, based on EventTime.
> To assign EventTime and Watermarks, I attach an
> AssignerWithPeriodicWatermarks to the stream.
> The sliding window is processed with a custom ProcessWindowFunction.
>
> env.setStreamTimeCharacteristic(EventTime)
> val seqStream = env.addSource(Seqstream)
>     .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>     .keyBy(getEventtimeKey)
>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize),
>         Time.milliseconds(slideSize)))
>
> val result = seqStream.process(ProcessSeqWindow(target1))
>
> My AssignerWithPeriodicWatermarks looks like this.
>
> class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>     var waterMark = 9999L
>     override fun extractTimestamp(element: FASTAstring,
>             previousElementTimestamp: Long): Long {
>         return element.f1
>     }
>
>     override fun getCurrentWatermark(): Watermark? {
>         waterMark += 1
>         return Watermark(waterMark)
>     }
> }
>
> In other words, each element emitted by the source should have its own
> EventTime, and the Watermark should be emitted allowing no further events for
> that time.
> Stepping through the stream in a debugger indicates that EventTime /
> Watermarks are generated as I would expect.
>
> My expectation is that ProcessSeqWindow.run() ought to be called with a
> number of elements proportional to the time window (e.g. 10 ms), over
> EventTime. However, what I observe is that run() is called multiple times
> with single elements, and in an arbitrary sequence with respect to EventTime.
>
> My question is whether this is likely to be caused by multiple trigger-events
> on each window, or are there other possible explanations? How can I debug
> the cause?
>
> Thanks,
>
> Eric
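For comparison with the periodic assigner discussed in the reply, a hedged sketch of the punctuated variant: Flink calls `checkAndGetNextWatermark` right after `extractTimestamp` for every element, so a watermark can follow each event (with the overhead noted above). `SeqEvent` is again a hypothetical stand-in for `FASTAstring`, and the sketch assumes events arrive in ascending timestamp order. Emitting `timestamp - 1` (the same convention as Flink's `AscendingTimestampExtractor`) keeps a later event with an equal timestamp from being treated as late.

```kotlin
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical stand-in for FASTAstring: `timestamp` is event time in ms.
data class SeqEvent(val seq: String, val timestamp: Long)

/** Emits a new watermark after every element whose timestamp moves time forward. */
class PerEventWatermarks : AssignerWithPunctuatedWatermarks<SeqEvent> {

    private var maxSeenTimestamp = Long.MIN_VALUE

    override fun extractTimestamp(element: SeqEvent, previousElementTimestamp: Long): Long =
        element.timestamp

    // Returning null means "no new watermark for this element".
    override fun checkAndGetNextWatermark(lastElement: SeqEvent, extractedTimestamp: Long): Watermark? {
        if (extractedTimestamp <= maxSeenTimestamp) return null
        maxSeenTimestamp = extractedTimestamp
        // Everything strictly before this timestamp is now considered complete.
        return Watermark(extractedTimestamp - 1)
    }
}
```

If the timestamps are purely synthetic, the count-window alternative needs no watermarks at all: on a `KeyedStream`, `countWindow(size, slide)` creates a sliding count window that fires based on the number of elements per key, e.g. `.keyBy(getEventtimeKey).countWindow(10L, 1L)`. The `ProcessWindowFunction` would then be typed on `GlobalWindow` rather than `TimeWindow`.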