Hi Kenn,

On 4/26/21 5:59 PM, Kenneth Knowles wrote:
In +Reza Rokni's example of looping timers, it is necessary to "seed" each key, for just the reason you say. The looping timer itself for a key should be in the global window. The outputs of the looping timer are windowed.
Yes, exactly.

All that said, your example seems possibly easier if you are OK with no output for windows with no data.

The problem is actually not with windows with no data, but with windows containing only droppable data. This "toy example" is, interestingly, much more complex than I expected, mostly because there is no access to the watermark while processing elements. There are probably more efficient ways to solve that; the best option would be to have access to the input watermark (e.g. at the start of the bundle, where it seems to be well defined, though I understand there is some negative experience with that approach). But I don't actually want to discuss the solutions.

My "motivating example" was merely a motivation for me to ask this question (and possibly one more about side inputs is to follow :)). Putting all examples and possible solutions aside, the question is: is a minimal duration an intrinsic property of a WindowFn, or not? If yes, I think there are reasons to include this property in the model. If no, then we can discuss why that is the case. The only problem I see is with data-driven windows; all other windows are time-based and as such probably carry this property. The data-driven WindowFns could have this property defined as zero. This is not a super critical request, more of a philosophical discussion.
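
To make the question concrete, here is a minimal sketch of what such a property might look like. It is plain Java and does not use the Beam SDK: WindowFnSketch and the three implementing classes are hypothetical stand-ins, and only the method name getMinimalWindowDuration() comes from the original proposal. Sliding windows are left out because their minimum is not settled in this thread.

```java
import java.time.Duration;

// Hypothetical stand-in for a WindowFn; it models only the proposed property.
interface WindowFnSketch {
    // Proposed accessor. Duration.ZERO means "no guaranteed minimum",
    // which is the suggested default for data-driven windowing.
    default Duration getMinimalWindowDuration() {
        return Duration.ZERO;
    }
}

// Fixed windows: every window has exactly the configured size.
final class FixedWindowsSketch implements WindowFnSketch {
    private final Duration size;

    FixedWindowsSketch(Duration size) {
        this.size = size;
    }

    @Override
    public Duration getMinimalWindowDuration() {
        return size;
    }
}

// Sessions: the thread names the gap duration as the minimum.
final class SessionsSketch implements WindowFnSketch {
    private final Duration gap;

    SessionsSketch(Duration gap) {
        this.gap = gap;
    }

    @Override
    public Duration getMinimalWindowDuration() {
        return gap;
    }
}

// Data-driven windowing (assumed): inherits the zero default.
final class DataDrivenSketch implements WindowFnSketch {}
```

With a default of zero, a caller can treat Duration.ZERO as "unsupported" and refuse to derive a looping-timer interval from such a WindowFn.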

 Jan

It sounds like you don't actually want to drop the data, yes? You want to partition elements at some time X that is in the middle of some event time interval. If I understand your chosen approach, you could buffer the element w/ metadata and set the timer in @ProcessElement. It is no problem if the timestamp of the timer has already passed. It will fire immediately then. In the @OnTimer you output from the buffer. I think there may be more efficient ways to achieve this output.

Kenn

On Thu, Apr 22, 2021 at 2:48 AM Jan Lukavský <[email protected]> wrote:

    Hi,

    I have come across a "problem" while implementing a toy Pipeline. I
    would like to split the input PCollection into two parts: droppable
    data (delayed by more than the allowed lateness past the end of the
    window) and the rest. I will not go into details, as they are not
    relevant. The problem is that I need to set up something like a
    "looping timer" to be able to create state for a window even when
    there is no data yet (so that I can set a timer for the end of the
    window and recognize droppable data). I would like the solution to
    be generic, so I would like to "infer" the duration of the looping
    timer from the input PCollection. What I would need to know is the
    _minimal guaranteed duration of a window that a WindowFn can
    generate_. I would then set up the looping timer to tick with an
    interval of this minimal duration, which would guarantee that the
    timer hits all the windows.
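
The claim that a timer ticking at the minimal window duration hits every window can be checked with a small simulation. This is only a sketch in plain Java, with no Beam classes, and it assumes fixed windows aligned at zero; LoopingTimerSketch and its methods are hypothetical names introduced for illustration.

```java
import java.util.HashSet;
import java.util.Set;

final class LoopingTimerSketch {
    // Index n of the fixed window [n * sizeMs, (n + 1) * sizeMs) containing a timestamp.
    static long windowIndex(long timestampMs, long sizeMs) {
        return Math.floorDiv(timestampMs, sizeMs);
    }

    // Indices of all windows that receive at least one tick in [startMs, endMs).
    static Set<Long> windowsHit(long startMs, long endMs, long tickMs, long sizeMs) {
        if (tickMs <= 0 || sizeMs <= 0) {
            throw new IllegalArgumentException("tickMs and sizeMs must be positive");
        }
        Set<Long> hit = new HashSet<>();
        for (long t = startMs; t < endMs; t += tickMs) {
            hit.add(windowIndex(t, sizeMs));
        }
        return hit;
    }

    // True if every window overlapping [startMs, endMs) receives a tick.
    static boolean hitsEveryWindow(long startMs, long endMs, long tickMs, long sizeMs) {
        Set<Long> hit = windowsHit(startMs, endMs, tickMs, sizeMs);
        long first = windowIndex(startMs, sizeMs);
        long last = windowIndex(endMs - 1, sizeMs);
        for (long w = first; w <= last; w++) {
            if (!hit.contains(w)) {
                return false;
            }
        }
        return true;
    }
}
```

For example, with 1000 ms windows and a first tick at 250 ms, a 1000 ms tick interval covers every window up to 10000 ms, while a 1500 ms interval skips the window starting at 2000 ms.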

    I could try to infer this duration from the input windowing in some
    hackish way, e.g. using an "instanceof" approach, or by using the
    WindowFn to generate the set of windows for some fixed timestamp
    (without a data element) and then inferring the time from the
    maxTimestamp of the returned windows. That would probably break for
    sliding windows, because the result would be the duration of the
    slide, not the duration of the window (at least with a naive
    computation).

    It seems to me that all WindowFns have such a minimal Duration. It
    is obvious for Fixed Windows, but every other window type seems to
    have such a property as well (including Sessions, where it is the
    gap duration). The only problem would be with data-driven windows,
    but we currently don't have strong support for these.

    The question is then: would it make sense to introduce
    WindowFn.getMinimalWindowDuration() to the model? The default value
    could be zero, which would mean that such a WindowFn would be
    unsupported in my motivating example.

      Jan
