If you still need help diagnosing the cause of the misbehavior, please share more of the code with us.
On Wed, Aug 21, 2019 at 6:24 PM Eric Isling <out.code....@gmail.com> wrote:
>
> I should add that the behaviour persists even when I force parallelism to 1.
>
> On Wed, Aug 21, 2019 at 5:19 PM Eric Isling <out.code....@gmail.com> wrote:
>>
>> Dear list-members,
>>
>> I have a question regarding window firing and element accumulation for a
>> sliding window on a DataStream (Flink 1.8.1-2.12).
>>
>> My DataStream is derived from a custom SourceFunction, which emits
>> string sequences of WINDOW size in a deterministic order.
>> The aim is to create sliding windows over the keyed stream, based on
>> EventTime, and to process the strings accumulated in each window.
>> To assign EventTime and watermarks, I attach an
>> AssignerWithPeriodicWatermarks to the stream.
>> The sliding window is processed with a custom ProcessWindowFunction.
>>
>>     env.setStreamTimeCharacteristic(EventTime)
>>     val seqStream = env.addSource(Seqstream)
>>         .assignTimestampsAndWatermarks(SeqTimeStampExtractor())
>>         .keyBy(getEventtimeKey)
>>         .window(SlidingEventTimeWindows.of(Time.milliseconds(windowSize),
>>                 Time.milliseconds(slideSize)))
>>
>>     val result = seqStream.process(ProcessSeqWindow(target1))
>>
>> My AssignerWithPeriodicWatermarks looks like this:
>>
>>     class FASTATimeStampExtractor : AssignerWithPeriodicWatermarks<FASTAstring> {
>>         var waterMark = 9999L
>>
>>         override fun extractTimestamp(element: FASTAstring,
>>                                       previousElementTimestamp: Long): Long {
>>             return element.f1
>>         }
>>
>>         override fun getCurrentWatermark(): Watermark? {
>>             waterMark += 1
>>             return Watermark(waterMark)
>>         }
>>     }
>>
>> In other words, each element emitted by the source should have its own
>> EventTime, and the watermark should be emitted so that no further events
>> are allowed for that time.
>> Stepping through the stream in a debugger indicates that EventTime values
>> and watermarks are generated as I would expect.
>>
>> My expectation is that ProcessSeqWindow.run() ought to be called with a
>> number of elements proportional to the time window (e.g. 10 ms) over
>> EventTime. However, what I observe is that run() is called multiple times
>> with single elements, and in an arbitrary order with respect to EventTime.
>>
>> My question is whether this is likely to be caused by multiple trigger
>> events on each window, or whether there are other possible explanations.
>> How can I debug the cause?
>>
>> Thanks,
>>
>> Eric
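To see which windows a given element should land in, and when each of them can fire, the arithmetic can be checked offline. The sketch below is plain Kotlin with no Flink dependency; `Win`, `windowStart`, `assignSlidingWindows` and `canFire` are hypothetical helpers that mirror, as an assumption, the zero-offset assignment loop in SlidingEventTimeWindows and the EventTimeTrigger rule that a window [start, end) fires once the watermark reaches end - 1. Keep in mind that Flink tracks window contents separately for each key.

```kotlin
// Hypothetical window type: the half-open interval [start, end) in milliseconds.
data class Win(val start: Long, val end: Long) {
    // Largest timestamp inside the window; assumed to match TimeWindow.maxTimestamp().
    val maxTimestamp: Long
        get() = end - 1
}

// Start of the latest window (aligned to multiples of `slide`, offset 0) containing `timestamp`.
fun windowStart(timestamp: Long, slide: Long): Long =
    timestamp - Math.floorMod(timestamp, slide)

// All sliding windows of length `size`, advancing by `slide`, that contain `timestamp`.
// Mirrors (as an assumption) the loop in SlidingEventTimeWindows.assignWindows.
fun assignSlidingWindows(timestamp: Long, size: Long, slide: Long): List<Win> {
    val windows = mutableListOf<Win>()
    var start = windowStart(timestamp, slide)
    // Step back one slide at a time while the window still covers `timestamp`.
    while (start > timestamp - size) {
        windows.add(Win(start, start + size))
        start -= slide
    }
    return windows
}

// EventTimeTrigger-style rule (assumed): a window fires once the watermark reaches its max timestamp.
fun canFire(window: Win, watermark: Long): Boolean = watermark >= window.maxTimestamp
```

For example, with windowSize = 10 and slideSize = 5, an element with timestamp 10003 lands in [10000, 10010) and [9995, 10005), and the first of those can only fire once the watermark reaches 10009. Since the quoted assigner advances its watermark by 1 on every periodic call, independently of the element timestamps, it may be worth logging both values side by side.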
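One cause worth ruling out is the key selector. The quoted pipeline calls keyBy(getEventtimeKey) before windowing, and Flink keeps window contents separately for each key. If getEventtimeKey returns something derived from each element's event time (its name suggests so, but it isn't shown), every element becomes its own key, so each keyed window would hold exactly one element, which would match the single-element calls described above regardless of parallelism. The plain-Kotlin simulation below has no Flink dependency; `Element`, `windowStarts` and `countPerKeyedWindow` are hypothetical names, and the window arithmetic assumes zero offset. It counts elements per (key, window) pair for a per-timestamp key and for a single constant key.

```kotlin
// Hypothetical element type: a payload plus its event-time timestamp in ms.
data class Element(val payload: String, val timestamp: Long)

// Start times of all sliding windows (offset 0, assumed) that contain `timestamp`.
fun windowStarts(timestamp: Long, size: Long, slide: Long): List<Long> {
    val starts = mutableListOf<Long>()
    var start = timestamp - Math.floorMod(timestamp, slide)
    while (start > timestamp - size) {
        starts.add(start)
        start -= slide
    }
    return starts
}

// Counts how many elements land in each (key, windowStart) pair, which is the
// granularity at which a keyed window operator keeps state and fires.
fun <K> countPerKeyedWindow(
    elements: List<Element>,
    size: Long,
    slide: Long,
    keyOf: (Element) -> K
): Map<Pair<K, Long>, Int> {
    val counts = mutableMapOf<Pair<K, Long>, Int>()
    for (e in elements) {
        val key = keyOf(e)
        for (start in windowStarts(e.timestamp, size, slide)) {
            val slot = Pair(key, start)
            counts[slot] = (counts[slot] ?: 0) + 1
        }
    }
    return counts
}

fun main() {
    // Illustrative input: one element per millisecond starting at t = 10000.
    val elements = (0 until 20).map { Element("seq$it", 10_000L + it) }
    val byEventTime = countPerKeyedWindow(elements, 10L, 5L) { it.timestamp }
    val constantKey = countPerKeyedWindow(elements, 10L, 5L) { "all" }
    println("keyed by event time: max elements per window = ${byEventTime.values.maxOrNull()}")
    println("single key:          max elements per window = ${constantKey.values.maxOrNull()}")
}
```

With one element per millisecond, keying by timestamp gives exactly one element in every (key, window) pair, whereas a constant key fills each complete 10 ms window with 10 elements. If the real key selector behaves like the first case, keying on a field shared by the elements that should be aggregated together would be the thing to try.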