In +Reza Rokni <[email protected]>'s example of looping timers, it is necessary to "seed" each key, for just the reason you say. The looping timer itself for a key should be in the global window. The outputs of the looping timer are windowed.
All that said, your example seems possibly easier if you are OK with no output for windows with no data. It sounds like you don't actually want to drop the data, yes? You want to partition elements at some time X that is in the middle of some event time interval. If I understand your chosen approach, you could buffer the element w/ metadata and set the timer in @ProcessElement. It is no problem if the timestamp of the timer has already passed. It will fire immediately then. In the @OnTimer you output from the buffer. I think there may be more efficient ways to achieve this output. Kenn On Thu, Apr 22, 2021 at 2:48 AM Jan Lukavský <[email protected]> wrote: > Hi, > > I have come across a "problem" while implementing some toy Pipeline. I > would like to split input PCollection into two parts - droppable data > (delayed for more than allowed lateness from the end of the window) from > the rest. I will not go into details, as that is not relevant, the > problem is that I need to setup something like "looping timer" to be > able to create state for a window, even when there is no data, yet (to > be able to setup timer for the end of a window, to be able to recognize > droppable data). I would like the solution to be generic, so I would > like to "infer" the duration of the looping timer from the input > PCollection. What I would need is to know a _minimal guaranteed duration > of a window that a WindowFn can generate_. I would then setup the > looping timer to tick with interval of this minimal duration and that > would guarantee the timer will hit all the windows. > > I could try to infer this duration from the input windowing with some > hackish ways - e.g. using some "instanceof" approach, or by using the > WindowFn to generate set of windows for some fixed timestamp (without > data element) and then infer the time from maxTimestamp of the returned > windows. That would probably break for sliding windows, because the > result would be the duration of the slide, not the duration of the > window (at least when doing naive computation). > > It seems to me, that all WindowFns have such a minimal Duration - > obvious for Fixed Windows, but every other window type seems to have > such property (including Sessions - that is the gap duration). The only > problem would be with data-driven windows, but we don't have currently > strong support for these. > > The question is then - would it make sense to introduce > WindowFn.getMinimalWindowDuration() to the model? Default value could be > zero, which would mean such WindowFn would be unsupported in my > motivating example. > > Jan > >
