On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a primitive, though in practice it's implemented in terms of GroupByKey today.

On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:



    On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi,

        I have some missing pieces in my understanding of the set of
        Beam's primitive transforms, which I'd like to fill. First a
        quick recap of what I think is the current state. We have
        (basically) the following primitive transforms:

         - DoFn (stateless, stateful, splittable)

         - Window

         - Impulse

         - GroupByKey

         - Combine


    Not a primitive, just a well-defined transform that runners can
    execute in special ways.

Yep, OK, agree. Performance is orthogonal to semantics.



         - Flatten (PCollections)


    The rest, yes.

        Inside runners, we most often transform GBK into ReduceFn
        (ReduceFnRunner), which does the actual logic for both GBK and
        stateful DoFn.


    ReduceFnRunner is for windowing / triggers and has a special
    feature to use a CombineFn while doing it. Nothing to do with
    stateful DoFn.

My bad, wrong wording. The point was that *all* of the semantics of GBK and Combine can be defined in terms of stateful DoFn. There are some changes needed to stateful DoFn to support the Combine functionality. But as mentioned above - optimization is orthogonal to semantics.
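
To illustrate the claim above, here is a minimal sketch in plain Python (not Beam API) of how GBK and Combine semantics could be modeled with per-key state and an end-of-window flush. The class and function names (BufferingStatefulFn, group_by_key, combine_per_key) are hypothetical, and the sketch assumes a single window and ignores triggers entirely.

```python
from collections import defaultdict


class BufferingStatefulFn:
    """Toy model of a stateful DoFn: per-key bag state plus a flush at window end."""

    def __init__(self):
        # Hypothetical stand-in for per-key bag state.
        self._bag = defaultdict(list)

    def process(self, key, value):
        # Each element is only appended to the state of its own key.
        self._bag[key].append(value)

    def on_window_end(self):
        # Models an event-time timer firing at the end of the (single) window.
        for key in sorted(self._bag):
            yield key, list(self._bag[key])
        self._bag.clear()


def group_by_key(pairs):
    """GBK modeled as: buffer everything in state, emit when the window closes."""
    fn = BufferingStatefulFn()
    for key, value in pairs:
        fn.process(key, value)
    return list(fn.on_window_end())


def combine_per_key(pairs, combine):
    """Combine modeled as GBK followed by a reduction of each group."""
    return [(key, combine(values)) for key, values in group_by_key(pairs)]
```

This only shows the semantics. It says nothing about how a runner would execute it efficiently, which is the orthogonal question.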

        I'll compare this to the set of transforms we used to use in
        Euphoria (currently a Java SDK extension):

         - FlatMap ~~ stateless DoFn

         - Union ~~ Flatten

         - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window


    Stateful DoFn does not require associative or commutative
    operation, while reduce/combine does. Windowing is really just a
    secondary key for GBK/Combine that allows completion of unbounded
    aggregations but has no computation associated with it.

Merging WindowFn contains some computation. The fact that stateful DoFns do not require a specific form of reduce function is precisely what makes them the actual primitive, no?


         - (missing Impulse)


    Then you must have some primitive sources with splitting?

         - (missing splittable DoFn)


    Kind of the same question - SDF is the one and only primitive that
    creates parallelism.

The original Euphoria had an analogy to (Un)boundedReader. The SDK extension in Beam works on top of PCollections and therefore does not deal with IOs.


        The ReduceStateByKey is a transform that is a "combinable
        stateful DoFn" - i.e. the state might be created pre-shuffle,
        on trigger the state is shuffled and then merged. In Beam we
        already have CombiningState and MergingState facility (sort
        of), which is what is needed, we just do not have the ability
        to shuffle the partial states and then combine them. This also
        relates to the inability to run stateful DoFn for merging
        windowFns, because that is needed there as well. Is this
        something that is fundamentally impossible to define for all
        runners? What is worth noting is that building, shuffling and
        merging the state before shuffle requires a compatible trigger
        (purely based on watermark), otherwise the transform falls
        back to a "classical DoFn".
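
A minimal sketch of the "combinable stateful DoFn" idea described above, in plain Python rather than Beam API: partial state is built per bundle before the shuffle, then the partial states are grouped by key and merged. The accumulator function names mirror the CombineFn methods; pre_shuffle, shuffle_and_merge and the mean-per-key example are assumptions made only for illustration.

```python
def create_accumulator():
    # Accumulator for a mean: (sum, count).
    return (0, 0)


def add_input(acc, value):
    return (acc[0] + value, acc[1] + 1)


def merge_accumulators(a, b):
    return (a[0] + b[0], a[1] + b[1])


def extract_output(acc):
    return acc[0] / acc[1]


def pre_shuffle(bundle):
    """Build partial per-key state locally, before any data is shuffled."""
    partial = {}
    for key, value in bundle:
        partial[key] = add_input(partial.get(key, create_accumulator()), value)
    return partial


def shuffle_and_merge(partials):
    """Regroup the partial states by key and merge them into the final state."""
    merged = {}
    for partial in partials:
        for key, acc in partial.items():
            if key in merged:
                merged[key] = merge_accumulators(merged[key], acc)
            else:
                merged[key] = acc
    return {key: extract_output(acc) for key, acc in merged.items()}
```

For example, shuffle_and_merge([pre_shuffle(b) for b in bundles]) shuffles one accumulator per key per bundle instead of every element. This is only valid when the merge is consistent with the per-element update.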


    Stateful DoFn for merging windows can be defined. You could
    require all state to be mergeable and then it is automatic. Or you
    could have an "onMerge" callback. These should both be fine. The
    automatic version is less likely to have nonsensical semantics,
    but allowing the callback to do "whatever it wants" whether the
    result is good or not is more consistent with the design of
    stateful DoFn.

Yes, but this is the same for CombineFn, right? The merge (or combine) has to be correctly aligned with the computation. The current situation is that we do not support stateful DoFns for merging WindowFn [1].
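
As a rough sketch of the "all state is mergeable" variant, in plain Python and not Beam API: when session-like windows overlap they are merged, and the per-window state is merged with a user-supplied function. The name merge_sessions, the (start, end) tuple representation and the overlap rule (half-open intervals that intersect) are assumptions for illustration only.

```python
def merge_sessions(window_state, merge_state):
    """Merge overlapping (start, end) windows and their state.

    window_state maps a half-open interval (start, end) to its state;
    merge_state(a, b) must be consistent with how the state was built.
    """
    merged = []
    for (start, end), state in sorted(window_state.items()):
        if merged and start < merged[-1][0][1]:
            # The window intersects the previous one: merge window and state.
            (m_start, m_end), m_state = merged[-1]
            merged[-1] = ((m_start, max(m_end, end)), merge_state(m_state, state))
        else:
            merged.append(((start, end), state))
    return dict(merged)
```

An "onMerge" callback would correspond to letting the user supply arbitrary logic in place of merge_state, with no guarantee that the result is meaningful.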


    Whether and where a shuffle takes place may vary. Start with the
    maths.

Shuffle happens at least whenever there is a need to regroup keys. I'm not sure which maths you refer to, can you clarify please?

 Jan

[1] https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106


    Kenn

        Bottom line: I'm thinking of proposing to drop the Euphoria
        extension, because it has essentially no users and no
        maintainers, but I have a feeling there is value in the set
        of operators that could be transferred to Beam core, maybe.
        I'm pretty sure it would bring value to users to have access
        to a "combining stateful DoFn" primitive (even better would be
        "combining splittable DoFn").

        Looking forward to any comments on this.

         Jan
