On 4/9/24 18:33, Kenneth Knowles wrote:
At a top level `setWindowingStrategyInternal` exists to set up the metadata without actually assigning windows. If we were more clever we might have found a way for it to not be public... it is something that can easily lead to an invalid pipeline.
Yes, that was what hit me about one minute after I started this thread. :)

I think "compatible windows" today in Beam doesn't have very good uses anyhow. I do see how when you are flattening PCollections you might also want to explicitly have a function that says "and here is how to reconcile their different metadata". But is it not reasonable to use Window.into(global window)? It doesn't seem like boilerplate to me actually, but something you really want to know is happening.

:)

Of course this was the way out, but I was somewhat intuitively seeking something that could go this autonomously.

Generally speaking, we might have some room for improvement in the way we handle windows and triggers - windows relate only to GBK and stateful ParDo, triggers relate to GBK only. They have no semantics if downstream processing does not use any of these. There could be a pipeline preprocessing stage that would discard (replace with meaningful defaults) any of these metadata that is unused, but can cause Pipeline to fail at construction time. It is also (to me) somewhat questionable if triggers are really a property of a PCollection or a property of a specific transform (GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but that is completely different story :)) because (non-default) triggers are likely not preserved across multiple transforms. Maybe the correct subject of this thread could be "are we sure our windowing and triggering semantics is 100% correct"? Probably the - wrong - expectations at the beginning of this thread were due to conflict in my mental model of how things 'could' work as opposed to how they actually work. :)

 Jan


Kenn

On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 4/6/24 21:23, Reuven Lax via dev wrote:
    So the problem here is that windowFn is a property of the
    PCollection, not the element, and the result of Flatten is a
    single PCollection.
    Yes. That is the cause of why Flatten.pCollections() needs the
    same windowFn.

    In various cases, there is a notion of "compatible" windows.
    Basically given window functions W1 and W2, provide a W3 that
    "works" with both.
    Exactly this would be a nice feature for Flatten, something like
    'windowFn resolve strategy', so that if use does not know the
    windowFn of upstream PCollections this can be somehow resolved at
    pipeline construction time. Alternatively only as a small
    syntactic sugar, something like:
     
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

    or anything similar. This can be done in user code, so it is not
    something deeper, but might help in some cases. It would be cool
    if we could reuse concepts from other cases where such mechanism
    is needed.


    Note that Beam already has something similar with side inputs,
    since the side input often is in a different window than the main
    input. However main input elements are supposed to see side input
    elements in the same window (and in fact main inputs are blocked
    until the side-input window is ready), so we must do a mapping.
    If for example (and very commonly!) the side input is in the
    global window and the main input is in a fixed window, by default
    we will remap the global-window elements into the main-input's
    fixed window.

    This is a one-sided merge function, there is a 'main' and 'side'
    input, but the generic symmetric merge might be possible as well.
    E.g. if one PCollection of Flatten is in GlobalWindow, I wonder if
    there are cases where users would actually want to do anything
    else then apply the same global windowing strategy to all input
    PCollections.

     Jan


    In Side input we also allow the user to control this mapping, so
    for example side input elements could always map to the previous
    fixed window (e.g. while processing window 12-1, you want to see
    summary data of all records in the previous window 11-12). Users
    can do this by providing a WindowMappingFunction to the View -
    essentially a function from window to window. Unfortunately this
    is hard to use (one must create their own PCollectionView class)
    and very poorly documented, so I doubt many users know about this!

    Reuven

    On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský <je...@seznam.cz> wrote:

        Immediate self-correction, although setting the strategy
        directly via
        setWindowingStrategyInternal() *seemed* to be working during
        Pipeline
        construction time, during runtime it obviously does not work,
        because
        the PCollection was still windowed using the old windowFn.
        Make sense to
        me, but there remains the other question if we can make
        flattening
        PCollections with incompatible windowFns more user-friendly.
        The current
        approach where we require the same windowFn for all input
        PCollections
        creates some unnecessary boilerplate code needed on user side.

          Jan

        On 4/6/24 15:45, Jan Lukavský wrote:
        > Hi,
        >
        > I came across a case where using
        > PCollection#applyWindowingStrategyInternal seems legit in
        user core.
        > The case is roughly as follows:
        >
        >  a) compute some streaming statistics
        >
        >  b) apply the same transform (say
        ComputeWindowedAggregation) with
        > different parameters on these statistics yielding two windowed
        > PCollections - first is global with early trigger, the
        other is
        > sliding window, the specific parameters of the windowFns are
        > encapsulated in the ComputeWindowedAggregation transform
        >
        >  c) apply the same transform on both of the above
        PCollections,
        > yielding two PCollections with the same types, but
        different windowFns
        >
        >  d) flatten these PCollections into single one (e.g. for
        downstream
        > processing - joining - or flushing to sink)
        >
        > Now, the flatten will not work, because these PCollections
        have
        > different windowFns. It would be possible to restore the
        windowing for
        > either of them, but it requires to somewhat break the
        encapsulation of
        > the transforms that produce the windowed outputs. A more
        natural
        > solution is to take the WindowingStrategy from the global
        aggregation
        > and set it via setWindowingStrategyInternal() to the other
        > PCollection. This works, but it uses API that is marked as
        @Internal
        > (and obviously, the name as well suggests it is not
        intended for
        > client-code usage).
        >
        > The question is, should we make a legitimate version of
        this call? Or
        > should we introduce a way for Flatten.pCollections() to
        re-window the
        > input PCollections appropriately? In the case of conflicting
        > WindowFns, where one of them is GlobalWindowing strategy,
        it seems to
        > me that the user's intention is quite well-defined (this
        might extend
        > to some 'flatten windowFn resolution strategy', maybe).
        >
        > WDYT?
        >
        >  Jan
        >

Reply via email to