Kenn
On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský <je...@seznam.cz> wrote:
On 4/6/24 21:23, Reuven Lax via dev wrote:
> So the problem here is that windowFn is a property of the
> PCollection, not the element, and the result of Flatten is a
> single PCollection.
Yes. That is why Flatten.pCollections() needs the same windowFn for
all of its inputs.
> In various cases, there is a notion of "compatible" windows.
> Basically, given window functions W1 and W2, provide a W3
> that "works" with both.
Exactly this would be a nice feature for Flatten, something like a
'windowFn resolve strategy', so that if the user does not know the
windowFn of the upstream PCollections, it can somehow be resolved at
pipeline construction time. Alternatively, it could be only a small
piece of syntactic sugar, something like:
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
or anything similar. This can be done in user code, so it is not
something deeper, but it might help in some cases. It would be cool if
we could reuse concepts from other cases where such a mechanism is
needed.
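To make the proposed sugar concrete, here is a minimal toy model in plain Java with no Beam dependency. `FlattenSketch`, `ToyWindowFn`, `GlobalFn`, `FixedFn`, `ToyPCollection`, `flatten` and `flattenInto` are hypothetical names invented for illustration, not Beam APIs, and the model ignores triggers, allowed lateness and merging windows. `flatten` mirrors the current requirement that every input shares one window function; `flattenInto` re-assigns each element's window from its timestamp using a chosen target before concatenating, which is roughly what a `withWindowingStrategy(WindowResolution.into(...))` call would do on the user's behalf.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the proposed Flatten sugar; none of these types are Beam classes.
final class FlattenSketch {
  record Window(long start, long end) {
    static final Window GLOBAL = new Window(Long.MIN_VALUE, Long.MAX_VALUE);
  }

  // Stand-in for a WindowFn: maps a timestamp to the window(s) it belongs to.
  interface ToyWindowFn {
    List<Window> assign(long timestamp);
  }

  enum GlobalFn implements ToyWindowFn {
    INSTANCE;

    @Override
    public List<Window> assign(long timestamp) {
      return List.of(Window.GLOBAL);
    }
  }

  record FixedFn(long sizeMillis) implements ToyWindowFn {
    @Override
    public List<Window> assign(long timestamp) {
      long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
      return List.of(new Window(start, start + sizeMillis));
    }
  }

  record Element<T>(T value, long timestamp, Window window) {}

  // The window function belongs to the collection, not to the element.
  record ToyPCollection<T>(ToyWindowFn windowFn, List<Element<T>> elements) {}

  // Mirrors the current rule: every input must have the same window function.
  static <T> ToyPCollection<T> flatten(List<ToyPCollection<T>> inputs) {
    ToyWindowFn windowFn = inputs.get(0).windowFn();
    List<Element<T>> out = new ArrayList<>();
    for (ToyPCollection<T> input : inputs) {
      if (!input.windowFn().equals(windowFn)) {
        throw new IllegalArgumentException(
            "incompatible window functions: " + windowFn + " vs " + input.windowFn());
      }
      out.addAll(input.elements());
    }
    return new ToyPCollection<>(windowFn, out);
  }

  // Hypothetical sugar: re-window every input into `target`, then flatten.
  static <T> ToyPCollection<T> flattenInto(ToyWindowFn target, List<ToyPCollection<T>> inputs) {
    List<ToyPCollection<T>> rewindowed = new ArrayList<>();
    for (ToyPCollection<T> input : inputs) {
      List<Element<T>> elements = new ArrayList<>();
      for (Element<T> e : input.elements()) {
        // Windows are re-assigned from the timestamp; the old window is dropped.
        for (Window w : target.assign(e.timestamp())) {
          elements.add(new Element<>(e.value(), e.timestamp(), w));
        }
      }
      rewindowed.add(new ToyPCollection<>(target, elements));
    }
    return flatten(rewindowed);
  }
}
```

Passing `GlobalFn.INSTANCE` as the target corresponds to taking the strategy of the globally windowed input; plain `flatten` on the same two inputs throws.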
> Note that Beam already has something similar with side inputs, since
> the side input is often in a different window than the main input.
> However, main-input elements are supposed to see side-input elements
> in the same window (and in fact main inputs are blocked until the
> side-input window is ready), so we must do a mapping. If, for example
> (and very commonly!), the side input is in the global window and the
> main input is in a fixed window, by default we will remap the
> global-window elements into the main input's fixed window.
This is a one-sided merge function: there is a 'main' and a 'side'
input, but a generic symmetric merge might be possible as well. E.g.
if one input PCollection of Flatten is in the GlobalWindow, I wonder
whether there are cases where users would actually want to do anything
other than apply the same global windowing strategy to all input
PCollections.
Jan
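A symmetric counterpart could be a resolution rule applied over all Flatten inputs. The sketch below is hypothetical (`SymmetricResolution` and its toy `Fn` records are illustrative, not Beam types) and encodes only the rule suggested here: identical inputs pass through, any difference involving a global input resolves to global, and everything else stays unresolved for the user to decide. It compares window functions only; a real windowing strategy also carries the trigger, allowed lateness and accumulation mode, which this toy ignores.

```java
import java.util.List;
import java.util.Optional;

// Toy model of a symmetric "windowFn resolution" rule; not a Beam API.
final class SymmetricResolution {
  interface Fn {}

  record Global() implements Fn {}

  record Fixed(long sizeMillis) implements Fn {}

  record Sliding(long sizeMillis, long periodMillis) implements Fn {}

  // Identical inputs need no resolution. If they differ but one is global,
  // resolve everything to global. Any other combination stays unresolved.
  static Optional<Fn> resolve(List<Fn> inputs) {
    if (inputs.isEmpty()) {
      return Optional.empty();
    }
    Fn first = inputs.get(0);
    if (inputs.stream().allMatch(first::equals)) {
      return Optional.of(first);
    }
    if (inputs.stream().anyMatch(fn -> fn instanceof Global)) {
      return Optional.of(new Global());
    }
    return Optional.empty();
  }
}
```

Returning `Optional.empty()` for, say, fixed vs sliding windows keeps the current behavior (the user must pick) instead of guessing an intent that is not well-defined.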
> In side inputs we also allow the user to control this mapping, so
> for example side-input elements could always map to the previous
> fixed window (e.g. while processing window 12-1, you want to see
> summary data of all records in the previous window 11-12). Users can
> do this by providing a WindowMappingFunction to the View, essentially
> a function from window to window. Unfortunately, this is hard to use
> (one must create their own PCollectionView class) and very poorly
> documented, so I doubt many users know about this!
> Reuven
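The side-input mapping described above is essentially a function from a main-input window to a side-input window. Here is a minimal sketch with toy types (`SideInputMapping`, `TO_GLOBAL` and `previousFixedWindow` are hypothetical, not the Beam classes behind the View) showing the default case for a globally windowed side input and the custom "previous fixed window" mapping from the 12-1 / 11-12 example. Windows are modeled as half-open millisecond intervals.

```java
import java.util.function.UnaryOperator;

// Toy model of side-input window mapping; not Beam's classes.
final class SideInputMapping {
  record Window(long start, long end) {
    static final Window GLOBAL = new Window(Long.MIN_VALUE, Long.MAX_VALUE);
  }

  // Side input in the global window: whatever main-input window is being
  // processed, the side input is read from the single global window.
  static final UnaryOperator<Window> TO_GLOBAL = mainWindow -> Window.GLOBAL;

  // Custom mapping: while processing a fixed window, read the side input
  // from the preceding fixed window of the same size.
  static UnaryOperator<Window> previousFixedWindow(long sizeMillis) {
    return mainWindow -> {
      long start = Math.floorDiv(mainWindow.start(), sizeMillis) * sizeMillis;
      return new Window(start - sizeMillis, start);
    };
  }
}
```

With one-hour windows, the main-input window 12:00-13:00 maps to the side-input window 11:00-12:00, while `TO_GLOBAL` maps it to the global window.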
> On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský <je...@seznam.cz> wrote:
> > Immediate self-correction: although setting the strategy directly
> > via setWindowingStrategyInternal() *seemed* to work at pipeline
> > construction time, at runtime it obviously does not work, because
> > the PCollection was still windowed using the old windowFn. That
> > makes sense to me, but the other question remains: can we make
> > flattening PCollections with incompatible windowFns more
> > user-friendly? The current approach, where we require the same
> > windowFn for all input PCollections, creates some unnecessary
> > boilerplate code on the user side.
> > Jan
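The construction-time vs runtime difference can be pictured with a toy model in which the windowing strategy is collection-level metadata while each element carries the window it was assigned when it was produced. In this sketch (all names hypothetical, fixed windows only), `relabel` swaps only the declared strategy, roughly what was observed with setWindowingStrategyInternal(), so the elements no longer match it; `rewindow` actually re-assigns every element's window, which is what an explicit re-windowing step does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a collection-level window size vs the windows elements carry.
final class RelabelVsRewindow {
  record Window(long start, long end) {}

  // Each element carries the window it was assigned when it was produced.
  record Element(String value, long timestamp, Window window) {}

  // `windowSizeMillis` stands in for the collection's declared windowing strategy.
  record Coll(long windowSizeMillis, List<Element> elements) {

    // Only swaps the declared strategy; elements keep their old windows.
    Coll relabel(long newSizeMillis) {
      return new Coll(newSizeMillis, elements);
    }

    // Re-assigns every element to the fixed window of the new size.
    Coll rewindow(long newSizeMillis) {
      List<Element> out = new ArrayList<>();
      for (Element e : elements) {
        out.add(new Element(e.value(), e.timestamp(), fixedWindow(e.timestamp(), newSizeMillis)));
      }
      return new Coll(newSizeMillis, out);
    }

    // True if every element sits in the window the declared strategy would assign.
    boolean consistent() {
      return elements.stream()
          .allMatch(e -> e.window().equals(fixedWindow(e.timestamp(), windowSizeMillis)));
    }
  }

  static Window fixedWindow(long timestamp, long sizeMillis) {
    long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
    return new Window(start, start + sizeMillis);
  }
}
```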
> > On 4/6/24 15:45, Jan Lukavský wrote:
> > > Hi,
> > >
> > > I came across a case where using
> > > PCollection#setWindowingStrategyInternal seems legitimate in user
> > > code. The case is roughly as follows:
> > >
> > > a) compute some streaming statistics
> > >
> > > b) apply the same transform (say ComputeWindowedAggregation) with
> > > different parameters to these statistics, yielding two windowed
> > > PCollections: the first is global with an early trigger, the other
> > > uses sliding windows; the specific parameters of the windowFns are
> > > encapsulated in the ComputeWindowedAggregation transform
> > >
> > > c) apply the same transform to both of the above PCollections,
> > > yielding two PCollections with the same types but different
> > > windowFns
> > >
> > > d) flatten these PCollections into a single one (e.g. for
> > > downstream processing, such as joining, or for flushing to a sink)
> > >
> > > Now, the flatten will not work, because these PCollections have
> > > different windowFns. It would be possible to restore the windowing
> > > for either of them, but that requires somewhat breaking the
> > > encapsulation of the transforms that produce the windowed outputs.
> > > A more natural solution is to take the WindowingStrategy from the
> > > global aggregation and set it on the other PCollection via
> > > setWindowingStrategyInternal(). This works, but it uses an API that
> > > is marked as @Internal (and, obviously, the name also suggests it
> > > is not intended for client-code usage).
> > >
> > > The question is: should we make a legitimate version of this call?
> > > Or should we introduce a way for Flatten.pCollections() to
> > > re-window the input PCollections appropriately? In the case of
> > > conflicting WindowFns where one of them is the global windowing
> > > strategy, it seems to me that the user's intention is quite
> > > well-defined (this might extend to some 'flatten windowFn
> > > resolution strategy', maybe).
> > >
> > > WDYT?
> > >
> > > Jan
> > >
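For step b) in the original message, the two outputs differ in more than a parameter: under sliding windows a single timestamp falls into several overlapping windows, while under global windowing it falls into exactly one. The helper below is a toy sliding-window assignment (a sketch assuming zero offset and 0 < period <= size, not Beam's SlidingWindows code; `SlidingAssignment` is a hypothetical name).

```java
import java.util.ArrayList;
import java.util.List;

// Toy sliding-window assignment; assumes offset 0 and 0 < period <= size.
final class SlidingAssignment {
  record Window(long start, long end) {}

  // Returns every window [start, start + size) whose start is a multiple of
  // `period` and which contains `ts`, ordered from the latest start to the earliest.
  static List<Window> assign(long ts, long size, long period) {
    List<Window> out = new ArrayList<>();
    long lastStart = Math.floorDiv(ts, period) * period;
    for (long start = lastStart; start > ts - size; start -= period) {
      out.add(new Window(start, start + size));
    }
    return out;
  }
}
```

With size 30 and period 10, timestamp 17 lands in the three windows [10, 40), [0, 30) and [-10, 20), which is one reason the two aggregations cannot be flattened without first agreeing on a single windowing.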