The role of the watermarks in your job will be to trigger the closing
of the sliding event time windows. In order to play that role
properly, they should be based on the timestamps in the events, rather
than on a counter that starts from an arbitrary constant (9999L). The reason the same object
is responsible for extracting timestamps and supplying watermarks is
so that this object can base the watermarks it creates on its
observations of the timestamps in the event stream. So unless your
event timestamps are also based on incrementing a similar counter,
this may explain some of the behavior you are seeing.
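
Below is a minimal sketch of a periodic assigner whose watermark follows the timestamps it has actually seen, written in Kotlin to match your code. It assumes, hypothetically, that each element is a Tuple2<String, Long> whose f1 field holds the event time in milliseconds (your FASTAstring class isn't shown). MaxSeenTimestampAssigner is an illustrative name, not a Flink class. Flink also ships BoundedOutOfOrdernessTimestampExtractor, which follows the same "max seen minus bound" idea.

```kotlin
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical element type: Tuple2<sequence, eventTimeMillis>, mirroring the
// question's use of element.f1 as the timestamp.
class MaxSeenTimestampAssigner(
    private val maxOutOfOrdernessMs: Long
) : AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {

    // Highest event timestamp observed so far. Initialized so that the first
    // watermark is exactly Long.MIN_VALUE instead of an underflowed value.
    private var maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs

    // Called for every element: remember the largest timestamp, then return it.
    override fun extractTimestamp(element: Tuple2<String, Long>, previousElementTimestamp: Long): Long {
        val timestamp: Long = element.f1
        maxTimestampSeen = maxOf(maxTimestampSeen, timestamp)
        return timestamp
    }

    // Called periodically (every 200 ms by default), not per element. The
    // watermark is derived from observed event time, not from a counter.
    override fun getCurrentWatermark(): Watermark =
        Watermark(maxTimestampSeen - maxOutOfOrdernessMs)
}
```

You would attach it with .assignTimestampsAndWatermarks(MaxSeenTimestampAssigner(0L)) if your timestamps are strictly increasing, or with a larger bound if events can arrive up to that many milliseconds out of order.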

Another issue is that while extractTimestamp is called for every
event, in a periodic watermark assigner the getCurrentWatermark method
is called in a separate thread once every 200 msec (by default). If
you want watermarks after every event you'll need to use an
AssignerWithPunctuatedWatermarks, though doing so is something of an
anti-pattern (because having that many watermarks adds overhead).
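
For comparison, here is a sketch of a punctuated assigner that emits a watermark after every element. It assumes the same hypothetical Tuple2<String, Long> element type and timestamps that arrive in ascending order; PerEventWatermarkAssigner is an illustrative name. If the concern is only how quickly watermarks follow the data, shortening the periodic interval with env.config.setAutoWatermarkInterval(...) may be a lighter-weight option.

```kotlin
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Hypothetical element type: Tuple2<sequence, eventTimeMillis>.
class PerEventWatermarkAssigner : AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {

    override fun extractTimestamp(element: Tuple2<String, Long>, previousElementTimestamp: Long): Long =
        element.f1

    // Invoked right after extractTimestamp for every element; a non-null result
    // is a candidate watermark. Assumes ascending timestamps, so "timestamp - 1"
    // still admits further elements carrying the same timestamp.
    override fun checkAndGetNextWatermark(lastElement: Tuple2<String, Long>, extractedTimestamp: Long): Watermark? =
        Watermark(extractedTimestamp - 1)
}
```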

If your timestamps are completely artificial, you might find a sliding
count window (countWindow(size, slide) on the keyed stream) a more natural fit for what you're doing.
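
A sketch of that alternative, assuming hypothetical (key, chunk) elements of type Tuple2<String, String>; ChunkKey, ConcatWindow, slidingCountPipeline and runSketch are illustrative names. countWindow(size, slide) on a KeyedStream fires every slide elements per key and hands the window function the most recent size elements of that key, so no timestamps or watermarks are involved.

```kotlin
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical key selector: keys each (key, chunk) pair by its first field.
class ChunkKey : KeySelector<Tuple2<String, String>, String> {
    override fun getKey(value: Tuple2<String, String>): String = value.f0
}

// Concatenates the chunks currently held by the count window.
class ConcatWindow : ProcessWindowFunction<Tuple2<String, String>, String, String, GlobalWindow>() {
    override fun process(
        key: String,
        context: Context,
        elements: Iterable<Tuple2<String, String>>,
        out: Collector<String>
    ) {
        out.collect(elements.joinToString("") { it.f1 })
    }
}

// Keyed sliding count window in place of SlidingEventTimeWindows.
fun slidingCountPipeline(input: DataStream<Tuple2<String, String>>, size: Long, slide: Long): DataStream<String> =
    input.keyBy(ChunkKey()).countWindow(size, slide).process(ConcatWindow())

// Small local run: with size 4 and slide 2, the key fires after every second element.
fun runSketch() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val input = env.fromElements(
        Tuple2.of("seq1", "AC"), Tuple2.of("seq1", "GT"),
        Tuple2.of("seq1", "TA"), Tuple2.of("seq1", "CG")
    )
    slidingCountPipeline(input, 4L, 2L).print()
    env.execute("sliding count window sketch")
}
```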

On Wed, Aug 21, 2019 at 6:20 PM Eric Isling <out.code....@gmail.com> wrote:
>
> Dear list-members,
>
> I have a question regarding window firing and element accumulation for a
> sliding window on a DataStream (Flink 1.8.1-2.12).
>
> My DataStream is derived from a custom SourceFunction, which emits
> string sequences of WINDOW size, in a deterministic sequence.
> The aim is to create sliding windows over the keyed stream for processing on
> the accumulated strings, based on EventTime.
> To assign EventTime and watermarks, I attach an
> AssignerWithPeriodicWatermarks to the stream.
> The sliding window is processed with a custom ProcessWindowFunction.
>
> env.setStreamTimeCharacteristic(EventTime)
> val seqStream = env.addSource(Seqstream)
>     .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>     .keyBy(getEventtimeKey)
>     .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize), Time.milliseconds(slideSize)))
>
> val result = seqStream.process(ProcessSeqWindow(target1))
>
> My AssignerWithPeriodicWatermarks looks like this:
> class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>     var waterMark  = 9999L
>     override fun extractTimestamp(element: FASTAstring, previousElementTimestamp: Long): Long {
>         return element.f1
>     }
>
>     override fun getCurrentWatermark(): Watermark? {
>         waterMark += 1
>         return Watermark(waterMark)
>     }
> }
>
> In other words, each element emitted by the source should have its own
> EventTime, and the watermark should be emitted allowing no further events for
> that time.
> Stepping through the stream in a debugger indicates that EventTime /
> watermarks are generated as I would expect.
>
> My expectation is that ProcessSeqWindow.run() ought to be called with a
> number of elements proportional to the time window (e.g. 10 ms) over
> EventTime. However, what I observe is that run() is called multiple times
> with single elements, and in an arbitrary sequence with respect to EventTime.
>
> My question is whether this is likely to be caused by multiple trigger events
> on each window, or are there other possible explanations? How can I debug
> the cause?
>
> Thanks,
>
> Eric
