Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský <je...@seznam.cz> wrote:

> I somehow missed these answers, Reuven and Kenn, thanks for the
> discussion, it helped me clarify my understanding.
>
>  Jan
> On 10/26/22 21:10, Kenneth Knowles wrote:
>
>
>
> On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > Not quite IMO. It is a subtle difference. Perhaps these transforms can
>> be *implemented* using stateful DoFn, but defining their semantics directly
>> at a high level is more powerful. The higher level we can make transforms,
>> the more flexibility we have in the runners. You *could* suggest that we
>> take the same approach as we do with Combine: not a primitive, but a
>> special transform that we optimize. You could say that "vanilla ParDo" is a
>> composite that has a stateful ParDo implementation, but a runner can
>> implement the composite more efficiently (without a shuffle). Same with
>> CoGBK. You could say that there is a default expansion of CoGBK that uses
>> stateful DoFn (which implies a shuffle) but that smart runners will not use
>> that expansion.
>>
>> Yes, semantics > optimizations. For optimizations Beam already has a
>> facility - PTransformOverride. There is no fundamental difference about how
>> we treat Combine wrt GBK. It *can* be expanded using GBK, but "smart
>> runners will not use that expansion". This is essentially the root of this
>> discussion.
>>
>> If I rephrase it:
>>
>>  a) why do we distinguish between "some" actually composite transforms
>> treating them as primitive, while others have expansions, although the
>> fundamental reasoning seems the same for both (performance)?
>>
> It is identical to why you can choose different axioms for formal logic
> and get all the same provable statements. You have to choose something. But
> certainly a runner that just executes primitives is the bare minimum and
> all runners are really expected to take advantage of known composites.
> Before portability, the benefit was minimal to have the runner (written in
> Java) execute a transform directly vs calling a user DoFn. Now with
> portability it could be huge if it avoids a Fn API crossing.
>
>>  b) is there a fundamental reason why we do not support stateful DoFn for
>> merging windows?
>>
> No reason. The original design was to force users to only use "mergeable"
> state in a stateful DoFn for merging windows. That is an annoying
> restriction that we don't really need. So I think the best way is to have
> an OnMerge callback. The internal legacy Java APIs for this are way too
> complex. But portability wire protocols support it (I think?) and making a
> good user facing API for all the SDKs shouldn't be too hard.
>
> Kenn
>
>
>> I feel that these are related and have historical reasons, but I'd like
>> to know that for sure. :)
>>
>>  Jan
>> On 10/24/22 19:59, Kenneth Knowles wrote:
>>
>>
>>
>> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>>
>>> I think we stated that CoGroupByKey was also a primitive, though in
>>> practice it's implemented in terms of GroupByKey today.
>>>
>>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I have some missing pieces in my understanding of the set of Beam's
>>>>> primitive transforms, which I'd like to fill. First a quick recap of what I
>>>>> think is the current state. We have (basically) the following primitive
>>>>> transforms:
>>>>>
>>>>>  - DoFn (stateless, stateful, splittable)
>>>>>
>>>>>  - Window
>>>>>
>>>>>  - Impulse
>>>>>
>>>>>  - GroupByKey
>>>>>
>>>>>  - Combine
>>>>>
>>>>
>>>> Not a primitive, just a well-defined transform that runners can execute
>>>> in special ways.
>>>>
>>> Yep, OK, agree. Performance is orthogonal to semantics.
>>>
>>>
>>>>
>>>>>
>>>>>
>>>>>  - Flatten (PCollections)
>>>>>
>>>>
>>>> The rest, yes.
>>>>
>>>>
>>>>
>>>>> Inside runners, we most often transform GBK into ReduceFn
>>>>> (ReduceFnRunner), which does the actual logic for both GBK and stateful
>>>>> DoFn.
>>>>>
>>>>
>>>> ReduceFnRunner is for windowing / triggers and has a special feature to
>>>> use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>>
>>> My bad, wrong wording. The point was that *all* of the semantics of GBK
>>> and Combine can be defined in terms of stateful DoFn. There are some
>>> changes needed to stateful DoFn to support the Combine functionality. But
>>> as mentioned above - optimization is orthogonal to semantics.
>>>
>>
>> Not quite IMO. It is a subtle difference. Perhaps these transforms can be
>> *implemented* using stateful DoFn, but defining their semantics directly at
>> a high level is more powerful. The higher level we can make transforms, the
>> more flexibility we have in the runners. You *could* suggest that we take
>> the same approach as we do with Combine: not a primitive, but a special
>> transform that we optimize. You could say that "vanilla ParDo" is a
>> composite that has a stateful ParDo implementation, but a runner can
>> implement the composite more efficiently (without a shuffle). Same with
>> CoGBK. You could say that there is a default expansion of CoGBK that uses
>> stateful DoFn (which implies a shuffle) but that smart runners will not use
>> that expansion.
>>
>>>
>>>>
>>>>
>>>>> I'll compare this to the set of transforms we used to use in Euphoria
>>>>> (currently a Java SDK extension):
>>>>>
>>>>>  - FlatMap ~~ stateless DoFn
>>>>>
>>>>>  - Union ~~ Flatten
>>>>>
>>>>>  - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>>>
>>>>
>>>> Stateful DoFn does not require associative or commutative operation,
>>>> while reduce/combine does. Windowing is really just a secondary key for
>>>> GBK/Combine that allows completion of unbounded aggregations but has no
>>>> computation associated with it.
>>>>
>>> Merging WindowFn contains some computation. The fact that stateful DoFns
>>> do not require a specific form of reduce function is precisely what makes them
>>> the actual primitive, no?
>>>
>>>
>>>>
>>>>
>>>>>  - (missing Impulse)
>>>>>
>>>>
>>>> Then you must have some primitive sources with splitting?
>>>>
>>>>
>>>>>  - (missing splittable DoFn)
>>>>>
>>>>
>>>> Kind of the same question - SDF is the one and only primitive that
>>>> creates parallelism.
>>>>
>>> Original Euphoria had an analogy to (Un)boundedReader. The SDK extension
>>> in Beam works on top of PCollections and therefore does not deal with IOs.
>>>
>>>
>>>>> The ReduceStateByKey is a transform that is a "combinable stateful
>>>>> DoFn" - i.e. the state might be created pre-shuffle, on trigger the state
>>>>> is shuffled and then merged. In Beam we already have CombiningState and
>>>>> MergingState facility (sort of), which is what is needed, we just do not
>>>>> have the ability to shuffle the partial states and then combine them. This
>>>>> also relates to the inability to run stateful DoFn for merging windowFns,
>>>>> because that is needed there as well. Is this something that is
>>>>> fundamentally impossible to define for all runners? What is worth noting is
>>>>> that building, shuffling and merging the state before shuffle requires a
>>>>> compatible trigger (purely based on watermark), otherwise the transform
>>>>> falls back to a "classical DoFn".
>>>>>
>>>>
>>>> Stateful DoFn for merging windows can be defined. You could require all
>>>> state to be mergeable and then it is automatic. Or you could have an
>>>> "onMerge" callback. These should both be fine. The automatic version is
>>>> less likely to have nonsensical semantics, but allowing the callback to do
>>>> "whatever it wants" whether the result is good or not is more consistent
>>>> with the design of stateful DoFn.
>>>>
>>> Yes, but this is the same for CombineFn, right? The merge (or combine)
>>> has to be correctly aligned with the computation. The current situation is
>>> that we do not support stateful DoFns for merging WindowFn [1].
>>>
>>>
>>>> Whether and where a shuffle takes place may vary. Start with the maths.
>>>>
>>> Shuffle happens at least whenever there is a need to regroup keys. I'm
>>> not sure which maths you refer to, can you clarify please?
>>>
>>>  Jan
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>>>
>>>
>>>> Kenn
>>>>
>>>>
>>>>> Bottom line: I'm thinking of proposing to drop the Euphoria extension,
>>>>> because it has essentially no users and actually no maintainers, but I have
>>>>> a feeling there is value in the set of operators that could be
>>>>> transferred to Beam core, maybe. I'm pretty sure it would bring value to
>>>>> users to have access to a "combining stateful DoFn" primitive (even better
>>>>> would be "combining splittable DoFn").
>>>>>
>>>>> Looking forward to any comments on this.
>>>>>
>>>>>  Jan
>>>>>
>>>>>
>>>>>
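Editorial sketch, not part of any message above: the thread's point that
Combine is a composite with a default GroupByKey expansion, which a runner
may replace with a cheaper plan, can be illustrated with a toy model in plain
Python. Nothing here uses the Beam API; the helper names (group_by_key,
combine_via_gbk, combine_lifted) and the notion of "bundles" as lists of
pairs are assumptions made purely for illustration.

```python
from collections import defaultdict
from functools import reduce


def group_by_key(pairs):
    """Toy stand-in for GroupByKey: gather all values per key (the 'shuffle')."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_via_gbk(pairs, fn):
    """Default expansion: group everything first, then fold each group."""
    return {key: reduce(fn, values) for key, values in group_by_key(pairs).items()}


def combine_lifted(bundles, fn):
    """Alternative plan: pre-combine inside each bundle, shuffle only partials."""
    partials = []
    for bundle in bundles:
        partials.extend(combine_via_gbk(bundle, fn).items())
    return combine_via_gbk(partials, fn)


# Hypothetical input: two bundles of (key, value) pairs.
bundles = [
    [("a", 1), ("b", 2), ("a", 3)],
    [("a", 4), ("b", 5), ("a", 6)],
]
all_pairs = [pair for bundle in bundles for pair in bundle]


def add(x, y):
    return x + y


def sub(x, y):
    return x - y


# Addition is associative and commutative, so both plans agree.
expanded = combine_via_gbk(all_pairs, add)
lifted = combine_lifted(bundles, add)

# Subtraction is neither, so the pre-combining plan changes the answer.
expanded_sub = combine_via_gbk(all_pairs, sub)
lifted_sub = combine_lifted(bundles, sub)
```

With addition the two plans produce the same result, which is why a runner
is free to swap one for the other. With subtraction they diverge, which
matches the remark in the thread that reduce/combine requires an associative
and commutative operation while a stateful DoFn does not.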

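Editorial sketch, not part of any message above: the two options discussed
for stateful DoFn under merging windows (require mergeable state, or provide
an "onMerge" callback) can be pictured with a toy session-window merge in
plain Python. This is not the Beam API. The function merge_sessions, its
on_merge parameter, and the representation of windows as half-open
(start, end) tuples with attached state are all hypothetical.

```python
def merge_sessions(windows, on_merge):
    """Merge overlapping half-open windows and combine their state.

    windows:  list of ((start, end), state) entries.
    on_merge: user callback taking the two states of the windows being
              merged and returning the state of the merged window.
    """
    merged = []
    for (start, end), state in sorted(windows, key=lambda w: w[0]):
        if merged and start < merged[-1][0][1]:
            # Overlaps the previous window: extend it and let the callback
            # decide what the merged state looks like.
            (prev_start, prev_end), prev_state = merged[-1]
            merged[-1] = ((prev_start, max(prev_end, end)), on_merge(prev_state, state))
        else:
            merged.append(((start, end), state))
    return merged


# Hypothetical per-window state: the list of values seen so far.
windows = [((0, 10), [1]), ((5, 15), [2]), ((20, 30), [3])]

# "Mergeable state" flavour: the callback is a fixed, well-behaved merge.
concatenated = merge_sessions(windows, on_merge=lambda a, b: a + b)

# "Do whatever it wants" flavour: the callback keeps only the newer state.
latest_only = merge_sessions(windows, on_merge=lambda a, b: b)
```

The first call corresponds to the automatic variant, where the merge is
dictated by the state type. The second shows the extra freedom, and the
extra room for questionable semantics, that a user-supplied callback brings.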