Re: Questions on primitive transforms hierarchy

2022-11-14 Thread Jan Lukavský

I don't think it is necessary in this particular case.

In general, it would be nice to document the design decisions that were made 
during the history of Beam and that led to some aspects of the current 
implementation. But I'm afraid it would be rather costly and 
time-consuming. We have design docs, which should be fine for most cases.


 Jan

On 11/14/22 15:25, Sachin Agarwal via dev wrote:

Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský  wrote:

I somehow missed these answers, Reuven and Kenn, thanks for the
discussion, it helped me clarify my understanding.

 Jan

On 10/26/22 21:10, Kenneth Knowles wrote:



On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but
defining their semantics directly at a high level is more
powerful. The higher level we can make transforms, the more
flexibility we have in the runners. You *could* suggest that
we take the same approach as we do with Combine: not a
primitive, but a special transform that we optimize. You
could say that "vanilla ParDo" is a composite that has a
stateful ParDo implementation, but a runner can implement the
composite more efficiently (without a shuffle). Same with
CoGBK. You could say that there is a default expansion of
CoGBK that uses stateful DoFn (which implies a shuffle) but
that smart runners will not use that expansion.

Yes, semantics > optimizations. For optimizations Beam
already has a facility - PTransformOverride. There is no
fundamental difference about how we treat Combine wrt GBK. It
*can* be expanded using GBK, but "smart runners will not use
that expansion". This is essentially the root of this discussion.

If I rephrase it:

 a) why do we distinguish between "some" actually composite
transforms treating them as primitive, while others have
expansions, although the fundamental reasoning seems the same
for both (performance)?

It is identical to why you can choose different axioms for formal
logic and get all the same provable statements. You have to
choose something. But certainly a runner that just executes
primitives is the bare minimum and all runners are really
expected to take advantage of known composites. Before
portability, the benefit was minimal to have the runner (written
in Java) execute a transform directly vs calling a user DoFn. Now
with portability it could be huge if it avoids a Fn API crossing.

 b) is there a fundamental reason why we do not support
stateful DoFn for merging windows?

No reason. The original design was to force users to only use
"mergeable" state in a stateful DoFn for merging windows. That is
an annoying restriction that we don't really need. So I think the
best way is to have an OnMerge callback. The internal legacy Java
APIs for this are way too complex. But portability wire protocols
support it (I think?) and making a good user facing API for all
the SDKs shouldn't be too hard.

Kenn

I feel that these are related and have historical reasons,
but I'd like to know that for sure. :)

 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský
 wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a
primitive, though in practice it's implemented in terms
of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
 wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding
of the set of Beam's primitive transforms,
which I'd like to fill. First a quick recap of
what I think is the current state. We have
(basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that
runners can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK
into ReduceFn (ReduceFnRunner), which does the
actual logic for both GBK and stateful DoFn.



Re: Questions on primitive transforms hierarchy

2022-11-14 Thread Sachin Agarwal via dev
Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský  wrote:

> I somehow missed these answers, Reuven and Kenn, thanks for the
> discussion, it helped me clarify my understanding.
>
>  Jan
> On 10/26/22 21:10, Kenneth Knowles wrote:
>
>
>
> On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:
>
>> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
>> be *implemented* using stateful DoFn, but defining their semantics directly
>> at a high level is more powerful. The higher level we can make transforms,
>> the more flexibility we have in the runners. You *could* suggest that we
>> take the same approach as we do with Combine: not a primitive, but a
>> special transform that we optimize. You could say that "vanilla ParDo" is a
>> composite that has a stateful ParDo implementation, but a runner can
>> implement the composite more efficiently (without a shuffle). Same with
>> CoGBK. You could say that there is a default expansion of CoGBK that uses
>> stateful DoFn (which implies a shuffle) but that smart runners will not use
>> that expansion.
>>
>> Yes, semantics > optimizations. For optimizations Beam already has a
>> facility - PTransformOverride. There is no fundamental difference about how
>> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
>> runners will not use that expansion". This is essentially the root of this
>> discussion.
>>
>> If I rephrase it:
>>
>>  a) why do we distinguish between "some" actually composite transforms
>> treating them as primitive, while others have expansions, although the
>> fundamental reasoning seems the same for both (performance)?
>>
> It is identical to why you can choose different axioms for formal logic
> and get all the same provable statements. You have to choose something. But
> certainly a runner that just executes primitives is the bare minimum and
> all runners are really expected to take advantage of known composites.
> Before portability, the benefit was minimal to have the runner (written in
> Java) execute a transform directly vs calling a user DoFn. Now with
> portability it could be huge if it avoids a Fn API crossing.
>
>  b) is there a fundamental reason why we do not support stateful DoFn for
>> merging windows?
>>
> No reason. The original design was to force users to only use "mergeable"
> state in a stateful DoFn for merging windows. That is an annoying
> restriction that we don't really need. So I think the best way is to have
> an OnMerge callback. The internal legacy Java APIs for this are way too
> complex. But portability wire protocols support it (I think?) and making a
> good user facing API for all the SDKs shouldn't be too hard.
>
> Kenn
>
>
>> I feel that these are related and have historical reasons, but I'd like
>> to know that for sure. :)
>>
>>  Jan
>> On 10/24/22 19:59, Kenneth Knowles wrote:
>>
>>
>>
>> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:
>>
>>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>>
>>> I think we stated that CoGroupByKey was also a primitive, though in
>>> practice it's implemented in terms of GroupByKey today.
>>>
>>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>>>


 On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:

> Hi,
>
> I have some missing pieces in my understanding of the set of Beam's
> primitive transforms, which I'd like to fill. First a quick recap of what I
> think is the current state. We have (basically) the following primitive
> transforms:
>
>  - DoFn (stateless, stateful, splittable)
>
>  - Window
>
>  - Impulse
>
>  - GroupByKey
>
>  - Combine
>

 Not a primitive, just a well-defined transform that runners can execute
 in special ways.

>>> Yep, OK, agree. Performance is orthogonal to semantics.
>>>
>>>

>
>
>  - Flatten (pCollections)
>

 The rest, yes.



> Inside runners, we most often transform GBK into ReduceFn
> (ReduceFnRunner), which does the actual logic for both GBK and stateful
> DoFn.
>

 ReduceFnRunner is for windowing / triggers and has special feature to
 use a CombineFn while doing it. Nothing to do with stateful DoFn.

>>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>>> and Combine can be defined in terms of stateful DoFn. There are some
>>> changes needed to stateful DoFn to support the Combine functionality. But
>>> as mentioned above - optimization is orthogonal to semantics.
>>>
>>
>> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
>> *implemented* using stateful DoFn, but defining their semantics directly at
>> a high level is more powerful. The higher level we can make transforms, the
>> more flexibility we have in the runners. You *could* suggest that we take
>> the same approach as we do with Combine: not a primitive, but 

Re: Questions on primitive transforms hierarchy

2022-11-14 Thread Jan Lukavský
I somehow missed these answers, Reuven and Kenn, thanks for the 
discussion, it helped me clarify my understanding.


 Jan

On 10/26/22 21:10, Kenneth Knowles wrote:



On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but defining
their semantics directly at a high level is more powerful. The
higher level we can make transforms, the more flexibility we have
in the runners. You *could* suggest that we take the same approach
as we do with Combine: not a primitive, but a special transform
that we optimize. You could say that "vanilla ParDo" is a
composite that has a stateful ParDo implementation, but a runner
can implement the composite more efficiently (without a shuffle).
Same with CoGBK. You could say that there is a default expansion
of CoGBK that uses stateful DoFn (which implies a shuffle) but
that smart runners will not use that expansion.

Yes, semantics > optimizations. For optimizations Beam already has
a facility - PTransformOverride. There is no fundamental
difference about how we treat Combine wrt GBK. It *can* be
expanded using GBK, but "smart runners will not use that
expansion". This is essentially the root of this discussion.

If I rephrase it:

 a) why do we distinguish between "some" actually composite
transforms treating them as primitive, while others have
expansions, although the fundamental reasoning seems the same for
both (performance)?

It is identical to why you can choose different axioms for formal 
logic and get all the same provable statements. You have to choose 
something. But certainly a runner that just executes primitives is the 
bare minimum and all runners are really expected to take advantage of 
known composites. Before portability, the benefit was minimal to have 
the runner (written in Java) execute a transform directly vs calling a 
user DoFn. Now with portability it could be huge if it avoids a Fn API 
crossing.


 b) is there a fundamental reason why we do not support stateful
DoFn for merging windows?

No reason. The original design was to force users to only use 
"mergeable" state in a stateful DoFn for merging windows. That is an 
annoying restriction that we don't really need. So I think the best 
way is to have an OnMerge callback. The internal legacy Java APIs for 
this are way too complex. But portability wire protocols support it (I 
think?) and making a good user facing API for all the SDKs shouldn't 
be too hard.


Kenn

I feel that these are related and have historical reasons, but I'd
like to know that for sure. :)

 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a primitive,
though in practice it's implemented in terms of GroupByKey
today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
 wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding of
the set of Beam's primitive transforms, which I'd
like to fill. First a quick recap of what I think is
the current state. We have (basically) the following
primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that
runners can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into
ReduceFn (ReduceFnRunner), which does the actual
logic for both GBK and stateful DoFn.


ReduceFnRunner is for windowing / triggers and has
special feature to use a CombineFn while doing it.
Nothing to do with stateful DoFn.


My bad, wrong wording. The point was that *all* of the
semantics of GBK and Combine can be defined in terms of
stateful DoFn. There are some changes needed to stateful DoFn
to support the Combine functionality. But as mentioned above
- optimization is orthogonal to semantics.


Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but defining
their semantics directly at a high level is more powerful. The
higher level we can make transforms, the more flexibility we have
in the runners. You *could* suggest that we take the same
approach as we do 

Re: Questions on primitive transforms hierarchy

2022-10-26 Thread Kenneth Knowles
On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
> be *implemented* using stateful DoFn, but defining their semantics directly
> at a high level is more powerful. The higher level we can make transforms,
> the more flexibility we have in the runners. You *could* suggest that we
> take the same approach as we do with Combine: not a primitive, but a
> special transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
> Yes, semantics > optimizations. For optimizations Beam already has a
> facility - PTransformOverride. There is no fundamental difference about how
> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
> runners will not use that expansion". This is essentially the root of this
> discussion.
>
> If I rephrase it:
>
>  a) why do we distinguish between "some" actually composite transforms
> treating them as primitive, while others have expansions, although the
> fundamental reasoning seems the same for both (performance)?
>
It is identical to why you can choose different axioms for formal logic and
get all the same provable statements. You have to choose something. But
certainly a runner that just executes primitives is the bare minimum and
all runners are really expected to take advantage of known composites.
Before portability, the benefit was minimal to have the runner (written in
Java) execute a transform directly vs calling a user DoFn. Now with
portability it could be huge if it avoids a Fn API crossing.

 b) is there a fundamental reason why we do not support stateful DoFn for
> merging windows?
>
No reason. The original design was to force users to only use "mergeable"
state in a stateful DoFn for merging windows. That is an annoying
restriction that we don't really need. So I think the best way is to have
an OnMerge callback. The internal legacy Java APIs for this are way too
complex. But portability wire protocols support it (I think?) and making a
good user facing API for all the SDKs shouldn't be too hard.
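
A minimal sketch of what such an OnMerge callback could look like, in plain Python rather than any Beam SDK. Everything here is hypothetical (the names merge_sessions, apply_window_merge and on_merge, and the dict-based state); it only illustrates the idea that the user, not the runner, decides how non-mergeable state is combined when session windows merge.

```python
def merge_sessions(windows):
    """Merge overlapping or touching (start, end) intervals.

    Returns a dict mapping each merged window to the list of source windows,
    roughly what a merging WindowFn would report to the runner.
    """
    merged = {}
    current, sources = None, []
    for w in sorted(windows):
        if current is not None and w[0] <= current[1]:
            # Extend the current session and remember which window fed it.
            current = (current[0], max(current[1], w[1]))
            sources.append(w)
        else:
            if current is not None:
                merged[current] = sources
            current, sources = w, [w]
    if current is not None:
        merged[current] = sources
    return merged


def apply_window_merge(state_by_window, on_merge):
    """Rewrite per-window state after merging, delegating to the user callback."""
    result = {}
    for target, sources in merge_sessions(state_by_window).items():
        result[target] = on_merge([state_by_window[w] for w in sources])
    return result


# Hypothetical user state for one key: a count and the last element seen.
state = {
    (0, 10): {"count": 2, "last": "x"},
    (5, 15): {"count": 1, "last": "y"},
    (30, 40): {"count": 4, "last": "z"},
}


def on_merge(states):
    # User-defined: "last" has no automatic merge, so the callback picks one.
    return {"count": sum(s["count"] for s in states), "last": states[-1]["last"]}


merged_state = apply_window_merge(state, on_merge)
print(merged_state)
```

The first two windows overlap and collapse into (0, 15); the callback receives their states in window order. Requiring only "mergeable" state would amount to the runner supplying on_merge itself.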

Kenn


> I feel that these are related and have historical reasons, but I'd like to
> know that for sure. :)
>
>  Jan
> On 10/24/22 19:59, Kenneth Knowles wrote:
>
>
>
> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:
>
>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>
>> I think we stated that CoGroupByKey was also a primitive, though in
>> practice it's implemented in terms of GroupByKey today.
>>
>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>>
 Hi,

 I have some missing pieces in my understanding of the set of Beam's
 primitive transforms, which I'd like to fill. First a quick recap of what I
 think is the current state. We have (basically) the following primitive
 transforms:

  - DoFn (stateless, stateful, splittable)

  - Window

  - Impulse

  - GroupByKey

  - Combine

>>>
>>> Not a primitive, just a well-defined transform that runners can execute
>>> in special ways.
>>>
>> Yep, OK, agree. Performance is orthogonal to semantics.
>>
>>
>>>


  - Flatten (pCollections)

>>>
>>> The rest, yes.
>>>
>>>
>>>
 Inside runners, we most often transform GBK into ReduceFn
 (ReduceFnRunner), which does the actual logic for both GBK and stateful
 DoFn.

>>>
>>> ReduceFnRunner is for windowing / triggers and has special feature to
>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>
>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>> and Combine can be defined in terms of stateful DoFn. There are some
>> changes needed to stateful DoFn to support the Combine functionality. But
>> as mentioned above - optimization is orthogonal to semantics.
>>
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
> *implemented* using stateful DoFn, but defining their semantics directly at
> a high level is more powerful. The higher level we can make transforms, the
> more flexibility we have in the runners. You *could* suggest that we take
> the same approach as we do with Combine: not a primitive, but a special
> transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
>>
>>>
>>>
 I'll compare this to 

Re: Questions on primitive transforms hierarchy

2022-10-26 Thread Reuven Lax via dev
On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský  wrote:

> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
> be *implemented* using stateful DoFn, but defining their semantics directly
> at a high level is more powerful. The higher level we can make transforms,
> the more flexibility we have in the runners. You *could* suggest that we
> take the same approach as we do with Combine: not a primitive, but a
> special transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
> Yes, semantics > optimizations. For optimizations Beam already has a
> facility - PTransformOverride. There is no fundamental difference about how
> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
> runners will not use that expansion". This is essentially the root of this
> discussion.
>
> If I rephrase it:
>
>  a) why do we distinguish between "some" actually composite transforms
> treating them as primitive, while others have expansions, although the
> fundamental reasoning seems the same for both (performance)?
>
>  b) is there a fundamental reason why we do not support stateful DoFn for
> merging windows?
>
Mostly because we would need the API to include a merge capability, and
that has never been implemented.


> I feel that these are related and have historical reasons, but I'd like to
> know that for sure. :)
>
>  Jan
> On 10/24/22 19:59, Kenneth Knowles wrote:
>
>
>
> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:
>
>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>
>> I think we stated that CoGroupByKey was also a primitive, though in
>> practice it's implemented in terms of GroupByKey today.
>>
>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>>
>>>
>>>
>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>>
 Hi,

 I have some missing pieces in my understanding of the set of Beam's
 primitive transforms, which I'd like to fill. First a quick recap of what I
 think is the current state. We have (basically) the following primitive
 transforms:

  - DoFn (stateless, stateful, splittable)

  - Window

  - Impulse

  - GroupByKey

  - Combine

>>>
>>> Not a primitive, just a well-defined transform that runners can execute
>>> in special ways.
>>>
>> Yep, OK, agree. Performance is orthogonal to semantics.
>>
>>
>>>


  - Flatten (pCollections)

>>>
>>> The rest, yes.
>>>
>>>
>>>
 Inside runners, we most often transform GBK into ReduceFn
 (ReduceFnRunner), which does the actual logic for both GBK and stateful
 DoFn.

>>>
>>> ReduceFnRunner is for windowing / triggers and has special feature to
>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>
>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>> and Combine can be defined in terms of stateful DoFn. There are some
>> changes needed to stateful DoFn to support the Combine functionality. But
>> as mentioned above - optimization is orthogonal to semantics.
>>
>
> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
> *implemented* using stateful DoFn, but defining their semantics directly at
> a high level is more powerful. The higher level we can make transforms, the
> more flexibility we have in the runners. You *could* suggest that we take
> the same approach as we do with Combine: not a primitive, but a special
> transform that we optimize. You could say that "vanilla ParDo" is a
> composite that has a stateful ParDo implementation, but a runner can
> implement the composite more efficiently (without a shuffle). Same with
> CoGBK. You could say that there is a default expansion of CoGBK that uses
> stateful DoFn (which implies a shuffle) but that smart runners will not use
> that expansion.
>
>>
>>>
>>>
 I'll compare this to the set of transforms we used to use in Euphoria
 (currently java SDK extension):

  - FlatMap ~~ stateless DoFn

  - Union ~~ Flatten

  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window

>>>
>>> Stateful DoFn does not require associative or commutative operation,
>>> while reduce/combine does. Windowing is really just a secondary key for
>>> GBK/Combine that allows completion of unbounded aggregations but has no
>>> computation associated with it.
>>>
>> Merging WindowFn contains some computation. The fact that stateful DoFn
>> do not require specific form of reduce function is precisely what makes it
>> the actual primitive, no?
>>
>>
>>>
>>>
  - (missing Impulse)

>>>
>>> Then you must have some primitive sources with splitting?
>>>
>>>
  - (missing 

Re: Questions on primitive transforms hierarchy

2022-10-25 Thread Jan Lukavský
> Not quite IMO. It is a subtle difference. Perhaps these transforms 
can be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can make 
transforms, the more flexibility we have in the runners. You *could* 
suggest that we take the same approach as we do with Combine: not a 
primitive, but a special transform that we optimize. You could say that 
"vanilla ParDo" is a composite that has a stateful ParDo implementation, 
but a runner can implement the composite more efficiently (without a 
shuffle). Same with CoGBK. You could say that there is a default 
expansion of CoGBK that uses stateful DoFn (which implies a shuffle) but 
that smart runners will not use that expansion.


Yes, semantics > optimizations. For optimizations Beam already has a 
facility - PTransformOverride. There is no fundamental difference about 
how we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart 
runners will not use that expansion". This is essentially the root of 
this discussion.
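
To make the Combine point concrete, here is a small model in plain Python (not Beam code). The function names group_by_key, combine_via_gbk and combine_lifted are made up for this sketch, and "bundles" simply stands for chunks of input processed before a shuffle. It shows that the GBK-based expansion and a pre-combining execution agree when the combine function is associative and commutative.

```python
from collections import defaultdict
from functools import reduce


def group_by_key(pairs):
    """Model of GroupByKey: collect all values per key (the 'shuffle')."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_via_gbk(pairs, combine_fn):
    """Default expansion: GroupByKey, then fold each key's values."""
    return {k: reduce(combine_fn, vs) for k, vs in group_by_key(pairs).items()}


def combine_lifted(bundles, combine_fn):
    """Optimized execution: pre-combine inside each bundle, shuffle only the
    partial results, then combine the partials per key."""
    partials = []
    for bundle in bundles:
        partials.extend(combine_via_gbk(bundle, combine_fn).items())
    return combine_via_gbk(partials, combine_fn)


def add(x, y):
    return x + y


bundles = [[("a", 1), ("b", 2), ("a", 3)], [("a", 4), ("b", 5)]]
all_pairs = [pair for bundle in bundles for pair in bundle]

expanded = combine_via_gbk(all_pairs, add)
lifted = combine_lifted(bundles, add)
print(expanded, lifted)
```

Both paths produce the same per-key sums; the lifted one sends four partial values through the final grouping instead of five raw elements, which is the kind of saving a runner is after when it skips the default expansion.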


If I rephrase it:

 a) why do we distinguish between "some" actually composite transforms 
treating them as primitive, while others have expansions, although the 
fundamental reasoning seems the same for both (performance)?


 b) is there a fundamental reason why we do not support stateful DoFn 
for merging windows?


I feel that these are related and have historical reasons, but I'd like 
to know that for sure. :)


 Jan

On 10/24/22 19:59, Kenneth Knowles wrote:



On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

On 10/22/22 21:47, Reuven Lax via dev wrote:

I think we stated that CoGroupByKey was also a primitive, though
in practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles 
wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
 wrote:

Hi,

I have some missing pieces in my understanding of the set
of Beam's primitive transforms, which I'd like to fill.
First a quick recap of what I think is the current state.
We have (basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that runners
can execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both
GBK and stateful DoFn.


ReduceFnRunner is for windowing / triggers and has special
feature to use a CombineFn while doing it. Nothing to do with
stateful DoFn.


My bad, wrong wording. The point was that *all* of the semantics
of GBK and Combine can be defined in terms of stateful DoFn. There
are some changes needed to stateful DoFn to support the Combine
functionality. But as mentioned above - optimization is orthogonal
to semantics.


Not quite IMO. It is a subtle difference. Perhaps these transforms can 
be *implemented* using stateful DoFn, but defining their semantics 
directly at a high level is more powerful. The higher level we can 
make transforms, the more flexibility we have in the runners. You 
*could* suggest that we take the same approach as we do with Combine: 
not a primitive, but a special transform that we optimize. You could 
say that "vanilla ParDo" is a composite that has a stateful ParDo 
implementation, but a runner can implement the composite more 
efficiently (without a shuffle). Same with CoGBK. You could say that 
there is a default expansion of CoGBK that uses stateful DoFn (which 
implies a shuffle) but that smart runners will not use that expansion.



I'll compare this to the set of transforms we used to use
in Euphoria (currently java SDK extension):

 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


Stateful DoFn does not require associative or commutative
operation, while reduce/combine does. Windowing is really
just a secondary key for GBK/Combine that allows completion
of unbounded aggregations but has no computation associated
with it.
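
The "secondary key" view can be sketched in a few lines of plain Python. This is an illustration only: fixed_window and windowed_group_by_key are hypothetical helpers, timestamps are plain integers, and only non-merging fixed windows are modelled.

```python
from collections import defaultdict


def fixed_window(timestamp, size):
    """Assign a timestamp to the fixed window [start, start + size)."""
    start = timestamp - timestamp % size
    return (start, start + size)


def windowed_group_by_key(elements, size):
    """Group (key, value, timestamp) triples by the composite key (key, window)."""
    grouped = defaultdict(list)
    for key, value, timestamp in elements:
        grouped[(key, fixed_window(timestamp, size))].append(value)
    return dict(grouped)


elements = [("a", 1, 3), ("a", 2, 7), ("a", 3, 12), ("b", 4, 5)]
result = windowed_group_by_key(elements, size=10)
print(result)
```

The window is computed from each element on its own and then only participates in the grouping key, so no extra computation is attached to it. A merging WindowFn does not fit this model, because the final window depends on more than one element.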


Merging WindowFn contains some computation. The fact that stateful
DoFn do not require specific form of reduce function is precisely
what makes it the actual primitive, no?



 - (missing Impulse)


Then you must have some primitive sources with splitting?

 - (missing splittable DoFn)


Kind of the same question - SDF is the one and only primitive
that creates parallelism.



Re: Questions on primitive transforms hierarchy

2022-10-24 Thread Reuven Lax via dev
On Mon, Oct 24, 2022 at 5:50 AM Jan Lukavský  wrote:

> On 10/22/22 21:47, Reuven Lax via dev wrote:
>
> I think we stated that CoGroupByKey was also a primitive, though in
> practice it's implemented in terms of GroupByKey today.
>
> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> I have some missing pieces in my understanding of the set of Beam's
>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>> think is the current state. We have (basically) the following primitive
>>> transforms:
>>>
>>>  - DoFn (stateless, stateful, splittable)
>>>
>>>  - Window
>>>
>>>  - Impulse
>>>
>>>  - GroupByKey
>>>
>>>  - Combine
>>>
>>
>> Not a primitive, just a well-defined transform that runners can execute
>> in special ways.
>>
> Yep, OK, agree. Performance is orthogonal to semantics.
>
>
>>
>>>
>>>
>>>  - Flatten (pCollections)
>>>
>>
>> The rest, yes.
>>
>>
>>
>>> Inside runners, we most often transform GBK into ReduceFn
>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>> DoFn.
>>>
>>
>> ReduceFnRunner is for windowing / triggers and has special feature to use
>> a CombineFn while doing it. Nothing to do with stateful DoFn.
>>
> My bad, wrong wording. The point was that *all* of the semantics of GBK
> and Combine can be defined in terms of stateful DoFn. There are some
> changes needed to stateful DoFn to support the Combine functionality. But
> as mentioned above - optimization is orthogonal to semantics.
>

Yes, though we would need Multimap state to do it properly, which isn't yet
available on all runners. (You could model it _very_ inefficiently with
BagState, but that would be quite bad)
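
As a rough illustration of the bag-based variant, here is a toy model in plain Python. It is not Beam code: BufferingGroupFn, process and on_timer are invented names, a dict of lists stands in for per-key bag state, and on_timer stands in for an end-of-window timer. It does not attempt to show how multimap state would improve on this.

```python
from collections import defaultdict


class BufferingGroupFn:
    """Toy stateful 'DoFn': buffers values in a per-key bag, emits on a timer."""

    def __init__(self):
        self._bags = defaultdict(list)  # stands in for per-key bag state

    def process(self, key, value):
        # Nothing is emitted per element; the value is only appended to state.
        self._bags[key].append(value)

    def on_timer(self):
        # Emit one (key, values) pair per key, then clear the state.
        output = sorted(self._bags.items())
        self._bags.clear()
        return output


fn = BufferingGroupFn()
for key, value in [("a", 1), ("b", 2), ("a", 3)]:
    fn.process(key, value)

grouped = fn.on_timer()
print(grouped)
```

Every value is written to state and the whole bag is read back when the timer fires, which gives GBK-like output but says nothing about how expensive this would be on a real runner.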


>
>>
>>
>>> I'll compare this to the set of transforms we used to use in Euphoria
>>> (currently java SDK extension):
>>>
>>>  - FlatMap ~~ stateless DoFn
>>>
>>>  - Union ~~ Flatten
>>>
>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>
>>
>> Stateful DoFn does not require associative or commutative operation,
>> while reduce/combine does. Windowing is really just a secondary key for
>> GBK/Combine that allows completion of unbounded aggregations but has no
>> computation associated with it.
>>
> Merging WindowFn contains some computation. The fact that stateful DoFn do
> not require specific form of reduce function is precisely what makes it the
> actual primitive, no?
>
>
>>
>>
>>>  - (missing Impulse)
>>>
>>
>> Then you must have some primitive sources with splitting?
>>
>>
>>>  - (missing splittable DoFn)
>>>
>>
>> Kind of the same question - SDF is the one and only primitive that
>> creates parallelism.
>>
> Original Euphoria had an analogue of (Un)boundedReader. The SDK extension
> in Beam works on top of PCollections and therefore does not deal with IOs.
>
>
>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn"
>>> - i.e. the state might be created pre-shuffle, on trigger the state is
>>> shuffled and then merged. In Beam we already have CombiningState and
>>> MergingState facility (sort of), which is what is needed, we just do not
>>> have the ability to shuffle the partial states and then combine them. This
>>> also relates to the inability to run stateful DoFn for merging windowFns,
>>> because that is needed there as well. Is this something that is
>>> fundamentally impossible to define for all runners? What is worth noting is
>>> that building, shuffling and merging the state before shuffle requires a
>>> compatible trigger (purely based on watermark), otherwise the transform
>>> falls back to "classical DoFn".
>>>
>>
>> Stateful DoFn for merging windows can be defined. You could require all
>> state to be mergeable and then it is automatic. Or you could have an
>> "onMerge" callback. These should both be fine. The automatic version is
>> less likely to have nonsensical semantics, but allowing the callback to do
>> "whatever it wants" whether the result is good or not is more consistent
>> with the design of stateful DoFn.
>>
> Yes, but this is the same for CombineFn, right? The merge (or combine) has
> to be correctly aligned with the computation. The current situation is that
> we do not support stateful DoFns for merging WindowFn [1].
>
>
>> Whether and where a shuffle takes place may vary. Start with the maths.
>>
> Shuffle happens at least whenever there is a need to regroup keys. I'm not
> sure which maths you refer to, can you clarify please?
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>
>
>> Kenn
>>
>>
>>> Bottom line: I'm thinking of proposing to drop Euphoria extension,
>>> because it has essentially no users and actually no maintainers, but I have
>>> a feeling there is a value in the set of operators that could be
>>> transferred to Beam core, maybe. I'm pretty sure it would bring 

Re: Questions on primitive transforms hierarchy

2022-10-24 Thread Kenneth Knowles
On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský  wrote:

> On 10/22/22 21:47, Reuven Lax via dev wrote:
>
> I think we stated that CoGroupByKey was also a primitive, though in
> practice it's implemented in terms of GroupByKey today.
>
> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> I have some missing pieces in my understanding of the set of Beam's
>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>> think is the current state. We have (basically) the following primitive
>>> transforms:
>>>
>>>  - DoFn (stateless, stateful, splittable)
>>>
>>>  - Window
>>>
>>>  - Impulse
>>>
>>>  - GroupByKey
>>>
>>>  - Combine
>>>
>>
>> Not a primitive, just a well-defined transform that runners can execute
>> in special ways.
>>
> Yep, OK, agree. Performance is orthogonal to semantics.
>
>
>>
>>>
>>>
>>>  - Flatten (pCollections)
>>>
>>
>> The rest, yes.
>>
>>
>>
>>> Inside runners, we most often transform GBK into ReduceFn
>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>> DoFn.
>>>
>>
>> ReduceFnRunner is for windowing / triggers and has a special feature to use
>> a CombineFn while doing it. Nothing to do with stateful DoFn.
>>
> My bad, wrong wording. The point was that *all* of the semantics of GBK
> and Combine can be defined in terms of stateful DoFn. There are some
> changes needed to stateful DoFn to support the Combine functionality. But
> as mentioned above - optimization is orthogonal to semantics.
>

Not quite IMO. It is a subtle difference. Perhaps these transforms can be
*implemented* using stateful DoFn, but defining their semantics directly at
a high level is more powerful. The higher level we can make transforms, the
more flexibility we have in the runners. You *could* suggest that we take
the same approach as we do with Combine: not a primitive, but a special
transform that we optimize. You could say that "vanilla ParDo" is a
composite that has a stateful ParDo implementation, but a runner can
implement the composite more efficiently (without a shuffle). Same with
CoGBK. You could say that there is a default expansion of CoGBK that uses
stateful DoFn (which implies a shuffle) but that smart runners will not use
that expansion.

>
>>
>>
>>> I'll compare this to the set of transforms we used to use in Euphoria
>>> (currently java SDK extension):
>>>
>>>  - FlatMap ~~ stateless DoFn
>>>
>>>  - Union ~~ Flatten
>>>
>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>
>>
>> Stateful DoFn does not require associative or commutative operation,
>> while reduce/combine does. Windowing is really just a secondary key for
>> GBK/Combine that allows completion of unbounded aggregations but has no
>> computation associated with it.
>>
> Merging WindowFn contains some computation. The fact that stateful DoFn does
> not require a specific form of reduce function is precisely what makes it the
> actual primitive, no?
>
>
>>
>>
>>>  - (missing Impulse)
>>>
>>
>> Then you must have some primitive sources with splitting?
>>
>>
>>>  - (missing splittable DoFn)
>>>
>>
>> Kind of the same question - SDF is the one and only primitive that
>> creates parallelism.
>>
> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
> in Beam works on top of PCollections and therefore does not deal with IOs.
>
>
>>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn"
>>> - i.e. the state might be created pre-shuffle, on trigger the state is
>>> shuffled and then merged. In Beam we already have CombiningState and
>>> MergingState facility (sort of), which is what is needed, we just do not
>>> have the ability to shuffle the partial states and then combine them. This
>>> also relates to the inability to run stateful DoFn for merging windowFns,
>>> because that is needed there as well. Is this something that is
>>> fundamentally impossible to define for all runners? What is worth noting is
>>> that building, shuffling and merging the state before shuffle requires
>>> a compatible trigger (purely based on watermark), otherwise the transform
>>> falls back to "classical DoFn".
>>>
>>
>> Stateful DoFn for merging windows can be defined. You could require all
>> state to be mergeable and then it is automatic. Or you could have an
>> "onMerge" callback. These should both be fine. The automatic version is
>> less likely to have nonsensical semantics, but allowing the callback to do
>> "whatever it wants" whether the result is good or not is more consistent
>> with the design of stateful DoFn.
>>
> Yes, but this is the same for CombineFn, right? The merge (or combine) has
> to be correctly aligned with the computation. The current situation is that
> we do not support stateful DoFns for merging WindowFn [1].
>
>
>> Whether and where a shuffle takes place may vary. Start with the maths.
>>
> Shuffle happens at least whenever there is a need to regroup 

Re: Questions on primitive transforms hierarchy

2022-10-24 Thread Jan Lukavský

On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a primitive, though in 
practice it's implemented in terms of GroupByKey today.


On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:



On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:

Hi,

I have some missing pieces in my understanding of the set of
Beam's primitive transforms, which I'd like to fill. First a
quick recap of what I think is the current state. We have
(basically) the following primitive transforms:

 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine


Not a primitive, just a well-defined transform that runners can
execute in special ways.


Yep, OK, agree. Performance is orthogonal to semantics.




 - Flatten (pCollections)


The rest, yes.

Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both GBK and
stateful DoFn.


ReduceFnRunner is for windowing / triggers and has a special feature
to use a CombineFn while doing it. Nothing to do with stateful DoFn.

My bad, wrong wording. The point was that *all* of the semantics of GBK 
and Combine can be defined in terms of stateful DoFn. There are some 
changes needed to stateful DoFn to support the Combine functionality. 
But as mentioned above - optimization is orthogonal to semantics.


I'll compare this to the set of transforms we used to use in
Euphoria (currently java SDK extension):

 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


Stateful DoFn does not require associative or commutative
operation, while reduce/combine does. Windowing is really just a
secondary key for GBK/Combine that allows completion of unbounded
aggregations but has no computation associated with it.

Merging WindowFn contains some computation. The fact that stateful DoFn 
does not require a specific form of reduce function is precisely what makes 
it the actual primitive, no?
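
To illustrate the kind of computation a merging WindowFn carries, here is a minimal sketch of session-style merging in plain Java. It is not Beam's implementation and has no Beam dependency; `SessionMergeSketch`, `Window` and `mergeOverlapping` are hypothetical names, and treating windows that merely touch as separate is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SessionMergeSketch {
  // Half-open interval [start, end) standing in for a session window.
  static final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // Merge every group of overlapping windows into the span that covers them.
  static List<Window> mergeOverlapping(List<Window> windows) {
    List<Window> sorted = new ArrayList<>(windows);
    sorted.sort(Comparator.comparingLong((Window w) -> w.start));
    List<Window> merged = new ArrayList<>();
    for (Window w : sorted) {
      if (!merged.isEmpty() && w.start < merged.get(merged.size() - 1).end) {
        // Overlap with the previous span: extend it instead of opening a new one.
        Window last = merged.remove(merged.size() - 1);
        merged.add(new Window(last.start, Math.max(last.end, w.end)));
      } else {
        merged.add(w);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // Elements at t=0, t=5 and t=30 with a gap of 10 give three proto-windows.
    List<Window> proto = List.of(new Window(0, 10), new Window(5, 15), new Window(30, 40));
    System.out.println(mergeOverlapping(proto));
  }
}
```

The set of output windows depends on which elements have arrived so far, which is why any per-window state has to be merged along with the windows.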



 - (missing Impulse)


Then you must have some primitive sources with splitting?

 - (missing splittable DoFn)


Kind of the same question - SDF is the one and only primitive that
creates parallelism.

Original Euphoria had an analogy to (Un)boundedReader. The SDK extension 
in Beam works on top of PCollections and therefore does not deal with IOs.



The ReduceStateByKey is a transform that is a "combinable
stateful DoFn" - i.e. the state might be created pre-shuffle,
on trigger the state is shuffled and then merged. In Beam we
already have CombiningState and MergingState facility (sort
of), which is what is needed, we just do not have the ability
to shuffle the partial states and then combine them. This also
relates to the inability to run stateful DoFn for merging
windowFns, because that is needed there as well. Is this
something that is fundamentally impossible to define for all
runners? What is worth noting is that building, shuffling and
merging the state before shuffle requires a compatible trigger
(purely based on watermark), otherwise the transform
falls back to "classical DoFn".


Stateful DoFn for merging windows can be defined. You could
require all state to be mergeable and then it is automatic. Or you
could have an "onMerge" callback. These should both be fine. The
automatic version is less likely to have nonsensical semantics,
but allowing the callback to do "whatever it wants" whether the
result is good or not is more consistent with the design of
stateful DoFn.

Yes, but this is the same for CombineFn, right? The merge (or combine) 
has to be correctly aligned with the computation. The current situation 
is that we do not support stateful DoFns for merging WindowFn [1].



Whether and where a shuffle takes place may vary. Start with the
maths.

Shuffle happens at least whenever there is a need to regroup keys. I'm 
not sure which maths you refer to, can you clarify please?


 Jan

[1] 
https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106




Kenn

Bottom line: I'm thinking of proposing to drop Euphoria
extension, because it has essentially no users and actually no
maintainers, but I have a feeling there is a value in the set
of operators that could be transferred to Beam core, maybe.
I'm pretty sure it would bring value to users to have access
to a "combining stateful DoFn" primitive (even better would be
"combining splittable DoFn").

Looking forward to any comments on this.

 Jan



Re: Questions on primitive transforms hierarchy

2022-10-22 Thread Reuven Lax via dev
I think we stated that CoGroupByKey was also a primitive, though in
practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles  wrote:

>
>
> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> I have some missing pieces in my understanding of the set of Beam's
>> primitive transforms, which I'd like to fill. First a quick recap of what I
>> think is the current state. We have (basically) the following primitive
>> transforms:
>>
>>  - DoFn (stateless, stateful, splittable)
>>
>>  - Window
>>
>>  - Impulse
>>
>>  - GroupByKey
>>
>>  - Combine
>>
>
> Not a primitive, just a well-defined transform that runners can execute in
> special ways.
>
>
>>
>>
>>  - Flatten (pCollections)
>>
>
> The rest, yes.
>
>
>
>> Inside runners, we most often transform GBK into ReduceFn
>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>> DoFn.
>>
>
> ReduceFnRunner is for windowing / triggers and has a special feature to use
> a CombineFn while doing it. Nothing to do with stateful DoFn.
>
>
>
>> I'll compare this to the set of transforms we used to use in Euphoria
>> (currently java SDK extension):
>>
>>  - FlatMap ~~ stateless DoFn
>>
>>  - Union ~~ Flatten
>>
>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>
>
> Stateful DoFn does not require associative or commutative operation, while
> reduce/combine does. Windowing is really just a secondary key for
> GBK/Combine that allows completion of unbounded aggregations but has no
> computation associated with it.
>
>
>
>>  - (missing Impulse)
>>
>
> Then you must have some primitive sources with splitting?
>
>
>>  - (missing splittable DoFn)
>>
>
> Kind of the same question - SDF is the one and only primitive that creates
> parallelism.
>
>> The ReduceStateByKey is a transform that is a "combinable stateful DoFn" -
>> i.e. the state might be created pre-shuffle, on trigger the state is
>> shuffled and then merged. In Beam we already have CombiningState and
>> MergingState facility (sort of), which is what is needed, we just do not
>> have the ability to shuffle the partial states and then combine them. This
>> also relates to the inability to run stateful DoFn for merging windowFns,
>> because that is needed there as well. Is this something that is
>> fundamentally impossible to define for all runners? What is worth noting is
>> that building, shuffling and merging the state before shuffle requires
>> a compatible trigger (purely based on watermark), otherwise the transform
>> falls back to "classical DoFn".
>>
>
> Stateful DoFn for merging windows can be defined. You could require all
> state to be mergeable and then it is automatic. Or you could have an
> "onMerge" callback. These should both be fine. The automatic version is
> less likely to have nonsensical semantics, but allowing the callback to do
> "whatever it wants" whether the result is good or not is more consistent
> with the design of stateful DoFn.
>
> Whether and where a shuffle takes place may vary. Start with the maths.
>
> Kenn
>
>
>> Bottom line: I'm thinking of proposing to drop Euphoria extension,
>> because it has essentially no users and actually no maintainers, but I have
>> a feeling there is a value in the set of operators that could be
>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>> users to have access to a "combining stateful DoFn" primitive (even better
>> would be "combining splittable DoFn").
>>
>> Looking forward to any comments on this.
>>
>>  Jan
>>
>>
>>


Re: Questions on primitive transforms hierarchy

2022-10-21 Thread Kenneth Knowles
On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský  wrote:

> Hi,
>
> I have some missing pieces in my understanding of the set of Beam's
> primitive transforms, which I'd like to fill. First a quick recap of what I
> think is the current state. We have (basically) the following primitive
> transforms:
>
>  - DoFn (stateless, stateful, splittable)
>
>  - Window
>
>  - Impulse
>
>  - GroupByKey
>
>  - Combine
>

Not a primitive, just a well-defined transform that runners can execute in
special ways.


>
>
>  - Flatten (pCollections)
>

The rest, yes.



> Inside runners, we most often transform GBK into ReduceFn
> (ReduceFnRunner), which does the actual logic for both GBK and stateful
> DoFn.
>

ReduceFnRunner is for windowing / triggers and has a special feature to use a
CombineFn while doing it. Nothing to do with stateful DoFn.



> I'll compare this to the set of transforms we used to use in Euphoria
> (currently java SDK extension):
>
>  - FlatMap ~~ stateless DoFn
>
>  - Union ~~ Flatten
>
>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>

Stateful DoFn does not require associative or commutative operation, while
reduce/combine does. Windowing is really just a secondary key for
GBK/Combine that allows completion of unbounded aggregations but has no
computation associated with it.
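
A minimal sketch of the associativity point, in plain Java with no Beam dependency (the class and method names are made up for illustration). It folds the same values once in arrival order, as a per-key stateful DoFn would see them, and once as two partial results that are merged afterwards, which is roughly what happens when a combine is split around a shuffle. Sum survives the regrouping; subtraction does not.

```java
import java.util.List;
import java.util.function.IntBinaryOperator;

final class AssociativitySketch {
  // Fold all values in arrival order.
  static int sequential(List<Integer> values, int identity, IntBinaryOperator op) {
    int acc = identity;
    for (int v : values) {
      acc = op.applyAsInt(acc, v);
    }
    return acc;
  }

  // Fold two halves independently, then merge the two partial results with the same op.
  static int partialThenMerge(List<Integer> values, int identity, IntBinaryOperator op) {
    int mid = values.size() / 2;
    int left = sequential(values.subList(0, mid), identity, op);
    int right = sequential(values.subList(mid, values.size()), identity, op);
    return op.applyAsInt(left, right);
  }

  public static void main(String[] args) {
    List<Integer> values = List.of(1, 2, 3, 4);
    IntBinaryOperator sum = (a, b) -> a + b;
    IntBinaryOperator minus = (a, b) -> a - b;
    // Associative and commutative: both strategies agree.
    System.out.println(sequential(values, 0, sum) + " vs " + partialThenMerge(values, 0, sum));
    // Neither associative nor commutative: the strategies disagree.
    System.out.println(sequential(values, 0, minus) + " vs " + partialThenMerge(values, 0, minus));
  }
}
```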



>  - (missing Impulse)
>

Then you must have some primitive sources with splitting?


>  - (missing splittable DoFn)
>

Kind of the same question - SDF is the one and only primitive that creates
parallelism.

> The ReduceStateByKey is a transform that is a "combinable stateful DoFn" -
> i.e. the state might be created pre-shuffle, on trigger the state is
> shuffled and then merged. In Beam we already have CombiningState and
> MergingState facility (sort of), which is what is needed, we just do not
> have the ability to shuffle the partial states and then combine them. This
> also relates to the inability to run stateful DoFn for merging windowFns,
> because that is needed there as well. Is this something that is
> fundamentally impossible to define for all runners? What is worth noting is
> that building, shuffling and merging the state before shuffle requires
> a compatible trigger (purely based on watermark), otherwise the transform
> falls back to "classical DoFn".
>

Stateful DoFn for merging windows can be defined. You could require all
state to be mergeable and then it is automatic. Or you could have an
"onMerge" callback. These should both be fine. The automatic version is
less likely to have nonsensical semantics, but allowing the callback to do
"whatever it wants" whether the result is good or not is more consistent
with the design of stateful DoFn.
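
The two options could look roughly like this. This is only a sketch in plain Java under assumed names: `MergeableState`, `mergeAutomatically` and `mergeWithCallback` are hypothetical and do not exist in Beam.

```java
import java.util.function.BinaryOperator;

final class WindowStateMergeSketch {
  // Option 1: every state cell knows how to merge with another cell of its kind.
  interface MergeableState<S extends MergeableState<S>> {
    S mergeWith(S other);
  }

  static final class CountState implements MergeableState<CountState> {
    final long count;

    CountState(long count) {
      this.count = count;
    }

    @Override
    public CountState mergeWith(CountState other) {
      return new CountState(count + other.count);
    }
  }

  // Automatic merge: no user code is needed beyond the state type itself.
  static <S extends MergeableState<S>> S mergeAutomatically(S a, S b) {
    return a.mergeWith(b);
  }

  // Option 2: the user supplies an arbitrary callback; nothing checks that it is sensible.
  static <S> S mergeWithCallback(S a, S b, BinaryOperator<S> onMerge) {
    return onMerge.apply(a, b);
  }

  public static void main(String[] args) {
    CountState merged = mergeAutomatically(new CountState(2), new CountState(3));
    System.out.println(merged.count);
    // A callback is free to drop data, here by keeping only the first window's state.
    String kept = mergeWithCallback("a,b", "c", (first, second) -> first);
    System.out.println(kept);
  }
}
```

The second call shows the trade-off: the callback is more flexible, but it can silently discard the state of one of the merged windows.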

Whether and where a shuffle takes place may vary. Start with the maths.

Kenn


> Bottom line: I'm thinking of proposing to drop Euphoria extension, because
> it has essentially no users and actually no maintainers, but I have a
> feeling there is a value in the set of operators that could be transferred
> to Beam core, maybe. I'm pretty sure it would bring value to users to have
> access to a "combining stateful DoFn" primitive (even better would be
> "combining splittable DoFn").
>
> Looking forward to any comments on this.
>
>  Jan
>
>
>


Questions on primitive transforms hierarchy

2022-10-21 Thread Jan Lukavský

Hi,

I have some missing pieces in my understanding of the set of Beam's 
primitive transforms, which I'd like to fill. First a quick recap of 
what I think is the current state. We have (basically) the following 
primitive transforms:


 - DoFn (stateless, stateful, splittable)

 - Window

 - Impulse

 - GroupByKey

 - Combine

 - Flatten (pCollections)


Inside runners, we most often transform GBK into ReduceFn 
(ReduceFnRunner), which does the actual logic for both GBK and stateful 
DoFn.


I'll compare this to the set of transforms we used to use in Euphoria 
(currently java SDK extension):


 - FlatMap ~~ stateless DoFn

 - Union ~~ Flatten

 - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window

 - (missing Impulse)

 - (missing splittable DoFn)


The ReduceStateByKey is a transform that is a "combinable stateful DoFn" 
- i.e. the state might be created pre-shuffle, on trigger the state is 
shuffled and then merged. In Beam we already have CombiningState and 
MergingState facility (sort of), which is what is needed, we just do not 
have the ability to shuffle the partial states and then combine them. 
This also relates to the inability to run stateful DoFn for merging 
windowFns, because that is needed there as well. Is this something that 
is fundamentally impossible to define for all runners? What is worth 
noting is that building, shuffling and merging the state before shuffle 
requires a compatible trigger (purely based on watermark), otherwise the 
transform falls back to "classical DoFn".
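
A rough sketch of the pre-shuffle state idea, in plain Java without any Beam dependency. All names here (`PartialStateShuffleSketch`, `MeanAcc`, `buildLocalState`, `shuffleAndMerge`) are hypothetical, and the two "workers" are simulated with in-memory maps. Each worker builds per-key partial state locally, and only those partial states are brought together by key and merged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialStateShuffleSketch {
  // Accumulator for a mean: running sum and element count.
  static final class MeanAcc {
    long sum;
    long count;

    void add(long value) {
      sum += value;
      count++;
    }

    void merge(MeanAcc other) {
      sum += other.sum;
      count += other.count;
    }

    double mean() {
      return count == 0 ? 0.0 : (double) sum / count;
    }
  }

  // Pre-shuffle step: a worker folds its own (key, value) pairs into per-key state.
  static Map<String, MeanAcc> buildLocalState(List<Map.Entry<String, Long>> input) {
    Map<String, MeanAcc> state = new HashMap<>();
    for (Map.Entry<String, Long> kv : input) {
      state.computeIfAbsent(kv.getKey(), k -> new MeanAcc()).add(kv.getValue());
    }
    return state;
  }

  // Shuffle and merge: partial states with the same key meet and are merged.
  static Map<String, Double> shuffleAndMerge(List<Map<String, MeanAcc>> partials) {
    Map<String, MeanAcc> merged = new HashMap<>();
    for (Map<String, MeanAcc> partial : partials) {
      partial.forEach((key, acc) -> merged.computeIfAbsent(key, k -> new MeanAcc()).merge(acc));
    }
    Map<String, Double> output = new HashMap<>();
    merged.forEach((key, acc) -> output.put(key, acc.mean()));
    return output;
  }

  public static void main(String[] args) {
    Map<String, MeanAcc> worker1 =
        buildLocalState(List.of(Map.entry("a", 1L), Map.entry("b", 10L)));
    Map<String, MeanAcc> worker2 =
        buildLocalState(List.of(Map.entry("a", 3L), Map.entry("a", 5L)));
    System.out.println(shuffleAndMerge(List.of(worker1, worker2)));
  }
}
```

Only one accumulator per key and worker crosses the shuffle here, rather than every element, and that is the property I would like to keep.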


Bottom line: I'm thinking of proposing to drop Euphoria extension, 
because it has essentially no users and actually no maintainers, but I 
have a feeling there is a value in the set of operators that could be 
transferred to Beam core, maybe. I'm pretty sure it would bring value to 
users to have access to a "combining stateful DoFn" primitive (even 
better would be "combining splittable DoFn").


Looking forward to any comments on this.

 Jan