Would it be helpful to add these answers to the Beam docs?

On Mon, Nov 14, 2022 at 4:35 AM Jan Lukavský <je...@seznam.cz> wrote:
> I somehow missed these answers. Reuven and Kenn, thanks for the discussion; it helped me clarify my understanding.
>
> Jan
>
> On 10/26/22 21:10, Kenneth Knowles wrote:
>>
>> On Tue, Oct 25, 2022 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>>> Not quite IMO. It is a subtle difference. Perhaps these transforms can be *implemented* using stateful DoFn, but defining their semantics directly at a high level is more powerful. The higher level we can make transforms, the more flexibility we have in the runners. You *could* suggest that we take the same approach as we do with Combine: not a primitive, but a special transform that we optimize. You could say that "vanilla ParDo" is a composite that has a stateful ParDo implementation, but a runner can implement the composite more efficiently (without a shuffle). Same with CoGBK. You could say that there is a default expansion of CoGBK that uses stateful DoFn (which implies a shuffle) but that smart runners will not use that expansion.
>>>
>>> Yes, semantics > optimizations. For optimizations, Beam already has a facility: PTransformOverride. There is no fundamental difference in how we treat Combine with respect to GBK. It *can* be expanded using GBK, but "smart runners will not use that expansion". This is essentially the root of this discussion.
>>>
>>> If I rephrase it:
>>>
>>> a) Why do we treat "some" transforms that are actually composite as primitives, while others have expansions, although the fundamental reasoning (performance) seems the same for both?
>>
>> It is identical to why you can choose different axioms for formal logic and get all the same provable statements. You have to choose something. But certainly a runner that just executes primitives is the bare minimum, and all runners are really expected to take advantage of known composites.
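The Combine case discussed above (a composite with a default GBK-based expansion that smart runners replace) can be sketched as a toy, pure-Python model. This is not the Beam API: `MeanFn`, `combine_via_gbk`, and `combine_lifted` are hypothetical names, and a "bundle" is assumed to be a plain list of `(key, value)` pairs. Only the method names of `MeanFn` mirror those of Beam's Python `CombineFn` (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`). The lifted version pre-combines within each bundle and shuffles only accumulators, which is valid only because merging accumulators is associative and commutative, the property the thread notes that a stateful DoFn does not require.

```python
from collections import defaultdict


class MeanFn:
    """Toy CombineFn computing a per-key mean; the accumulator is (sum, count)."""

    def create_accumulator(self):
        return (0, 0)

    def add_input(self, acc, value):
        total, count = acc
        return (total + value, count + 1)

    def merge_accumulators(self, accs):
        total, count = 0, 0
        for t, c in accs:
            total += t
            count += c
        return (total, count)

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


def combine_via_gbk(bundles, fn):
    """Default expansion: shuffle every raw element, then fold each group."""
    groups = defaultdict(list)
    for bundle in bundles:
        for key, value in bundle:
            groups[key].append(value)  # every element crosses the "shuffle"
    result = {}
    for key, values in groups.items():
        acc = fn.create_accumulator()
        for v in values:
            acc = fn.add_input(acc, v)
        result[key] = fn.extract_output(acc)
    return result


def combine_lifted(bundles, fn):
    """Lifted execution: pre-combine within each bundle, shuffle only accumulators."""
    shuffled = defaultdict(list)
    for bundle in bundles:
        partial = {}
        for key, value in bundle:
            acc = partial.get(key, fn.create_accumulator())
            partial[key] = fn.add_input(acc, value)
        for key, acc in partial.items():
            shuffled[key].append(acc)  # one accumulator per key per bundle
    return {key: fn.extract_output(fn.merge_accumulators(accs))
            for key, accs in shuffled.items()}


bundles = [[("a", 1), ("b", 10), ("a", 3)], [("a", 5), ("b", 20)]]
print(combine_via_gbk(bundles, MeanFn()))  # {'a': 3.0, 'b': 15.0}
print(combine_lifted(bundles, MeanFn()))   # {'a': 3.0, 'b': 15.0}
```

Both executions produce the same result; they differ only in what crosses the shuffle. That is why a runner can pick the cheaper one without changing the transform's semantics.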
>>
>> Before portability, there was minimal benefit to having the runner (written in Java) execute a transform directly instead of calling a user DoFn. Now, with portability, the benefit could be huge if it avoids a Fn API crossing.
>>
>>> b) Is there a fundamental reason why we do not support stateful DoFn for merging windows?
>>
>> No reason. The original design was to force users to use only "mergeable" state in a stateful DoFn for merging windows. That is an annoying restriction that we don't really need, so I think the best way is to have an OnMerge callback. The internal legacy Java APIs for this are way too complex, but the portability wire protocols support it (I think?), and making a good user-facing API for all the SDKs shouldn't be too hard.
>>
>> Kenn
>>
>>> I feel that these are related and have historical reasons, but I'd like to know that for sure. :)
>>>
>>> Jan
>>>
>>> On 10/24/22 19:59, Kenneth Knowles wrote:
>>>>
>>>> On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> On 10/22/22 21:47, Reuven Lax via dev wrote:
>>>>>
>>>>>> I think we stated that CoGroupByKey was also a primitive, though in practice it's implemented in terms of GroupByKey today.
>>>>>>
>>>>>> On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have some missing pieces in my understanding of the set of Beam's primitive transforms, which I'd like to fill. First, a quick recap of what I think is the current state. We have (basically) the following primitive transforms:
>>>>>>>>
>>>>>>>> - DoFn (stateless, stateful, splittable)
>>>>>>>> - Window
>>>>>>>> - Impulse
>>>>>>>> - GroupByKey
>>>>>>>> - Combine
>>>>>>>
>>>>>>> Not a primitive, just a well-defined transform that runners can execute in special ways.
>>>>>
>>>>> Yep, OK, agree. Performance is orthogonal to semantics.
>>>>>
>>>>>>>> - Flatten (PCollections)
>>>>>>>
>>>>>>> The rest, yes.
>>>>>>>
>>>>>>>> Inside runners, we most often transform GBK into ReduceFn (ReduceFnRunner), which does the actual logic for both GBK and stateful DoFn.
>>>>>>>
>>>>>>> ReduceFnRunner is for windowing / triggers and has a special feature to use a CombineFn while doing it. Nothing to do with stateful DoFn.
>>>>>
>>>>> My bad, wrong wording. The point was that *all* of the semantics of GBK and Combine can be defined in terms of stateful DoFn. Some changes to stateful DoFn would be needed to support the Combine functionality, but, as mentioned above, optimization is orthogonal to semantics.
>>>>
>>>> Not quite IMO. It is a subtle difference. Perhaps these transforms can be *implemented* using stateful DoFn, but defining their semantics directly at a high level is more powerful. The higher level we can make transforms, the more flexibility we have in the runners. You *could* suggest that we take the same approach as we do with Combine: not a primitive, but a special transform that we optimize. You could say that "vanilla ParDo" is a composite that has a stateful ParDo implementation, but a runner can implement the composite more efficiently (without a shuffle). Same with CoGBK. You could say that there is a default expansion of CoGBK that uses stateful DoFn (which implies a shuffle) but that smart runners will not use that expansion.
>>>>
>>>>>>>> I'll compare this to the set of transforms we used in Euphoria (currently a Java SDK extension):
>>>>>>>>
>>>>>>>> - FlatMap ~~ stateless DoFn
>>>>>>>> - Union ~~ Flatten
>>>>>>>> - ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
>>>>>>>
>>>>>>> Stateful DoFn does not require an associative or commutative operation, while reduce/combine does. Windowing is really just a secondary key for GBK/Combine that allows completion of unbounded aggregations but has no computation associated with it.
>>>>>
>>>>> Merging WindowFn contains some computation. The fact that stateful DoFn does not require a specific form of reduce function is precisely what makes it the actual primitive, no?
>>>>>
>>>>>>>> - (missing Impulse)
>>>>>>>
>>>>>>> Then you must have some primitive sources with splitting?
>>>>>>>
>>>>>>>> - (missing splittable DoFn)
>>>>>>>
>>>>>>> Kind of the same question: SDF is the one and only primitive that creates parallelism.
>>>>>
>>>>> The original Euphoria had an analogy to (Un)boundedReader. The SDK extension in Beam works on top of PCollections and therefore does not deal with IOs.
>>>>>
>>>>>>>> ReduceStateByKey is a transform that is a "combinable stateful DoFn", i.e., the state might be created pre-shuffle; on trigger, the state is shuffled and then merged. In Beam we already have the CombiningState and MergingState facilities (sort of), which is what is needed; we just do not have the ability to shuffle the partial states and then combine them. This also relates to the inability to run stateful DoFn for merging WindowFns, because that is needed there as well. Is this something that is fundamentally impossible to define for all runners? It is worth noting that building partial state before the shuffle, shuffling it, and merging it afterwards requires a compatible trigger (purely based on the watermark); otherwise the transform falls back to a "classical DoFn".
>>>>>>>
>>>>>>> Stateful DoFn for merging windows can be defined. You could require all state to be mergeable, and then it is automatic. Or you could have an "onMerge" callback. These should both be fine. The automatic version is less likely to have nonsensical semantics, but allowing the callback to do "whatever it wants", whether the result is good or not, is more consistent with the design of stateful DoFn.
>>>>>
>>>>> Yes, but this is the same for CombineFn, right? The merge (or combine) has to be correctly aligned with the computation. The current situation is that we do not support stateful DoFns for merging WindowFns [1].
>>>>>>>
>>>>>>> Whether and where a shuffle takes place may vary. Start with the maths.
>>>>>
>>>>> A shuffle happens at least whenever there is a need to regroup keys. I'm not sure which maths you are referring to; can you clarify, please?
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>>> Bottom line: I'm thinking of proposing to drop the Euphoria extension, because it has essentially no users and actually no maintainers, but I have a feeling there is value in its set of operators that could, maybe, be transferred to Beam core. I'm pretty sure it would bring value to users to have access to a "combining stateful DoFn" primitive (even better would be a "combining splittable DoFn").
>>>>>>>>
>>>>>>>> Looking forward to any comments on this.
>>>>>>>>
>>>>>>>> Jan
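The two options Kenn describes for stateful DoFn over merging windows (require all state to be mergeable, or provide an "onMerge" callback) can be illustrated with a toy, pure-Python model. This is not Beam's API or runner code. `run_session_stateful`, `sum_add`, `tracked_add`, and `tracked_merge` are hypothetical names. The model assumes that sessions are half-open intervals `[ts, ts + gap)` merged when they overlap, and that elements are processed in list order with no triggers or watermarks. Passing `sum` as the merge function stands in for "mergeable" combining state, while `tracked_merge` is an arbitrary user callback.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def run_session_stateful(
    events: Iterable[Tuple[str, int, Any]],
    gap: int,
    add_input: Callable[[Optional[Any], Any], Any],
    on_merge: Callable[[List[Any]], Any],
) -> Dict[str, Dict[Window, Any]]:
    """Toy model: per-key state kept per session window, with window merging.

    Each element gets the proto-window [ts, ts + gap). Existing windows of the
    same key that overlap it are merged into one window; when two or more
    existing windows merge, their states are combined with on_merge before
    the element is added with add_input.
    """
    state: Dict[str, Dict[Window, Any]] = {}
    for key, ts, value in events:
        windows = state.setdefault(key, {})
        start, end = ts, ts + gap
        overlapping = [w for w in windows if w[0] < end and start < w[1]]
        merged: Optional[Any] = None
        if len(overlapping) == 1:
            merged = windows.pop(overlapping[0])  # only one existing state
        elif overlapping:
            merged = on_merge([windows.pop(w) for w in overlapping])
        for w_start, w_end in overlapping:
            start, end = min(start, w_start), max(end, w_end)
        windows[(start, end)] = add_input(merged, value)
    return state


# "Mergeable state" variant: the state is a combining accumulator (a running
# sum), so merging windows just merges accumulators.
def sum_add(acc, value):
    return value if acc is None else acc + value


# Callback variant: the callback may do "whatever it wants"; here it also
# counts how many merges produced the window.
def tracked_add(acc, value):
    return (value, 0) if acc is None else (acc[0] + value, acc[1])


def tracked_merge(states):
    return (sum(s[0] for s in states), sum(s[1] for s in states) + 1)


events = [("u1", 1, 5), ("u1", 3, 7), ("u2", 4, 100), ("u1", 20, 1), ("u1", 12, 2)]
print(run_session_stateful(events, 10, sum_add, sum))
# {'u1': {(1, 30): 15}, 'u2': {(4, 14): 100}}
print(run_session_stateful(events, 10, tracked_add, tracked_merge))
# {'u1': {(1, 30): (15, 1)}, 'u2': {(4, 14): (100, 0)}}
```

The element at timestamp 12 bridges the sessions `[1, 13)` and `[20, 30)`, so their states are merged before it is added. With combining state, the merge policy follows from the accumulator. With a callback, the user chooses it, and nothing forces that choice to be meaningful. This is the trade-off described in the thread.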