On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský <je...@seznam.cz> wrote:
On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a primitive, though
in practice it's implemented in terms of GroupByKey today.
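To make "implemented in terms of GroupByKey" concrete, here is a plain-Java sketch of one way to express CoGroupByKey as a composite. It is not the Beam API, and all class and method names are hypothetical. Each element is tagged with the index of its input, the inputs are flattened, an ordinary GroupByKey runs, and every group is then split back out by tag. In-memory lists stand in for PCollections, and windowing is ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Beam API): each input "PCollection" is a List of key/value entries.
final class CoGbkSketch {

  // A value tagged with the index of the input it came from (a "union tag").
  record Tagged(int tag, String value) {}

  // Steps 1 + 2: tag every element with its input index and flatten all inputs into one list.
  static List<Map.Entry<String, Tagged>> tagAndFlatten(List<List<Map.Entry<String, String>>> inputs) {
    List<Map.Entry<String, Tagged>> flat = new ArrayList<>();
    for (int tag = 0; tag < inputs.size(); tag++) {
      for (Map.Entry<String, String> e : inputs.get(tag)) {
        flat.add(Map.entry(e.getKey(), new Tagged(tag, e.getValue())));
      }
    }
    return flat;
  }

  // Step 3: an ordinary GroupByKey over the flattened, tagged elements.
  static Map<String, List<Tagged>> groupByKey(List<Map.Entry<String, Tagged>> flat) {
    Map<String, List<Tagged>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, Tagged> e : flat) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return grouped;
  }

  // Step 4: split each group back into one list of values per input.
  static Map<String, List<List<String>>> coGroupByKey(List<List<Map.Entry<String, String>>> inputs) {
    Map<String, List<List<String>>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Tagged>> g : groupByKey(tagAndFlatten(inputs)).entrySet()) {
      List<List<String>> perInput = new ArrayList<>();
      for (int i = 0; i < inputs.size(); i++) {
        perInput.add(new ArrayList<>());
      }
      for (Tagged t : g.getValue()) {
        perInput.get(t.tag()).add(t.value());
      }
      result.put(g.getKey(), perInput);
    }
    return result;
  }
}
```

A key that appears in only some inputs still gets one (possibly empty) list per input, which is the CoGBK result shape.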
On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org>
wrote:
On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi,
I have some missing pieces in my understanding of the set
of Beam's primitive transforms, which I'd like to fill.
First a quick recap of what I think is the current state.
We have (basically) the following primitive transforms:
- DoFn (stateless, stateful, splittable)
- Window
- Impulse
- GroupByKey
- Combine
Not a primitive, just a well-defined transform that runners
can execute in special ways.
Yep, OK, agree. Performance is orthogonal to semantics.
- Flatten (PCollections)
The rest, yes.
Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both
GBK and stateful DoFn.
ReduceFnRunner is for windowing / triggers and has a special
feature to use a CombineFn while doing it. Nothing to do with
stateful DoFn.
My bad, wrong wording. The point was that *all* of the semantics
of GBK and Combine can be defined in terms of stateful DoFn. There
are some changes needed to stateful DoFn to support the Combine
functionality. But as mentioned above, optimization is orthogonal
to semantics.
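A minimal plain-Java model of the claim that Combine semantics can be defined with a stateful DoFn. It assumes a single window and that all elements for a key have already been shuffled to one place. A hypothetical per-key state cell is updated on every element (standing in for processElement) and emitted and cleared when the window ends (standing in for an event-time timer). This is a sketch, not the Beam stateful DoFn API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (NOT the Beam API) of "Combine.perKey defined via a stateful DoFn".
final class CombineViaStatefulDoFn<V> {
  private final Map<String, V> state = new LinkedHashMap<>(); // hypothetical per-key state cell
  private final V zero;
  private final BinaryOperator<V> add;

  CombineViaStatefulDoFn(V zero, BinaryOperator<V> add) {
    this.zero = zero;
    this.add = add;
  }

  // "processElement": read the key's state, fold in the input, write it back.
  void processElement(String key, V input) {
    state.put(key, add.apply(state.getOrDefault(key, zero), input));
  }

  // "onTimer" at the end of the window: emit each key's accumulated value and clear the state.
  Map<String, V> onWindowEnd() {
    Map<String, V> out = new LinkedHashMap<>(state);
    state.clear();
    return out;
  }
}
```

In this single-phase form every raw element crosses the shuffle and is folded one at a time, so nothing is split into partial results that must be merged later.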
Not quite IMO. It is a subtle difference. Perhaps these transforms can
be *implemented* using stateful DoFn, but defining their semantics
directly at a high level is more powerful. The higher level we can
make transforms, the more flexibility we have in the runners. You
*could* suggest that we take the same approach as we do with Combine:
not a primitive, but a special transform that we optimize. You could
say that "vanilla ParDo" is a composite that has a stateful ParDo
implementation, but a runner can implement the composite more
efficiently (without a shuffle). Same with CoGBK. You could say that
there is a default expansion of CoGBK that uses stateful DoFn (which
implies a shuffle) but that smart runners will not use that expansion.
I'll compare this to the set of transforms we used to use
in Euphoria (currently a Java SDK extension):
- FlatMap ~~ stateless DoFn
- Union ~~ Flatten
- ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
Stateful DoFn does not require an associative or commutative
operation, while reduce/combine does. Windowing is really
just a secondary key for GBK/Combine that allows completion
of unbounded aggregations but has no computation associated
with it.
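The "window as a secondary key" view can be illustrated for fixed (non-merging) windows with a plain-Java sketch. The class, record, and method names are hypothetical, and this is not the Beam WindowFn API. Each element's window is derived from its timestamp alone, and the aggregation then simply groups by the pair (key, window).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Beam API): fixed windows treated as a secondary grouping key.
final class WindowAsSecondaryKey {
  record Event(String key, long timestampMillis, int value) {}

  record KeyAndWindow(String key, long windowStart) {}

  // Fixed (tumbling) window assignment: the window start is the timestamp rounded down.
  static long windowStart(long ts, long sizeMillis) {
    return ts - Math.floorMod(ts, sizeMillis);
  }

  // Sum per (key, window): the window is just one more component of the grouping key.
  static Map<KeyAndWindow, Integer> sumPerKeyAndWindow(List<Event> events, long sizeMillis) {
    Map<KeyAndWindow, Integer> sums = new LinkedHashMap<>();
    for (Event e : events) {
      KeyAndWindow kw = new KeyAndWindow(e.key(), windowStart(e.timestampMillis(), sizeMillis));
      sums.merge(kw, e.value(), Integer::sum);
    }
    return sums;
  }
}
```

This works because fixed-window assignment looks at a single element. Merging WindowFns such as sessions cannot be reduced to a per-element key this way.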
A merging WindowFn contains some computation. The fact that stateful
DoFns do not require a specific form of reduce function is precisely
what makes them the actual primitive, no?
- (missing Impulse)
Then you must have some primitive sources with splitting?
- (missing splittable DoFn)
Kind of the same question: SDF is the one and only primitive
that creates parallelism.
Original Euphoria had an analogy to (Un)boundedReader. The SDK
extension in Beam works on top of PCollections and therefore does
not deal with IOs.
The ReduceStateByKey is a transform that is a "combinable
stateful DoFn", i.e., the state might be created
pre-shuffle; on trigger, the state is shuffled and then
merged. In Beam we already have the CombiningState and
MergingState facilities (sort of), which is what is needed;
we just do not have the ability to shuffle the partial
states and then combine them. This also relates to the
inability to run stateful DoFn for merging WindowFns,
because that is needed there as well. Is this something
that is fundamentally impossible to define for all
runners? What is worth noting is that building the state
before the shuffle, shuffling it, and merging it requires a
compatible trigger (purely based on watermark); otherwise the
transform falls back to a "classical DoFn".
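A rough plain-Java sketch of the "combinable stateful DoFn" idea, assuming a watermark-only trigger so that partial results can be merged freely. Each bundle builds per-key partial accumulators before the shuffle, only those accumulators are shuffled, and they are merged afterwards. The CombineFnLike interface is hypothetical: its method names mirror Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput), but it is not that class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT the Beam API): partial state is built before the shuffle,
// partial states are shuffled, then merged.
final class TwoPhaseCombine {

  // Hypothetical interface; method names mirror Beam's CombineFn, but this is not that class.
  interface CombineFnLike<I, A, O> {
    A createAccumulator();
    A addInput(A acc, I input);
    A mergeAccumulators(List<A> accs);
    O extractOutput(A acc);
  }

  // Mean: the accumulator is {sum, count}, so partial results merge correctly.
  static final CombineFnLike<Double, double[], Double> MEAN = new CombineFnLike<>() {
    public double[] createAccumulator() { return new double[] {0, 0}; }
    public double[] addInput(double[] acc, Double x) { acc[0] += x; acc[1] += 1; return acc; }
    public double[] mergeAccumulators(List<double[]> accs) {
      double[] m = createAccumulator();
      for (double[] a : accs) { m[0] += a[0]; m[1] += a[1]; }
      return m;
    }
    public Double extractOutput(double[] acc) { return acc[1] == 0 ? Double.NaN : acc[0] / acc[1]; }
  };

  // Phase 1 (pre-shuffle): a bundle folds its own elements into per-key partial accumulators.
  static <I, A, O> Map<String, A> partial(CombineFnLike<I, A, O> fn, List<Map.Entry<String, I>> bundle) {
    Map<String, A> accs = new LinkedHashMap<>();
    for (Map.Entry<String, I> e : bundle) {
      A acc = accs.computeIfAbsent(e.getKey(), k -> fn.createAccumulator());
      accs.put(e.getKey(), fn.addInput(acc, e.getValue()));
    }
    return accs;
  }

  // Phase 2 (post-shuffle, e.g. when the watermark trigger fires): group the partial
  // accumulators by key, merge them, and extract the final output.
  static <I, A, O> Map<String, O> mergeAndExtract(CombineFnLike<I, A, O> fn, List<Map<String, A>> partials) {
    Map<String, List<A>> shuffled = new LinkedHashMap<>();
    for (Map<String, A> p : partials) {
      p.forEach((k, a) -> shuffled.computeIfAbsent(k, x -> new ArrayList<>()).add(a));
    }
    Map<String, O> out = new LinkedHashMap<>();
    shuffled.forEach((k, accs) -> out.put(k, fn.extractOutput(fn.mergeAccumulators(accs))));
    return out;
  }
}
```

The mean is a convenient example because merging {sum, count} pairs is associative and commutative, while averaging the partial means directly would give wrong results for unevenly sized bundles.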
Stateful DoFn for merging windows can be defined. You could
require all state to be mergeable and then it is automatic.
Or you could have an "onMerge" callback. These should both be
fine. The automatic version is less likely to have
nonsensical semantics, but allowing the callback to do
"whatever it wants" whether the result is good or not is more
consistent with the design of stateful DoFn.
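The "require all state to be mergeable" option can be sketched in plain Java for session windows. The sketch assumes a single key and a half-open proto-session [ts, ts + gap) per element. MergingWindowState, Session, and mergeState are hypothetical names, not Beam API. Whenever a new element's window overlaps existing sessions, the windows are unioned and their state is merged with the supplied function, with no user callback involved.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model (NOT the Beam API): per-key session windows whose state must be
// mergeable, so merging windows merges their state automatically.
final class MergingWindowState<S> {
  // A session window [start, end) together with the state accumulated in it.
  static final class Session<S> {
    long start, end;
    S state;

    Session(long start, long end, S state) {
      this.start = start;
      this.end = end;
      this.state = state;
    }
  }

  private final List<Session<S>> sessions = new ArrayList<>();
  private final long gap;
  private final BinaryOperator<S> mergeState; // the "all state is mergeable" requirement

  MergingWindowState(long gap, BinaryOperator<S> mergeState) {
    this.gap = gap;
    this.mergeState = mergeState;
  }

  // Each element opens a proto-session [ts, ts + gap); every overlapping session is
  // unioned into it and its state is combined with mergeState.
  void add(long ts, S value) {
    Session<S> merged = new Session<>(ts, ts + gap, value);
    for (Iterator<Session<S>> it = sessions.iterator(); it.hasNext(); ) {
      Session<S> s = it.next();
      if (s.start < merged.end && merged.start < s.end) { // half-open windows overlap
        merged.start = Math.min(merged.start, s.start);
        merged.end = Math.max(merged.end, s.end);
        merged.state = mergeState.apply(s.state, merged.state);
        it.remove();
      }
    }
    sessions.add(merged);
  }

  List<Session<S>> sessions() {
    return sessions;
  }
}
```

An "onMerge" callback variant would replace mergeState with an arbitrary user function, trading this automatic behavior for more freedom.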
Yes, but this is the same for CombineFn, right? The merge (or
combine) has to be correctly aligned with the computation. The
current situation is that we do not support stateful DoFns for
merging WindowFn [1].
Whether and where a shuffle takes place may vary. Start with
the maths.
Shuffle happens at least whenever there is a need to regroup keys.
I'm not sure which maths you refer to, can you clarify please?
Jan
[1]
https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
Kenn
Bottom line: I'm thinking of proposing to drop Euphoria
extension, because it has essentially no users and
actually no maintainers, but I have a feeling there is
value in the set of operators that could be transferred
to Beam core, maybe. I'm pretty sure it would bring value
to users to have access to a "combining stateful DoFn"
primitive (even better would be "combining splittable
DoFn").
Looking forward to any comments on this.
Jan