On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:

> On 10/22/22 21:47, Reuven Lax via dev wrote:
>
> I think we stated that CoGroupByKey was also a primitive, though in
> practice it's implemented in terms of GroupByKey today.
>
> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> I have some missing pieces in my understanding of the set of Beam's
>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>> think is the current state. We have (basically) the following primitive
>>> transforms:
>>>
>>>  - DoFn (stateless, stateful, splittable)
>>>
>>>  - Window
>>>
>>>  - Impulse
>>>
>>>  - GroupByKey
>>>
>>>  - Combine
>>>
>>
>> Not a primitive, just a well-defined transform that runners can execute
>> in special ways.
>>
> Yep, OK, agree. Performance is orthogonal to semantics.
>
>
>>
>>>
>>>
>>>  - Flatten (PCollections)
>>>
>>
>> The rest, yes.
>>
>>
>>
>>> Inside runners, we most often transform GBK into ReduceFn
>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>> DoFn.
>>>
>>
>> ReduceFnRunner is for windowing / triggers and has a special feature to use
>> a CombineFn while doing it. Nothing to do with stateful DoFn.
>>
> My bad, wrong wording. The point was that *all* of the semantics of GBK
> and Combine can be defined in terms of stateful DoFn. There are some
> changes needed to stateful DoFn to support the Combine functionality. But
> as mentioned above - optimization is orthogonal to semantics.
>

Not quite IMO. It is a subtle difference. Perhaps these transforms can be
*implemented* using stateful DoFn, but defining their semantics directly at
a high level is more powerful. The higher level we can make transforms, the
more flexibility we have in the runners. You *could* suggest that we take
the same approach as we do with Combine: not a primitive, but a special
transform that we optimize. You could say that "vanilla ParDo" is a
composite that has a stateful ParDo implementation, but a runner can
implement the composite more efficiently (without a shuffle). Same with
CoGBK. You could say that there is a default expansion of CoGBK that uses
stateful DoFn (which implies a shuffle) but that smart runners will not use
that expansion.
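To make the distinction concrete, here is a minimal plain-Python model (not the Beam API; every name is hypothetical) of a grouping transform whose semantics are fixed once. It has a default expansion written in the style of a stateful DoFn (per-key bag state after an implied shuffle), and a runner may substitute its own strategy as long as the output is the same.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, List, Optional, Tuple

KV = Tuple[Hashable, object]
Grouped = Dict[Hashable, List[object]]


def default_expansion(elements: List[KV]) -> Grouped:
    """Hypothetical default expansion in the style of a stateful DoFn.

    Routing each element to its key models the implied shuffle; the per-key
    list models a bag state that accumulates the values.
    """
    bag_state: Dict[Hashable, List[object]] = defaultdict(list)
    for key, value in elements:
        bag_state[key].append(value)
    return dict(bag_state)


def native_group_by_key(elements: List[KV]) -> Grouped:
    """Hypothetical runner-native strategy: sort by key, then collect values per key."""
    grouped: Grouped = {}
    # sorted() is stable, so values keep their input order within a key.
    for key, value in sorted(elements, key=lambda kv: repr(kv[0])):
        grouped.setdefault(key, []).append(value)
    return grouped


def run_group_by_key(
    elements: List[KV],
    runner_override: Optional[Callable[[List[KV]], Grouped]] = None,
) -> Grouped:
    """The transform's semantics are fixed; a runner may swap in its own strategy."""
    implementation = runner_override or default_expansion
    return implementation(elements)
```

Beam does not guarantee the order of grouped values; this model keeps input order only so that the two strategies are easy to compare.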

>
>>
>>
>>> I'll compare this to the set of transforms we used to use in Euphoria
>>> (currently a Java SDK extension):
>>>
>>>  - FlatMap ~~ stateless DoFn
>>>
>>>  - Union ~~ Flatten
>>>
>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>
>>
>> Stateful DoFn does not require an associative or commutative operation,
>> while reduce/combine does. Windowing is really just a secondary key for
>> GBK/Combine that allows completion of unbounded aggregations but has no
>> computation associated with it.
>>
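The "window as a secondary key" view can be sketched as a minimal plain-Python model, assuming fixed (non-merging) windows and hypothetical helper names: assigning a window only computes a tag, and the aggregation itself is an ordinary grouping by the composite (key, window).

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Window = Tuple[int, int]  # half-open [start, end) in arbitrary time units


def assign_fixed_window(timestamp: int, size: int) -> Window:
    """Hypothetical fixed-window assignment: no aggregation happens here."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def windowed_sum(
    elements: Iterable[Tuple[str, int, int]], size: int
) -> Dict[Tuple[str, Window], int]:
    """Sum values per (key, window): the window acts as a secondary grouping key."""
    totals: Dict[Tuple[str, Window], int] = defaultdict(int)
    for key, value, timestamp in elements:
        totals[(key, assign_fixed_window(timestamp, size))] += value
    return dict(totals)
```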
> A merging WindowFn involves some computation. The fact that stateful DoFn
> does not require a specific form of reduce function is precisely what makes
> it the actual primitive, no?
>
>
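The difference in requirements between a combine and a stateful DoFn can be illustrated with a small plain-Python model (plain functions, not Beam's CombineFn). A combine may be evaluated as partial results from arbitrary partitions merged in any order, so it is only well defined when the operation is associative and commutative; a stateful fold processes elements one at a time in arrival order and imposes no such constraint. The check below tests one concrete input, it is not a proof of associativity.

```python
from functools import reduce
from itertools import permutations
from typing import Callable, List, Sequence, TypeVar

S = TypeVar("S")
Op = Callable[[int, int], int]


def combine_in_two_partitions(values: Sequence[int], op: Op, split: int) -> int:
    """Model of a combine: reduce two partitions independently, then merge the partials."""
    left = reduce(op, values[:split])
    right = reduce(op, values[split:])
    return op(left, right)


def insensitive_to_order_and_split(values: List[int], op: Op) -> bool:
    """True if, for this input, every ordering and two-way split gives the sequential result."""
    expected = reduce(op, values)
    for ordering in permutations(values):
        for split in range(1, len(ordering)):
            if combine_in_two_partitions(ordering, op, split) != expected:
                return False
    return True


def stateful_fold(values: Sequence[int], step: Callable[[S, int], S], initial: S) -> S:
    """Model of a stateful DoFn: state updated element by element, in arrival order."""
    state = initial
    for value in values:
        state = step(state, value)
    return state
```

Addition passes the check and subtraction does not, while an order-dependent step such as "remember the last element" is perfectly legal for the fold.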
>>
>>
>>>  - (missing Impulse)
>>>
>>
>> Then you must have some primitive sources with splitting?
>>
>>
>>>  - (missing splittable DoFn)
>>>
>>
>> Kind of the same question - SDF is the one and only primitive that
>> creates parallelism.
>>
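Why splitting is what creates parallelism can be sketched with a minimal plain-Python model of an offset-range restriction. The class and method names are hypothetical and only loosely inspired by Beam's restriction trackers; this is not the Beam API. Once part of the range has been claimed, the unclaimed remainder can be split off as a residual that another worker could process in parallel.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class OffsetRestriction:
    """Hypothetical restriction: the half-open range [start, stop) of work items."""

    start: int
    stop: int
    position: Optional[int] = None  # last claimed offset, None before any claim

    def try_claim(self, offset: int) -> bool:
        """Claim an offset for processing; fails once it falls outside the range."""
        if offset >= self.stop:
            return False
        self.position = offset
        return True

    def try_split(
        self, fraction: float
    ) -> Optional[Tuple["OffsetRestriction", "OffsetRestriction"]]:
        """Split off a fraction of the unclaimed remainder as a residual restriction."""
        next_unclaimed = self.start if self.position is None else self.position + 1
        split_at = next_unclaimed + int((self.stop - next_unclaimed) * fraction)
        if split_at <= next_unclaimed or split_at >= self.stop:
            return None  # nothing meaningful left to split off
        residual = OffsetRestriction(split_at, self.stop)
        self.stop = split_at  # the primary keeps [start, split_at)
        return self, residual


def process(restriction: OffsetRestriction) -> List[int]:
    """Claim offsets one by one until the (possibly shrunk) restriction is exhausted."""
    done = []
    offset = restriction.start
    while restriction.try_claim(offset):
        done.append(offset)
        offset += 1
    return done
```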
> The original Euphoria had an analogue of (Un)boundedReader. The SDK extension
> in Beam works on top of PCollections and therefore does not deal with IOs.
>
>
>>> ReduceStateByKey is a transform that acts as a "combinable stateful DoFn",
>>> i.e. the state may be created pre-shuffle; on trigger, the partial state is
>>> shuffled and then merged. In Beam we already have (sort of) the
>>> CombiningState and MergingState facilities, which is what is needed; we
>>> just do not have the ability to shuffle the partial states and then combine
>>> them. This also relates to the inability to run stateful DoFns for merging
>>> WindowFns, because the same capability is needed there. Is this something
>>> that is fundamentally impossible to define for all runners? It is worth
>>> noting that building the state before the shuffle, then shuffling and
>>> merging it, requires a compatible trigger (purely based on the watermark);
>>> otherwise the transform falls back to a "classical" stateful DoFn.
>>>
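The "combinable stateful DoFn" idea can be sketched as a three-phase plain-Python model: each worker builds partial per-key state before the shuffle, the partial states (rather than raw elements) are shuffled by key, and they are merged afterwards. The CountAndSum state and all function names are hypothetical. In this model the result matches processing every element in one place only because the state's merge is associative and commutative, and it emits once per key at the end, as a purely watermark-based trigger would allow.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass
class CountAndSum:
    """Hypothetical mergeable state: enough to compute a per-key mean."""

    count: int = 0
    total: int = 0

    def add(self, value: int) -> None:
        self.count += 1
        self.total += value

    def merge(self, other: "CountAndSum") -> "CountAndSum":
        return CountAndSum(self.count + other.count, self.total + other.total)


def pre_shuffle(partition: Iterable[Tuple[str, int]]) -> Dict[str, CountAndSum]:
    """Phase 1: build partial state per key locally on one worker."""
    partial: Dict[str, CountAndSum] = defaultdict(CountAndSum)
    for key, value in partition:
        partial[key].add(value)
    return dict(partial)


def shuffle_and_merge(partials: List[Dict[str, CountAndSum]]) -> Dict[str, CountAndSum]:
    """Phases 2 and 3: route partial states by key, then merge them."""
    merged: Dict[str, CountAndSum] = {}
    for partial in partials:
        for key, state in partial.items():
            merged[key] = merged[key].merge(state) if key in merged else state
    return merged


def mean_per_key(partitions: List[List[Tuple[str, int]]]) -> Dict[str, float]:
    """Run all three phases and emit one mean per key."""
    merged = shuffle_and_merge([pre_shuffle(p) for p in partitions])
    return {key: state.total / state.count for key, state in merged.items()}
```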
>>
>> Stateful DoFn for merging windows can be defined. You could require all
>> state to be mergeable and then it is automatic. Or you could have an
>> "onMerge" callback. These should both be fine. The automatic version is
>> less likely to have nonsensical semantics, but allowing the callback to do
>> "whatever it wants" whether the result is good or not is more consistent
>> with the design of stateful DoFn.
>>
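Both options for merging windows can be sketched in a small plain-Python model of session windows (hypothetical names, not Beam's API): each window carries its own state, and when windows overlap they are merged either automatically, by merging the states, or through an optional user-supplied on_merge callback that may do whatever it wants.

```python
from typing import Callable, List, Optional, Tuple

Session = Tuple[int, int, int]  # (start, end, state); the state is an event count


def merge_sessions(
    sessions: List[Session],
    on_merge: Optional[Callable[[int, int], int]] = None,
) -> List[Session]:
    """Merge overlapping session windows together with their per-window state.

    Without on_merge the state is merged automatically (counts are summed);
    with on_merge the user callback decides what the merged state is.
    """
    merge_state = on_merge or (lambda a, b: a + b)
    merged: List[Session] = []
    for start, end, state in sorted(sessions):
        if merged and start < merged[-1][1]:  # overlaps the previous session
            prev_start, prev_end, prev_state = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end), merge_state(prev_state, state))
        else:
            merged.append((start, end, state))
    return merged


def session_for(timestamp: int, gap: int) -> Session:
    """Each event starts a proto-session [t, t + gap) with a count of one."""
    return (timestamp, timestamp + gap, 1)
```

Passing on_merge=max, for instance, produces a "count" that no longer counts events: the callback is free to do this, while the automatic variant rules it out.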
> Yes, but this is the same for CombineFn, right? The merge (or combine) has
> to be correctly aligned with the computation. The current situation is that
> we do not support stateful DoFns for merging WindowFns [1].
>
>
>> Whether and where a shuffle takes place may vary. Start with the maths.
>>
> A shuffle happens at least whenever keys need to be regrouped. I'm not
> sure which maths you are referring to; can you clarify, please?
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>
>
>> Kenn
>>
>>
>>> Bottom line: I'm thinking of proposing to drop Euphoria extension,
>>> because it has essentially no users and actually no maintainers, but I have
>>> a feeling there is value in the set of operators that could be
>>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>>> users to have access to a "combining stateful DoFn" primitive (even better
>>> would be "combining splittable DoFn").
>>>
>>> Looking forward to any comments on this.
>>>
>>>  Jan
>>>
>>>
>>>
