I don't think it is necessary in this particular case.

In general, it would be nice to document design decisions that were made during the history of Beam and which led to some aspects of the current implementation. But I'm afraid it would be rather costly and time-consuming. We have design docs, which should be fine for most cases.

 Jan

On 11/14/22 15:25, Sachin Agarwal via dev wrote:
Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský <je...@seznam.cz> wrote:

    I somehow missed these answers. Reuven and Kenn, thanks for the
    discussion; it helped me clarify my understanding.

     Jan

    On 10/26/22 21:10, Kenneth Knowles wrote:


    On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

        > Not quite IMO. It is a subtle difference. Perhaps these
        transforms can be *implemented* using stateful DoFn, but
        defining their semantics directly at a high level is more
        powerful. The higher level we can make transforms, the more
        flexibility we have in the runners. You *could* suggest that
        we take the same approach as we do with Combine: not a
        primitive, but a special transform that we optimize. You
        could say that "vanilla ParDo" is a composite that has a
        stateful ParDo implementation, but a runner can implement the
        composite more efficiently (without a shuffle). Same with
        CoGBK. You could say that there is a default expansion of
        CoGBK that uses stateful DoFn (which implies a shuffle) but
        that smart runners will not use that expansion.

        Yes, semantics > optimizations. For optimizations, Beam
        already has a facility: PTransformOverride. There is no
        fundamental difference in how we treat Combine compared to
        GBK. It *can* be expanded using GBK, but "smart runners will
        not use that expansion". This is essentially the root of this
        discussion.

        If I rephrase it:

         a) why do we treat "some" transforms that are actually
        composite as primitives, while others have expansions,
        although the fundamental reasoning (performance) seems the
        same for both?

    It is identical to why you can choose different axioms for formal
    logic and get all the same provable statements. You have to
    choose something. But certainly a runner that just executes
    primitives is the bare minimum and all runners are really
    expected to take advantage of known composites. Before
    portability, there was little benefit in having the runner
    (written in Java) execute a transform directly rather than call a
    user DoFn. Now, with portability, the benefit could be huge if it
    avoids a Fn API crossing.
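To make the "known composite" point concrete, here is a minimal Python sketch (plain stdlib, not the Beam SDK) of two ways a runner might execute a per-key Combine: the default expansion through GroupByKey, and a lifted execution that combines partially before the shuffle. `MeanFn` and the helper functions are hypothetical; the method names only mirror the shape of Beam's CombineFn.

```python
from collections import defaultdict


class MeanFn:
    """Toy CombineFn-like object; method names mirror Beam's CombineFn."""

    def create_accumulator(self):
        return (0.0, 0)  # (sum, count)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs):
        total, count = 0.0, 0
        for s, c in accs:
            total += s
            count += c
        return (total, count)

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float("nan")


def group_by_key(pairs):
    grouped = defaultdict(list)
    for k, v in pairs:
        grouped[k].append(v)
    return dict(grouped)


def combine_per_key_expanded(pairs, fn):
    """Default expansion: shuffle every element (GBK), then fold per key."""
    out = {}
    for k, values in group_by_key(pairs).items():
        acc = fn.create_accumulator()
        for v in values:
            acc = fn.add_input(acc, v)
        out[k] = fn.extract_output(acc)
    return out


def combine_per_key_lifted(bundles, fn):
    """Runner-optimized path: pre-combine inside each bundle, shuffle only accumulators."""
    partials = []
    for bundle in bundles:
        local = {}
        for k, v in bundle:
            local[k] = fn.add_input(local.get(k, fn.create_accumulator()), v)
        partials.extend(local.items())
    return {
        k: fn.extract_output(fn.merge_accumulators(accs))
        for k, accs in group_by_key(partials).items()
    }
```

Both paths yield the same result (for example `{"a": 3.0, "b": 15.0}` for bundles `[[("a", 1), ("b", 10)], [("a", 3), ("b", 20), ("a", 5)]]`); the lifted one shuffles one accumulator per key per bundle instead of every element, which is the kind of gain a runner gets by recognizing the composite rather than expanding it.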

         b) is there a fundamental reason why we do not support
        stateful DoFn for merging windows?

    No reason. The original design was to force users to only use
    "mergeable" state in a stateful DoFn for merging windows. That is
    an annoying restriction that we don't really need. So I think the
    best way is to have an OnMerge callback. The internal legacy Java
    APIs for this are way too complex. But portability wire protocols
    support it (I think?) and making a good user facing API for all
    the SDKs shouldn't be too hard.
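A toy model of the OnMerge idea, assuming a hypothetical stateful DoFn that exposes `on_merge(states)` and a single-key runner with session windows. None of these names exist in any Beam SDK; the sketch only shows where such a callback would be invoked and that the user, not the runner, decides what merged state means.

```python
from typing import Dict, List, Set, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


class DistinctUsersFn:
    """Hypothetical stateful DoFn for merging windows with an explicit on_merge hook."""

    def initial_state(self) -> Set[str]:
        return set()

    def process(self, state: Set[str], user: str) -> Set[str]:
        state.add(user)
        return state

    def on_merge(self, states: List[Set[str]]) -> Set[str]:
        # The user decides what merging means; here it is a set union.
        merged: Set[str] = set()
        for s in states:
            merged |= s
        return merged


def run_sessions(fn: DistinctUsersFn, events: List[Tuple[int, str]], gap: int) -> Dict[Window, Set[str]]:
    """Toy runner for a single key: session windows with the given gap."""
    windows: Dict[Window, Set[str]] = {}
    for ts, value in events:
        proto = (ts, ts + gap)
        overlapping = [w for w in windows if w[0] < proto[1] and proto[0] < w[1]]
        start = min([proto[0]] + [w[0] for w in overlapping])
        end = max([proto[1]] + [w[1] for w in overlapping])
        states = [windows.pop(w) for w in overlapping]
        if len(states) > 1:
            state = fn.on_merge(states)  # two or more windows really merged
        elif states:
            state = states[0]            # element extends an existing window
        else:
            state = fn.initial_state()   # brand-new session
        windows[(start, end)] = fn.process(state, value)
    return windows
```

With `gap=6` and events `[(0, "u1"), (10, "u2"), (5, "u3"), (30, "u1")]`, the element at 5 bridges the first two sessions, so `on_merge` is called and the result is one session `(0, 16)` with all three users plus a separate session `(30, 36)`.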

    Kenn

        I feel that these are related and have historical reasons,
        but I'd like to know that for sure. :)

         Jan

        On 10/24/22 19:59, Kenneth Knowles wrote:


        On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            On 10/22/22 21:47, Reuven Lax via dev wrote:
            I think we stated that CoGroupByKey was also a
            primitive, though in practice it's implemented in terms
            of GroupByKey today.
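As a rough illustration of "implemented in terms of GroupByKey", the sketch below models CoGroupByKey as tag, Flatten, GroupByKey, untag, using plain Python dictionaries in place of PCollections. The function name and the dict-of-lists input format are assumptions made for the example.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def co_group_by_key(inputs: Dict[str, Iterable[Tuple[str, object]]]) -> Dict[str, Dict[str, List[object]]]:
    """Toy CoGBK: tag each input, flatten, run one GroupByKey, then untag."""
    # 1. Tag every element with the name of the input it came from (Flatten).
    tagged = [(k, (tag, v)) for tag, pcoll in inputs.items() for k, v in pcoll]
    # 2. A single GroupByKey over the flattened, tagged elements.
    grouped = defaultdict(list)
    for k, tagged_value in tagged:
        grouped[k].append(tagged_value)
    # 3. Split each group back into one list per input.
    result = {}
    for k, tagged_values in grouped.items():
        per_tag = {tag: [] for tag in inputs}
        for tag, v in tagged_values:
            per_tag[tag].append(v)
        result[k] = per_tag
    return result
```

Keys missing from one input still get an empty list for that input, which is the usual CoGBK result shape.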

            On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
            <k...@apache.org> wrote:



                On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
                <je...@seznam.cz> wrote:

                    Hi,

                    I have some missing pieces in my understanding
                    of the set of Beam's primitive transforms,
                    which I'd like to fill. First a quick recap of
                    what I think is the current state. We have
                    (basically) the following primitive transforms:

                     - DoFn (stateless, stateful, splittable)

                     - Window

                     - Impulse

                     - GroupByKey

                     - Combine


                Not a primitive, just a well-defined transform that
                runners can execute in special ways.

            Yep, OK, agree. Performance is orthogonal to semantics.



                     - Flatten (PCollections)


                The rest, yes.

                    Inside runners, we most often transform GBK
                    into ReduceFn (ReduceFnRunner), which does the
                    actual logic for both GBK and stateful DoFn.


                ReduceFnRunner is for windowing / triggers and has a
                special feature for using a CombineFn while doing it.
                Nothing to do with stateful DoFn.

            My bad, wrong wording. The point was that *all* of the
            semantics of GBK and Combine can be defined in terms of
            stateful DoFn. There are some changes needed to stateful
            DoFn to support the Combine functionality. But as
            mentioned above - optimization is orthogonal to semantics.
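A minimal sketch, assuming fixed windows and a simple integer watermark, of how GroupByKey semantics could be expressed as a stateful DoFn: buffer values in per-(key, window) bag state and emit them from an event-time timer set at the end of the window. The class and method names are hypothetical, and this is not how ReduceFnRunner is structured.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class GbkAsStatefulDoFn:
    """Toy GroupByKey as a stateful DoFn: bag state plus an event-time timer."""

    def __init__(self, window_size: int):
        self.window_size = window_size
        # Bag state per (key, window_start).
        self.bags: Dict[Tuple[str, int], List[object]] = defaultdict(list)
        # Event-time timer per (key, window_start).
        self.timers: Dict[Tuple[str, int], int] = {}

    def process(self, key: str, value: object, timestamp: int) -> None:
        window_start = timestamp - timestamp % self.window_size
        self.bags[(key, window_start)].append(value)
        # Fire once the watermark reaches the end of the window.
        self.timers[(key, window_start)] = window_start + self.window_size

    def advance_watermark(self, watermark: int) -> List[Tuple[str, int, List[object]]]:
        """Fire due timers and emit (key, window_start, values), clearing state."""
        fired = []
        for key_window, fire_at in sorted(self.timers.items()):
            if fire_at <= watermark:
                fired.append((key_window[0], key_window[1], self.bags.pop(key_window)))
        for key, start, _ in fired:
            del self.timers[(key, start)]
        return fired
```

Only the default "fire once at end of window" behavior is modeled; triggers, allowed lateness and merging windows are where the real GBK semantics get more involved.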


        Not quite IMO. It is a subtle difference. Perhaps these
        transforms can be *implemented* using stateful DoFn, but
        defining their semantics directly at a high level is more
        powerful. The higher level we can make transforms, the more
        flexibility we have in the runners. You *could* suggest that
        we take the same approach as we do with Combine: not a
        primitive, but a special transform that we optimize. You
        could say that "vanilla ParDo" is a composite that has a
        stateful ParDo implementation, but a runner can implement
        the composite more efficiently (without a shuffle). Same
        with CoGBK. You could say that there is a default expansion
        of CoGBK that uses stateful DoFn (which implies a shuffle)
        but that smart runners will not use that expansion.

                    I'll compare this to the set of transforms we
                    used to use in Euphoria (currently a Java SDK
                    extension):

                     - FlatMap ~~ stateless DoFn

                     - Union ~~ Flatten

                     - ReduceStateByKey ~~ stateful DoFn, GBK,
                    Combine, Window


                Stateful DoFn does not require an associative or
                commutative operation, while reduce/combine does.
                Windowing is really just a secondary key for
                GBK/Combine that allows completion of unbounded
                aggregations but has no computation associated with it.
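The associativity/commutativity requirement can be checked mechanically. The hypothetical helpers below enumerate every result a runner could produce by reordering the input and splitting it into two independently folded partitions; a valid combine function yields exactly one answer, while an order-sensitive function (string concatenation here, associative but not commutative) does not.

```python
import itertools
from functools import reduce


def fold_in_two_partitions(order, cut, add, merge, zero):
    """Fold order[:cut] and order[cut:] independently, then merge the partials."""
    left = reduce(add, order[:cut], zero)
    right = reduce(add, order[cut:], zero)
    return merge(left, right)


def possible_results(values, add, merge, zero):
    """All outputs obtainable by reordering the input and choosing where to split it."""
    results = set()
    for order in itertools.permutations(values):
        for cut in range(len(order) + 1):
            results.add(fold_in_two_partitions(order, cut, add, merge, zero))
    return results
```

Summing `[1, 2, 3]` gives the single result `{6}`, whereas concatenating `["x", "y", "z"]` gives six different strings. A stateful DoFn is free to depend on arrival order, which is why it cannot be split and merged this way in general.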

            Merging WindowFn contains some computation. The fact
            that stateful DoFns do not require a specific form of
            reduce function is precisely what makes stateful DoFn
            the actual primitive, no?
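Session windows are the usual example of a merging WindowFn doing real work. A minimal sketch of the merge step, assuming integer timestamps and half-open proto-windows `[t, t + gap)` that merge when they overlap; `merge_sessions` is an illustrative helper, not Beam's `Sessions` implementation.

```python
from typing import List, Tuple


def merge_sessions(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Assign each timestamp a proto-window [t, t + gap) and merge overlapping ones."""
    windows = sorted((t, t + gap) for t in timestamps)
    merged: List[Tuple[int, int]] = []
    for start, end in windows:
        if merged and start < merged[-1][1]:
            # Overlaps the previous session: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged
```

For `merge_sessions([1, 4, 20, 2], 5)` the first three proto-windows collapse into `(1, 9)` and the last stays `(20, 25)`.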


                     - (missing Impulse)


                Then you must have some primitive sources with
                splitting?

                     - (missing splittable DoFn)


                Kind of the same question - SDF is the one and only
                primitive that creates parallelism.
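To illustrate why splitting is what creates parallelism, here is a toy offset-range restriction that can be split into a primary and a residual, plus a helper performing an initial split into fixed-size chunks. `OffsetRange`, `split_at` and `initial_splits` are stand-ins written for this example, loosely following splittable DoFn vocabulary (restriction, primary, residual); they are not a Beam API.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    """Toy restriction: the half-open range of offsets [start, stop) still to be read."""
    start: int
    stop: int

    def split_at(self, position: int) -> Optional[Tuple["OffsetRange", "OffsetRange"]]:
        """Split into a primary [start, position) and a residual [position, stop)."""
        if not self.start < position < self.stop:
            return None  # nothing left to hand off
        return OffsetRange(self.start, position), OffsetRange(position, self.stop)


def initial_splits(restriction: OffsetRange, desired_chunk: int) -> List[OffsetRange]:
    """Cut one restriction into independent pieces that separate workers can process."""
    pieces = []
    current = restriction
    while True:
        split = current.split_at(current.start + desired_chunk)
        if split is None:
            pieces.append(current)
            return pieces
        primary, current = split
        pieces.append(primary)
```

The same `split_at` call could also be issued while a range is being read (a dynamic split), handing the unread residual to another worker.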

            Original Euphoria had an analog of (Un)boundedReader.
            The SDK extension in Beam works on top of PCollections
            and therefore does not deal with IOs.


                    The ReduceStateByKey is a transform that is a
                    "combinable stateful DoFn", i.e. the state
                    might be created pre-shuffle; on trigger, the
                    state is shuffled and then merged. In Beam we
                    already have CombiningState and MergingState
                    facilities (sort of), which is what is needed;
                    we just do not have the ability to shuffle the
                    partial states and then combine them. This also
                    relates to the inability to run stateful DoFn
                    for merging WindowFns, because that is needed
                    there as well. Is this something that is
                    fundamentally impossible to define for all
                    runners? What is worth noting is that building
                    the state before the shuffle, then shuffling and
                    merging it, requires a compatible trigger (purely
                    based on the watermark); otherwise the transform
                    falls back to a "classical DoFn".
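A hedged sketch of the "combinable stateful DoFn" idea described above: per-key state built only from mergeable cells is accumulated inside each bundle before the shuffle, only partial states cross the shuffle, and they are merged afterwards. Without a watermark-only trigger, the sketch falls back to processing raw elements after the shuffle. `KeyState` and `reduce_state_by_key` are hypothetical and do not reproduce Euphoria's actual ReduceStateByKey API.

```python
from collections import defaultdict


class KeyState:
    """Toy per-key state made only of mergeable cells: a counter, a max, and a set."""

    def __init__(self):
        self.count = 0
        self.max_value = None
        self.distinct = set()

    def add(self, value):
        self.count += 1
        self.max_value = value if self.max_value is None else max(self.max_value, value)
        self.distinct.add(value)

    def merge(self, other):
        self.count += other.count
        if other.max_value is not None:
            self.max_value = (other.max_value if self.max_value is None
                              else max(self.max_value, other.max_value))
        self.distinct |= other.distinct

    def snapshot(self):
        return (self.count, self.max_value, sorted(self.distinct))


def reduce_state_by_key(bundles, watermark_only_trigger):
    if watermark_only_trigger:
        # Pre-shuffle: build partial state per key inside each bundle.
        shuffled = defaultdict(list)
        for bundle in bundles:
            local = defaultdict(KeyState)
            for k, v in bundle:
                local[k].add(v)
            for k, partial in local.items():
                shuffled[k].append(partial)  # only partial states are shuffled
        merged = {}
        for k, partials in shuffled.items():
            acc = KeyState()
            for partial in partials:
                acc.merge(partial)
            merged[k] = acc
    else:
        # Fallback: shuffle raw elements, then update state element by element.
        merged = defaultdict(KeyState)
        for bundle in bundles:
            for k, v in bundle:
                merged[k].add(v)
    return {k: st.snapshot() for k, st in merged.items()}
```

Both modes produce the same per-key state; they differ only in what is sent through the shuffle.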


                Stateful DoFn for merging windows can be defined.
                You could require all state to be mergeable and
                then it is automatic. Or you could have an
                "onMerge" callback. These should both be fine. The
                automatic version is less likely to have
                nonsensical semantics, but allowing the callback to
                do "whatever it wants" whether the result is good
                or not is more consistent with the design of
                stateful DoFn.
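For the "require all state to be mergeable" option, a runner could merge window state without any user code if each state cell declares its own merge function. A minimal sketch under that assumption; the `STATE_SPEC` layout and `merge_window_states` are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical state spec: every cell declares (initial value, merge function).
Cell = Tuple[Any, Callable[[Any, Any], Any]]

STATE_SPEC: Dict[str, Cell] = {
    "count": (0, lambda a, b: a + b),
    "max_ts": (float("-inf"), max),
    "users": (frozenset(), lambda a, b: a | b),
}


def merge_window_states(states: List[Dict[str, Any]]) -> Dict[str, Any]:
    """What a runner could do automatically: merge each cell with its declared merge fn."""
    merged = {name: init for name, (init, _) in STATE_SPEC.items()}
    for state in states:
        for name, (_, merge_fn) in STATE_SPEC.items():
            merged[name] = merge_fn(merged[name], state[name])
    return merged
```

A plain value cell holding arbitrary user data has no natural merge function, which is where an explicit onMerge callback becomes necessary.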

            Yes, but this is the same for CombineFn, right? The
            merge (or combine) has to be correctly aligned with the
            computation. The current situation is that we do not
            support stateful DoFns for merging WindowFn [1].


                Whether and where a shuffle takes place may vary.
                Start with the maths.

            Shuffle happens at least whenever there is a need to
            regroup keys. I'm not sure which maths you refer to, can
            you clarify please?

             Jan

            [1]
            https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106


                Kenn

                    Bottom line: I'm thinking of proposing to drop
                    the Euphoria extension, because it has essentially
                    no users and actually no maintainers, but I
                    have a feeling there is value in the set of
                    operators that could perhaps be transferred to
                    Beam core. I'm pretty sure it would bring
                    value to users to have access to a "combining
                    stateful DoFn" primitive (even better would be
                    "combining splittable DoFn").

                    Looking forward to any comments on this.

                     Jan
