Kenn
On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský
<je...@seznam.cz> wrote:
On 4/6/24 21:23, Reuven Lax via dev wrote:
So the problem here is that windowFn is a property of
the PCollection, not the element, and the result of
Flatten is a single PCollection.
Yes. That is why Flatten.pCollections() requires
the same windowFn on all of its inputs.
In various cases, there is a notion of "compatible"
windows. Basically given window functions W1 and W2,
provide a W3 that "works" with both.
Exactly this would be a nice feature for Flatten,
something like a 'windowFn resolve strategy', so that if
the user does not know the windowFn of upstream PCollections
this can be somehow resolved at pipeline construction
time. Alternatively, it could be just a bit of syntactic sugar,
something like:
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
or anything similar. This can be done in user code, so
it is not something deeper, but it might help in some
cases. It would be cool if we could reuse concepts from
other cases where such a mechanism is needed.
Note that Beam already has something similar with side
inputs, since the side input often is in a different
window than the main input. However main input elements
are supposed to see side input elements in the same
window (and in fact main inputs are blocked until the
side-input window is ready), so we must do a mapping.
If for example (and very commonly!) the side input is
in the global window and the main input is in a fixed
window, by default we will remap the global-window
elements into the main-input's fixed window.
This is a one-sided merge function: there is a 'main'
and a 'side' input. A generic symmetric merge might
be possible as well. E.g. if one PCollection of Flatten
is in GlobalWindow, I wonder if there are cases where
users would actually want to do anything other than apply
the same global windowing strategy to all input
PCollections.
Jan
For side inputs we also allow the user to control this
mapping, so for example side input elements could
always map to the previous fixed window (e.g. while
processing window 12-1, you want to see summary data of
all records in the previous window 11-12). Users can do
this by providing a WindowMappingFunction to the View -
essentially a function from window to window.
Unfortunately this is hard to use (one must create
their own PCollectionView class) and very poorly
documented, so I doubt many users know about this!
Reuven
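To make the two mappings described above concrete, here is a minimal plain-Java sketch. It does not use the Beam SDK at all: Interval, TO_GLOBAL and TO_PREVIOUS are hypothetical names invented for illustration, and windows are modelled as hour ranges. In the Java SDK the corresponding hook is, I believe, the WindowMappingFn class, but treat that as an assumption and check the Javadoc.

```java
import java.util.function.UnaryOperator;

public class SideInputWindowMapping {

    /** Half-open interval [start, end) in hours; a hypothetical stand-in for a window. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Stand-in for the single global window. */
    static final Interval GLOBAL = new Interval(Long.MIN_VALUE, Long.MAX_VALUE);

    /** Globally windowed side input: every main-input window sees the global window. */
    static final UnaryOperator<Interval> TO_GLOBAL = mainWindow -> GLOBAL;

    /** Custom mapping: a main-input window sees the same-sized window just before it. */
    static final UnaryOperator<Interval> TO_PREVIOUS = mainWindow ->
        new Interval(mainWindow.start - (mainWindow.end - mainWindow.start), mainWindow.start);

    public static void main(String[] args) {
        // The "12-1" main window from the example, written as hours 12 to 13.
        Interval mainWindow = new Interval(12, 13);
        System.out.println(TO_GLOBAL.apply(mainWindow) == GLOBAL);
        System.out.println(TO_PREVIOUS.apply(mainWindow));
    }
}
```

The point of the sketch is only that the mapping is a function from the main-input window to the side-input window, so "previous window" is a one-line change once such a function can be plugged in.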
On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský
<je...@seznam.cz> wrote:
Immediate self-correction: although setting the strategy directly via
setWindowingStrategyInternal() *seemed* to be working at pipeline
construction time, at runtime it obviously does not work, because the
PCollection was still windowed using the old windowFn. Makes sense to
me, but there remains the other question of whether we can make flattening
PCollections with incompatible windowFns more user-friendly. The current
approach, where we require the same windowFn for all input PCollections,
creates some unnecessary boilerplate code on the user side.
Jan
On 4/6/24 15:45, Jan Lukavský wrote:
> Hi,
>
> I came across a case where using
> PCollection#setWindowingStrategyInternal seems legit in user code.
> The case is roughly as follows:
>
> a) compute some streaming statistics
>
> b) apply the same transform (say ComputeWindowedAggregation) with
> different parameters on these statistics, yielding two windowed
> PCollections: the first is global with an early trigger, the other is
> a sliding window; the specific parameters of the windowFns are
> encapsulated in the ComputeWindowedAggregation transform
>
> c) apply the same transform on both of the above PCollections,
> yielding two PCollections with the same types, but different windowFns
>
> d) flatten these PCollections into a single one (e.g. for downstream
> processing - joining - or flushing to a sink)
>
> Now, the flatten will not work, because these PCollections have
> different windowFns. It would be possible to restore the windowing for
> either of them, but that requires somewhat breaking the encapsulation of
> the transforms that produce the windowed outputs. A more natural
> solution is to take the WindowingStrategy from the global aggregation
> and set it via setWindowingStrategyInternal() on the other
> PCollection. This works, but it uses an API that is marked as @Internal
> (and obviously, the name also suggests it is not intended for
> client-code usage).
>
> The question is, should we make a legitimate version of this call? Or
> should we introduce a way for Flatten.pCollections() to re-window the
> input PCollections appropriately? In the case of conflicting
> WindowFns, where one of them is GlobalWindowing strategy, it seems to
> me that the user's intention is quite well-defined (this might extend
> to some 'flatten windowFn resolution strategy', maybe).
>
> WDYT?
>
> Jan
>
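One way to read the 'flatten windowFn resolution strategy' idea is as a small decision rule evaluated at pipeline construction time. The sketch below is a plain-Java model only, not Beam code: windowFns are represented by hypothetical string descriptors, and resolve() is an invented helper. It assumes the rule "identical windowFns are kept, a global window wins over anything else, everything else is left unresolved".

```java
import java.util.List;
import java.util.Optional;

public class FlattenWindowResolution {

    /** Hypothetical descriptor for the global windowFn. */
    static final String GLOBAL = "GlobalWindows";

    /**
     * Picks a common windowFn descriptor for the inputs of a flatten, or
     * returns empty when the caller has to re-window explicitly.
     */
    static Optional<String> resolve(List<String> windowFns) {
        if (windowFns.isEmpty()) {
            return Optional.empty();
        }
        String first = windowFns.get(0);
        // All inputs already agree: nothing to resolve.
        if (windowFns.stream().allMatch(first::equals)) {
            return Optional.of(first);
        }
        // Assumed rule: if any input is globally windowed, use the global window for all.
        if (windowFns.contains(GLOBAL)) {
            return Optional.of(GLOBAL);
        }
        // Conflicting non-global windowFns: the intention is not well-defined.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of(GLOBAL, "SlidingWindows")));
        System.out.println(resolve(List.of("FixedWindows", "SlidingWindows")));
    }
}
```

Note that this only decides which windowFn to use. As the self-correction earlier in the thread points out, the inputs would still have to be re-windowed for real; overwriting the strategy on the PCollection is not enough at runtime.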