PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Jan Lukavský

Hi,

I came across a case where using
PCollection#applyWindowingStrategyInternal seems legit in user code. The
case is roughly as follows:


 a) compute some streaming statistics

 b) apply the same transform (say ComputeWindowedAggregation) with
different parameters on these statistics, yielding two windowed
PCollections: the first is global with an early trigger, the other uses a
sliding window. The specific parameters of the windowFns are encapsulated
in the ComputeWindowedAggregation transform


 c) apply the same transform on both of the above PCollections, 
yielding two PCollections with the same types, but different windowFns


 d) flatten these PCollections into a single one (e.g. for downstream
processing - joining - or flushing to a sink)


Now, the flatten will not work, because these PCollections have
different windowFns. It would be possible to restore the windowing for
either of them, but that requires somewhat breaking the encapsulation of
the transforms that produce the windowed outputs. A more natural
solution is to take the WindowingStrategy from the global aggregation
and set it via setWindowingStrategyInternal() on the other PCollection.
This works, but it uses an API that is marked as @Internal (and obviously,
the name also suggests it is not intended for client-code usage).


The question is, should we make a legitimate version of this call? Or
should we introduce a way for Flatten.pCollections() to re-window the
input PCollections appropriately? In the case of conflicting WindowFns,
where one of them is the global windowing strategy, it seems to me that the
user's intention is quite well-defined (this might extend to some
'flatten windowFn resolution strategy', maybe).
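
To make the failure in step d) concrete, here is a toy model in plain Java. It is not Beam's API: `ToyCollection` and its `flatten` method are hypothetical names used only for illustration. The sketch assumes what the text describes, namely that the windowFn is recorded once per collection and that flatten rejects inputs whose windowFns differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a windowed collection; NOT Beam's PCollection. */
final class ToyCollection {
    final String windowFn;        // collection-level metadata, e.g. "global" or "sliding"
    final List<String> elements;

    ToyCollection(String windowFn, List<String> elements) {
        this.windowFn = windowFn;
        this.elements = elements;
    }

    /** Merges the inputs, insisting that all of them declare the same windowFn. */
    static ToyCollection flatten(List<ToyCollection> inputs) {
        String windowFn = inputs.get(0).windowFn;
        List<String> merged = new ArrayList<>();
        for (ToyCollection input : inputs) {
            if (!input.windowFn.equals(windowFn)) {
                throw new IllegalStateException(
                    "incompatible windowFns: " + windowFn + " vs " + input.windowFn);
            }
            merged.addAll(input.elements);
        }
        return new ToyCollection(windowFn, merged);
    }
}
```

Calling `ToyCollection.flatten` with one "global" and one "sliding" input throws, while two "global" inputs merge into a single collection.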


WDYT?

 Jan



Re: PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Jan Lukavský
Immediate self-correction: although setting the strategy directly via
setWindowingStrategyInternal() *seemed* to be working at pipeline
construction time, at runtime it obviously does not work, because
the PCollection was still windowed using the old windowFn. Makes sense to
me, but there remains the other question of whether we can make flattening
PCollections with incompatible windowFns more user-friendly. The current
approach, where we require the same windowFn for all input PCollections,
creates some unnecessary boilerplate code on the user side.
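
The gap between construction time and runtime can be sketched with a toy model in plain Java. This is not Beam code: `LabeledCollection`, `relabel` and `rewindowInto` are hypothetical names. The assumption is that construction-time checks only look at the collection-level label, while each element keeps the window it was actually assigned.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, NOT Beam: a collection-level label plus the window each element carries. */
final class LabeledCollection {
    String declaredWindowFn;             // what a construction-time check would look at
    final List<String> elementWindows;   // what the runtime would actually see

    LabeledCollection(String declaredWindowFn, List<String> elementWindows) {
        this.declaredWindowFn = declaredWindowFn;
        this.elementWindows = new ArrayList<>(elementWindows);
    }

    /** Metadata-only change: relabels the collection and leaves the elements untouched. */
    void relabel(String windowFn) {
        declaredWindowFn = windowFn;
    }

    /** Real re-windowing: relabels the collection AND reassigns every element. */
    void rewindowInto(String windowFn, String window) {
        declaredWindowFn = windowFn;
        elementWindows.replaceAll(old -> window);
    }

    /** True if every element sits in the given window. */
    boolean allElementsIn(String window) {
        return elementWindows.stream().allMatch(window::equals);
    }
}
```

After `relabel("global")` the label says "global" but `allElementsIn("GLOBAL")` is still false; only `rewindowInto("global", "GLOBAL")` makes label and elements agree.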


 Jan




Re: PCollection#applyWindowingStrategyInternal

2024-04-06 Thread Reuven Lax via dev
So the problem here is that windowFn is a property of the PCollection, not
the element, and the result of Flatten is a single PCollection.

In various cases, there is a notion of "compatible" windows. Basically
given window functions W1 and W2, provide a W3 that "works" with both.

Note that Beam already has something similar with side inputs, since the
side input often is in a different window than the main input. However main
input elements are supposed to see side input elements in the same window
(and in fact main inputs are blocked until the side-input window is ready),
so we must do a mapping. If for example (and very commonly!) the side input
is in the global window and the main input is in a fixed window, by default
we will remap the global-window elements into the main-input's fixed window.

With side inputs we also allow the user to control this mapping, so for
example side input elements could always map to the previous fixed window
(e.g. while processing window 12-1, you want to see summary data of all
records in the previous window 11-12). Users can do this by providing a
WindowMappingFn to the View - essentially a function from window to
window. Unfortunately this is hard to use (one must create their own
PCollectionView class) and very poorly documented, so I doubt many users
know about this!
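
The "function from window to window" idea can be sketched in plain Java. This is a toy, not Beam's side-input API: `HourWindow`, `TO_GLOBAL` and `PREVIOUS_WINDOW` are hypothetical names, and windows are modelled as half-open hour intervals on a 24-hour clock, so the window "12-1" above becomes [12, 13).

```java
import java.util.function.UnaryOperator;

/** Toy half-open interval [start, end) in hours; NOT Beam's IntervalWindow. */
final class HourWindow {
    final int start;
    final int end;

    HourWindow(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /** Stand-in for the single global window. */
    static final HourWindow GLOBAL = new HourWindow(Integer.MIN_VALUE, Integer.MAX_VALUE);

    /** Mapping for a global side input: every main-input window reads the global window. */
    static final UnaryOperator<HourWindow> TO_GLOBAL = mainWindow -> GLOBAL;

    /** Custom mapping: read the side input from the window just before the main one. */
    static final UnaryOperator<HourWindow> PREVIOUS_WINDOW =
        mainWindow -> new HourWindow(
            mainWindow.start - (mainWindow.end - mainWindow.start), mainWindow.start);
}
```

With this sketch, `HourWindow.PREVIOUS_WINDOW.apply(new HourWindow(12, 13))` yields the window [11, 12), matching the "previous window 11-12" example.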

Reuven



Re: PCollection#applyWindowingStrategyInternal

2024-04-09 Thread Jan Lukavský

On 4/6/24 21:23, Reuven Lax via dev wrote:
> So the problem here is that windowFn is a property of the PCollection,
> not the element, and the result of Flatten is a single PCollection.

Yes. That is why Flatten.pCollections() needs the same windowFn.

> In various cases, there is a notion of "compatible" windows. Basically
> given window functions W1 and W2, provide a W3 that "works" with both.

Exactly this would be a nice feature for Flatten, something like a
'windowFn resolve strategy', so that if the user does not know the windowFn
of the upstream PCollections, this can be somehow resolved at pipeline
construction time. Alternatively, only as a small piece of syntactic sugar,
something like:

 Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user code, so it is not
something deeper, but it might help in some cases. It would be cool if we
could reuse concepts from other cases where such a mechanism is needed.
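
One possible shape for such a resolve strategy, sketched in plain Java. Everything here is hypothetical: `WindowFnResolution` and `resolve` are illustration-only names, windowFns are reduced to string labels, and the "global wins" rule is just the intuition voiced in this thread, not an existing Beam behavior.

```java
import java.util.List;

/** Hypothetical resolution rule for mismatched windowFns; not an existing Beam API. */
final class WindowFnResolution {
    static final String GLOBAL = "global";

    /**
     * Returns the windowFn a flatten could adopt: the shared one if all inputs agree,
     * the global one if any input is global, and otherwise no automatic answer.
     */
    static String resolve(List<String> inputWindowFns) {
        if (inputWindowFns.isEmpty()) {
            throw new IllegalArgumentException("no inputs");
        }
        String first = inputWindowFns.get(0);
        if (inputWindowFns.stream().allMatch(first::equals)) {
            return first;
        }
        if (inputWindowFns.contains(GLOBAL)) {
            return GLOBAL;
        }
        throw new IllegalStateException(
            "cannot resolve windowFns automatically: " + inputWindowFns);
    }
}
```

A resolved windowFn would still have to be applied to every input by actually re-windowing it; the sketch only decides which windowFn to use.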




> Note that Beam already has something similar with side inputs, since
> the side input often is in a different window than the main input.
> However main input elements are supposed to see side input elements in
> the same window (and in fact main inputs are blocked until the
> side-input window is ready), so we must do a mapping. If for example
> (and very commonly!) the side input is in the global window and the
> main input is in a fixed window, by default we will remap the
> global-window elements into the main-input's fixed window.

This is a one-sided merge function: there is a 'main' and a 'side' input,
but a generic symmetric merge might be possible as well. E.g. if one
input PCollection of Flatten is in the GlobalWindow, I wonder if there are
cases where users would actually want to do anything other than apply the
same global windowing strategy to all input PCollections.

 Jan



Re: PCollection#applyWindowingStrategyInternal

2024-04-09 Thread Kenneth Knowles
At a top level `setWindowingStrategyInternal` exists to set up the metadata
without actually assigning windows. If we were more clever we might have
found a way for it to not be public... it is something that can easily lead
to an invalid pipeline.

I think "compatible windows" today in Beam doesn't have very good uses
anyhow. I do see how when you are flattening PCollections you might also
want to explicitly have a function that says "and here is how to reconcile
their different metadata". But is it not reasonable to use
Window.into(global window)? It doesn't seem like boilerplate to me
actually, but something you really want to know is happening.

Kenn


Re: PCollection#applyWindowingStrategyInternal

2024-04-10 Thread Jan Lukavský

Re: PCollection#applyWindowingStrategyInternal

2024-04-10 Thread Reuven Lax via dev

Re: PCollection#applyWindowingStrategyInternal

2024-04-11 Thread Jan Lukavský

Re: PCollection#applyWindowingStrategyInternal

2024-04-11 Thread Reuven Lax via dev
gt;
>>> Yes. That is the cause of why Flatten.pCollections() needs the same
>>> windowFn.
>>>
>>>
>>> In various cases, there is a notion of "compatible" windows. Basically
>>> given window functions W1 and W2, provide a W3 that "works" with both.
>>>
>>> Exactly this would be a nice feature for Flatten, something like
>>> 'windowFn resolve strategy', so that if use does not know the windowFn of
>>> upstream PCollections this can be somehow resolved at pipeline construction
>>> time. Alternatively only as a small syntactic sugar, something like:
>>>
>>>  
>>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
>>>
>>> or anything similar. This can be done in user code, so it is not
>>> something deeper, but might help in some cases. It would be cool if we
>>> could reuse concepts from other cases where such mechanism is needed.
>>>
>>>
>>> Note that Beam already has something similar with side inputs, since the
>>> side input often is in a different window than the main input. However main
>>> input elements are supposed to see side input elements in the same window
>>> (and in fact main inputs are blocked until the side-input window is ready),
>>> so we must do a mapping. If for example (and very commonly!) the side input
>>> is in the global window and the main input is in a fixed window, by default
>>> we will remap the global-window elements into the main-input's fixed window.
>>>
>>> This is a one-sided merge function, there is a 'main' and 'side' input,
>>> but the generic symmetric merge might be possible as well. E.g. if one
>>> PCollection of Flatten is in GlobalWindow, I wonder if there are cases
>>> where users would actually want to do anything else then apply the same
>>> global windowing strategy to all input PCollections.
>>>
>>>  Jan
>>>
>>>
>>> In Side input we also allow the user to control this mapping, so for
>>> example side input elements could always map to the previous fixed window
>>> (e.g. while processing window 12-1, you want to see summary data of all
>>> records in the previous window 11-12). Users can do this by providing a
>>> WindowMappingFunction to the View - essentially a function from window to
>>> window. Unfortunately this is hard to use (one must create their own
>>> PCollectionView class) and very poorly documented, so I doubt many users
>>> know about this!
>>>
>>> Reuven
>>>
>>> On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský  wrote:
>>>
>>>> Immediate self-correction, although setting the strategy directly via
>>>> setWindowingStrategyInternal() *seemed* to be working during Pipeline
>>>> construction time, during runtime it obviously does not work, because
>>>> the PCollection was still windowed using the old windowFn. Make sense
>>>> to
>>>> me, but there remains the other question if we can make flattening
>>>> PCollections with incompatible windowFns more user-friendly. The
>>>> current
>>>> approach where we require the same windowFn for all input PCollections
>>>> creates some unnecessary boilerplate code needed on user side.
>>>>
>>>>   Jan
>>>>
>>>> On 4/6/24 15:45, Jan Lukavský wrote:
>>>> > Hi,
>>>> >
>>>> > I came across a case where using
>>>> > PCollection#applyWindowingStrategyInternal seems legit in user core.
>>>> > The case is roughly as follows:
>>>> >
>>>> >  a) compute some streaming statistics
>>>> >
>>>> >  b) apply the same transform (say ComputeWindowedAggregation) with
>>>> > different parameters on these statistics yielding two windowed
>>>> > PCollections - first is global with early trigger, the other is
>>>> > sliding window, the specific parameters of the windowFns are
>>>> > encapsulated in the ComputeWindowedAggregation transform
>>>> >
>>>> >  c) apply the same transform on both of the above PCollections,
>>>> > yielding two PCollections with the same types, but different windowFns
>>>> >
>>>> >  d) flatten these PCollections into single one (e.g. for downstream
>>>> > processing - joining - or flushing to sink)
>>>> >
>>>> > Now, the flatten will not work, because these PCollections have
>>>> > different windowFns. It would be possible to restore the windowing for
>>>> > either of them, but it requires to somewhat break the encapsulation of
>>>> > the transforms that produce the windowed outputs. A more natural
>>>> > solution is to take the WindowingStrategy from the global aggregation
>>>> > and set it via setWindowingStrategyInternal() to the other
>>>> > PCollection. This works, but it uses API that is marked as @Internal
>>>> > (and obviously, the name as well suggests it is not intended for
>>>> > client-code usage).
>>>> >
>>>> > The question is, should we make a legitimate version of this call? Or
>>>> > should we introduce a way for Flatten.pCollections() to re-window the
>>>> > input PCollections appropriately? In the case of conflicting
>>>> > WindowFns, where one of them is GlobalWindowing strategy, it seems to
>>>> > me that the user's intention is quite well-defined (this might extend
>>>> > to some 'flatten windowFn resolution strategy', maybe).
>>>> >
>>>> > WDYT?
>>>> >
>>>> >  Jan
>>>> >
>>>>
>>>


Re: PCollection#applyWindowingStrategyInternal

2024-04-15 Thread Jan Lukavský
 Jan



Kenn

On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský wrote:

On 4/6/24 21:23, Reuven Lax via dev wrote:

So the problem here is that windowFn is a property of
the PCollection, not the element, and the result of
Flatten is a single PCollection.

Yes. That is the cause of why Flatten.pCollections()
needs the same windowFn.


In various cases, there is a notion of "compatible"
windows. Basically given window functions W1 and W2,
provide a W3 that "works" with both.

Exactly this would be a nice feature for Flatten,
something like a 'windowFn resolve strategy', so that if
the user does not know the windowFn of upstream PCollections
this can be somehow resolved at pipeline construction
time. Alternatively, only as a small piece of syntactic sugar,
something like:

Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user code, so
it is not something deeper, but it might help in some
cases. It would be cool if we could reuse concepts from
other cases where such a mechanism is needed.
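
To make the 'windowFn resolve strategy' idea a bit more concrete, here is a minimal sketch in plain Java. It does not use any Beam API: windowFns are represented only by descriptive strings, and the class name WindowFnResolution and the method resolve() are hypothetical names chosen for illustration. The rule it encodes is only the one suggested in this thread (identical windowFns pass through; if any input is global, fall back to global; otherwise fail at construction time).

```java
import java.util.List;

/** Toy model of a 'flatten windowFn resolution strategy'. Hypothetical, not Beam API. */
final class WindowFnResolution {
  /** Stand-in label for the global windowFn (an assumption of this sketch). */
  static final String GLOBAL = "global";

  /**
   * Picks the windowFn the flattened output would use:
   * - all inputs agree: keep that windowFn,
   * - inputs disagree but one of them is global: use global for everything,
   * - otherwise: reject, as Flatten does today for differing windowFns.
   */
  static String resolve(List<String> inputWindowFns) {
    if (inputWindowFns.isEmpty()) {
      throw new IllegalArgumentException("Flatten needs at least one input");
    }
    String first = inputWindowFns.get(0);
    if (inputWindowFns.stream().allMatch(first::equals)) {
      return first;
    }
    if (inputWindowFns.contains(GLOBAL)) {
      return GLOBAL;
    }
    throw new IllegalArgumentException("Incompatible windowFns: " + inputWindowFns);
  }
}
```

For example, resolve() applied to a global input and a sliding-window input would pick the global windowFn, while two different non-global windowFns would still be rejected. A real implementation would additionally have to re-window the elements, not just choose a label.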



Note that Beam already has something similar with side
inputs, since the side input often is in a different
window than the main input. However main input elements
are supposed to see side input elements in the same
window (and in fact main inputs are blocked until the
side-input window is ready), so we must do a mapping.
If for example (and very commonly!) the side input is
in the global window and the main input is in a fixed
window, by default we will remap the global-window
elements into the main-input's fixed window.


This is a one-sided merge function, there is a 'main'
and 'side' input, but the generic symmetric merge might
be possible as well. E.g. if one PCollection of Flatten
is in GlobalWindow, I wonder if there are cases where
users would actually want to do anything else than apply
the same global windowing strategy to all input
PCollections.

 Jan



In Side input we also allow the user to control this
mapping, so for example side input elements could
always map to the previous fixed window (e.g. while
processing window 12-1, you want to see summary data of
all records in the previous window 11-12). Users can do
this by providing a WindowMappingFunction to the View -
essentially a function from window to window.
Unfortunately this is hard to use (one must create
their own PCollectionView class) and very poorly
documented, so I doubt many users know about this!
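
The "previous fixed window" mapping described above can be sketched as a plain function from window to window. This is a toy model, not Beam code: the Interval class and sideInputWindowFor() are hypothetical names, and windows are half-open millisecond ranges. In the Beam Java SDK the corresponding abstraction is, to my knowledge, WindowMappingFn rather than "WindowMappingFunction"; the sketch only shows the arithmetic such a mapping would perform.

```java
/** Toy model of a main-input to side-input window mapping. Hypothetical, not Beam API. */
final class PreviousWindowMapping {
  /** Half-open time interval [startMillis, endMillis). */
  static final class Interval {
    final long startMillis;
    final long endMillis;

    Interval(long startMillis, long endMillis) {
      this.startMillis = startMillis;
      this.endMillis = endMillis;
    }
  }

  /**
   * Maps a fixed main-input window to the fixed window of the same size
   * that immediately precedes it, e.g. 12:00-13:00 maps to 11:00-12:00.
   */
  static Interval sideInputWindowFor(Interval mainWindow) {
    long size = mainWindow.endMillis - mainWindow.startMillis;
    return new Interval(mainWindow.startMillis - size, mainWindow.startMillis);
  }
}
```

The default behaviour mentioned earlier (global side input, fixed main input) would be the degenerate case where every main window maps to the single global window.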

Reuven

On Sat, Apr 6, 2024 at 7:09 AM Jan Lukavský wrote:

Immediate self-correction: although setting the strategy directly via
setWindowingStrategyInternal() *seemed* to be working during Pipeline
construction time, during runtime it obviously does not work, because
the PCollection was still windowed using the old windowFn. Makes sense
to me, but there remains the other question of whether we can make
flattening PCollections with incompatible windowFns more user-friendly.
The current approach, where we require the same windowFn for all input
PCollections, creates some unnecessary boilerplate code on the
user side.
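
The difference between what the PCollection declares and how its elements are actually windowed can be illustrated with a small toy model. Nothing here is Beam API: ToyPCollection, relabel() and rewindowIntoGlobal() are hypothetical names. relabel() plays the role of overwriting the strategy at construction time, while rewindowIntoGlobal() plays the role of an explicit re-windowing step such as the Window.into(global window) mentioned elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: declared windowFn (metadata) vs. per-element windows. Hypothetical, not Beam API. */
final class ToyPCollection {
  /** What the collection claims at pipeline construction time. */
  String declaredWindowFn;
  /** The window each element was actually assigned to. */
  final List<String> elementWindows;

  ToyPCollection(String declaredWindowFn, List<String> elementWindows) {
    this.declaredWindowFn = declaredWindowFn;
    this.elementWindows = new ArrayList<>(elementWindows);
  }

  /** Overwrites only the metadata; the elements keep their old windows. */
  void relabel(String newWindowFn) {
    this.declaredWindowFn = newWindowFn;
  }

  /** Re-windows for real: every element is reassigned to the global window. */
  ToyPCollection rewindowIntoGlobal() {
    List<String> reassigned = new ArrayList<>();
    for (int i = 0; i < elementWindows.size(); i++) {
      reassigned.add("global");
    }
    return new ToyPCollection("global", reassigned);
  }
}
```

After relabel("global") the declared windowFn and the element windows disagree, which is the inconsistency that only shows up at runtime; after rewindowIntoGlobal() both agree.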

      Jan


Re: PCollection#applyWindowingStrategyInternal

2024-04-15 Thread Reuven Lax via dev
with meaningful defaults)
>>> any of these metadata that is unused, but can cause Pipeline to fail at
>>> construction time. It is also (to me) somewhat questionable if triggers are
>>> really a property of a PCollection or a property of a specific transform
>>> (GBK - ehm, actually (stateless) 'key by' + 'reduce by key', but that is
>>> completely different story :)) because (non-default) triggers are likely
>>> not preserved across multiple transforms. Maybe the correct subject of this
>>> thread could be "are we sure our windowing and triggering semantics is 100%
>>> correct"? Probably the - wrong - expectations at the beginning of this
>>> thread were due to conflict in my mental model of how things 'could' work
>>> as opposed to how they actually work. :)
>>>
>>>  Jan
>>>
>>>
>>> Kenn
>>>
>>> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:
>>>
>>>> On 4/6/24 21:23, Reuven Lax via dev wrote:
>>>>
>>>> So the problem here is that windowFn is a property of the PCollection,
>>>> not the element, and the result of Flatten is a single PCollection.
>>>>
>>>> Yes. That is the cause of why Flatten.pCollections() needs the same
>>>> windowFn.
>>>>
>>>>
>>>> In various cases, there is a notion of "compatible" windows. Basically
>>>> given window functions W1 and W2, provide a W3 that "works" with both.
>>>>
>>>> Exactly this would be a nice feature for Flatten, something like
>>>> 'windowFn resolve strategy', so that if the user does not know the windowFn
>>>> of upstream PCollections this can be somehow resolved at pipeline
>>>> construction time. Alternatively only as a small syntactic sugar,
>>>> something like:
>>>>
>>>>  
>>>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
>>>>
>>>> or anything similar. This can be done in user code, so it is not
>>>> something deeper, but might help in some cases. It would be cool if we
>>>> could reuse concepts from other cases where such mechanism is needed.
>>>>
>>>>
>>>> Note that Beam already has something similar with side inputs, since
>>>> the side input often is in a different window than the main input. However
>>>> main input elements are supposed to see side input elements in the same
>>>> window (and in fact main inputs are blocked until the side-input window is
>>>> ready), so we must do a mapping. If for example (and very commonly!) the
>>>> side input is in the global window and the main input is in a fixed window,
>>>> by default we will remap the global-window elements into the main-input's
>>>> fixed window.
>>>>
>>>> This is a one-sided merge function, there is a 'main' and 'side' input,
>>>> but the generic symmetric merge might be possible as well. E.g. if one
>>>> PCollection of Flatten is in GlobalWindow, I wonder if there are cases
>>>> where users would actually want to do anything else than apply the same
>>>> global windowing strategy to all input PCollections.
>>>>
>>>>  Jan
>>>>
>>>>
>>>> In Side input we also allow the user to control this mapping, so for
>>>> example side input elements could always map to the previous fixed window
>>>> (e.g. while processing window 12-1, you want to see summary data of all
>>>> records in the previous window 11-12). Users can do this by providing a
>>>> WindowMappingFunction to the View - essentially a function from window to
>>>> window. Unfortunately this is hard to use (one must create their own
>>>> PCollectionView class) and very poorly documented, so I doubt many users
>>>> know about this!
>>>>
>>>> Reuven
>>>>

Re: PCollection#applyWindowingStrategyInternal

2024-04-22 Thread Kenneth Knowles
cases. It would be cool if we
>>>>> could reuse concepts from other cases where such mechanism is needed.
>>>>>
>>>>>
>>>>> Note that Beam already has something similar with side inputs, since
>>>>> the side input often is in a different window than the main input. However
>>>>> main input elements are supposed to see side input elements in the same
>>>>> window (and in fact main inputs are blocked until the side-input window is
>>>>> ready), so we must do a mapping. If for example (and very commonly!) the
>>>>> side input is in the global window and the main input is in a fixed
>>>>> window, by default we will remap the global-window elements into the
>>>>> main-input's fixed window.
>>>>>
>>>>> This is a one-sided merge function, there is a 'main' and 'side'
>>>>> input, but the generic symmetric merge might be possible as well. E.g. if
>>>>> one PCollection of Flatten is in GlobalWindow, I wonder if there are cases
>>>>> where users would actually want to do anything else than apply the same
>>>>> global windowing strategy to all input PCollections.
>>>>>
>>>>>  Jan
>>>>>
>>>>>
>>>>> In Side input we also allow the user to control this mapping, so for
>>>>> example side input elements could always map to the previous fixed window
>>>>> (e.g. while processing window 12-1, you want to see summary data of all
>>>>> records in the previous window 11-12). Users can do this by providing a
>>>>> WindowMappingFunction to the View - essentially a function from window to
>>>>> window. Unfortunately this is hard to use (one must create their own
>>>>> PCollectionView class) and very poorly documented, so I doubt many users
>>>>> know about this!
>>>>>
>>>>> Reuven
>>>>>


Re: PCollection#applyWindowingStrategyInternal

2024-04-23 Thread Jan Lukavský
windowFn of upstream PCollections this can be
somehow resolved at pipeline construction time.
Alternatively only as a small syntactic sugar,
something like:
 
Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))

or anything similar. This can be done in user
code, so it is not something deeper, but might
help in some cases. It would be cool if we
could reuse concepts from other cases where
such mechanism is needed.



Note that Beam already has something similar
with side inputs, since the side input often
is in a different window than the main input.
However main input elements are supposed to
see side input elements in the same window
(and in fact main inputs are blocked until the
side-input window is ready), so we must do a
mapping. If for example (and very commonly!)
the side input is in the global window and the
main input is in a fixed window, by default we
will remap the global-window elements into the
main-input's fixed window.


This is a one-sided merge function, there is a
'main' and 'side' input, but the generic
symmetric merge might be possible as well. E.g.
if one PCollection of Flatten is in
GlobalWindow, I wonder if there are cases where
users would actually want to do anything else
than apply the same global windowing strategy
to all input PCollections.

 Jan



In Side input we also allow the user to
control this mapping, so for example side
input elements could always map to the
previous fixed window (e.g. while processing
window 12-1, you want to see summary data of
all records in the previous window 11-12).
Users can do this by providing a
WindowMappingFunction to the View -
essentially a function from window to window.
Unfortunately this is hard to use (one must
create their own PCollectionView class) and
very poorly documented, so I doubt many users
know about this!

Reuven


Re: PCollection#applyWindowingStrategyInternal

2024-04-23 Thread Reuven Lax via dev
al model of how things 'could' work
>>>>> as opposed to how they actually work. :)
>>>>>
>>>>>  Jan
>>>>>
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:
>>>>>
>>>>>> On 4/6/24 21:23, Reuven Lax via dev wrote:
>>>>>>
>>>>>> So the problem here is that windowFn is a property of the
>>>>>> PCollection, not the element, and the result of Flatten is a single
>>>>>> PCollection.
>>>>>>
>>>>>> Yes. That is the cause of why Flatten.pCollections() needs the same
>>>>>> windowFn.
>>>>>>
>>>>>>
>>>>>> In various cases, there is a notion of "compatible" windows.
>>>>>> Basically given window functions W1 and W2, provide a W3 that "works"
>>>>>> with both.
>>>>>>
>>>>>> Exactly this would be a nice feature for Flatten, something like
>>>>>> 'windowFn resolve strategy', so that if the user does not know the
>>>>>> windowFn of upstream PCollections this can be somehow resolved at
>>>>>> pipeline construction time. Alternatively only as a small syntactic
>>>>>> sugar, something like:
>>>>>>
>>>>>>  
>>>>>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
>>>>>>
>>>>>> or anything similar. This can be done in user code, so it is not
>>>>>> something deeper, but might help in some cases. It would be cool if we
>>>>>> could reuse concepts from other cases where such mechanism is needed.
>>>>>>
>>>>>>
>>>>>> Note that Beam already has something similar with side inputs, since
>>>>>> the side input often is in a different window than the main input.
>>>>>> However main input elements are supposed to see side input elements in
>>>>>> the same window (and in fact main inputs are blocked until the side-input
>>>>>> window is ready), so we must do a mapping. If for example (and very
>>>>>> commonly!) the side input is in the global window and the main input is
>>>>>> in a fixed window, by default we will remap the global-window elements
>>>>>> into the main-input's fixed window.
>>>>>>
>>>>>> This is a one-sided merge function, there is a 'main' and 'side'
>>>>>> input, but the generic symmetric merge might be possible as well. E.g. if
>>>>>> one PCollection of Flatten is in GlobalWindow, I wonder if there are
>>>>>> cases where users would actually want to do anything else than apply the
>>>>>> same global windowing strategy to all input PCollections.
>>>>>>
>>>>>>  Jan
>>>>>>
>>>>>>
>>>>>> In Side input we also allow the user to control this mapping, so for
>>>>>> example side input elements could always map to the previous fixed window
>>>>>> (e.g. while processing window 12-1, you want to see summary data of all
>>>>>> records in the previous window 11-12). Users can do this by providing a
>>>>>> WindowMappingFunction to the View - essentially a function from window to
>>>>>> window. Unfortunately this is hard to use (one must create their own
>>>>>> PCollectionView class) and very poorly documented, so I doubt many users
>>>>>> know about this!
>>>>>>
>>>>>> Reuven
>>>>>>


Re: PCollection#applyWindowingStrategyInternal

2024-04-25 Thread Jan Lukavský
se (one must create their own
PCollectionView class) and very poorly
documented, so I doubt many users know
about this!

Reuven



Re: PCollection#applyWindowingStrategyInternal

2024-04-25 Thread Reuven Lax via dev
ine.
>>>>>>
>>>>>> Yes, that was what hit me about one minute after I started this
>>>>>> thread. :)
>>>>>>
>>>>>>
>>>>>> I think "compatible windows" today in Beam doesn't have very good
>>>>>> uses anyhow. I do see how when you are flattening PCollections you might
>>>>>> also want to explicitly have a function that says "and here is how to
>>>>>> reconcile their different metadata". But is it not reasonable to use
>>>>>> Window.into(global window)? It doesn't seem like boilerplate to me
>>>>>> actually, but something you really want to know is happening.
>>>>>>
>>>>>> :)
>>>>>>
>>>>>> Of course this was the way out, but I was somewhat intuitively
>>>>>> seeking something that could do this autonomously.
>>>>>>
>>>>>> Generally speaking, we might have some room for improvement in the
>>>>>> way we handle windows and triggers - windows relate only to GBK and
>>>>>> stateful ParDo, triggers relate to GBK only. They have no semantics if
>>>>>> downstream processing does not use any of these. There could be a
>>>>>> pipeline preprocessing stage that would discard (replace with meaningful
>>>>>> defaults) any of these metadata that is unused, but can cause Pipeline to
>>>>>> fail at construction time. It is also (to me) somewhat questionable if
>>>>>> triggers are really a property of a PCollection or a property of a
>>>>>> specific transform (GBK - ehm, actually (stateless) 'key by' + 'reduce by
>>>>>> key', but that is a completely different story :)) because (non-default)
>>>>>> triggers are likely not preserved across multiple transforms. Maybe the
>>>>>> correct subject of this thread could be "are we sure our windowing and
>>>>>> triggering semantics is 100% correct"? Probably the - wrong - expectations
>>>>>> at the beginning of this thread were due to a conflict in my mental model
>>>>>> of how things 'could' work as opposed to how they actually work. :)
>>>>>>
>>>>>>  Jan
>>>>>>
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Apr 9, 2024 at 9:19 AM Jan Lukavský  wrote:
>>>>>>
>>>>>>> On 4/6/24 21:23, Reuven Lax via dev wrote:
>>>>>>>
>>>>>>> So the problem here is that windowFn is a property of the
>>>>>>> PCollection, not the element, and the result of Flatten is a single
>>>>>>> PCollection.
>>>>>>>
>>>>>>> Yes. That is the cause of why Flatten.pCollections() needs the same
>>>>>>> windowFn.
>>>>>>>
>>>>>>>
>>>>>>> In various cases, there is a notion of "compatible" windows.
>>>>>>> Basically given window functions W1 and W2, provide a W3 that "works"
>>>>>>> with both.
>>>>>>>
>>>>>>> Exactly this would be a nice feature for Flatten, something like
>>>>>>> 'windowFn resolve strategy', so that if the user does not know the
>>>>>>> windowFn of upstream PCollections this can be somehow resolved at
>>>>>>> pipeline construction time. Alternatively only as a small syntactic
>>>>>>> sugar, something like:
>>>>>>>
>>>>>>>  
>>>>>>> Flatten.pCollections().withWindowingStrategy(WindowResolution.into(oneInput.getWindowingStrategy()))
>>>>>>>
>>>>>>> or anything similar. This can be done in user code, so it is not
>>>>>>> something deeper, but might help in some cases. It would be cool if we
>>>>>>> could reuse concepts from other cases where such mechanism is needed.
>>>>>>>
>>>>>>>
>>>>>>> Note that Beam already has something similar with side inputs, since
>>>>>>> the side input often is in a different window than the main input. 

Re: PCollection#applyWindowingStrategyInternal

2024-05-05 Thread Jan Lukavský
   similar with side inputs, since the
side input often is in a different
window than the main input. However
main input elements are supposed to
see side input elements in the same
window (and in fact main inputs are
blocked until the side-input window
is ready), so we must do a mapping.
If for example (and very commonly!)
the side input is in the global
window and the main input is in a
fixed window, by default we will
remap the global-window elements
into the main-input's fixed window.


This is a one-sided merge function,
there is a 'main' and 'side' input,
but the generic symmetric merge might
be possible as well. E.g. if one
PCollection of Flatten is in
GlobalWindow, I wonder if there are
cases where users would actually want
to do anything else than apply the
same global windowing strategy to all
input PCollections.

 Jan



In Side input we also allow the user
to control this mapping, so for
example side input elements could
always map to the previous fixed
window (e.g. while processing window
12-1, you want to see summary data
of all records in the previous
window 11-12). Users can do this by
providing a WindowMappingFunction to
the View - essentially a function
from window to window. Unfortunately
this is hard to use (one must create
their own PCollectionView class) and
very poorly documented, so I doubt
many users know about this!

Reuven
