On Mon, Oct 24, 2022 at 5:51 AM Jan Lukavský
<je...@seznam.cz> wrote:
On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a
primitive, though in practice it's implemented in terms
of GroupByKey today.
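
A minimal sketch of the expansion mentioned here, in plain Python rather than Beam: tag each input, flatten, run one GroupByKey, then split each group by tag. The function names, the tag names and the sample data are hypothetical and only illustrate the idea; this is not Beam's actual code.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Toy GroupByKey: collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def co_group_by_key(tagged_inputs):
    """Toy CoGroupByKey built on group_by_key.

    tagged_inputs maps a tag (a hypothetical input name) to a list of
    (key, value) pairs.
    """
    tags = list(tagged_inputs)
    # Steps 1 and 2: tag every value with its input, then flatten all inputs.
    flattened = [
        (key, (tag, value))
        for tag, pairs in tagged_inputs.items()
        for key, value in pairs
    ]
    # Step 3: a single GroupByKey over the flattened, tagged collection.
    grouped = group_by_key(flattened)
    # Step 4: split each group back into one list per tag.
    result = {}
    for key, tagged_values in grouped.items():
        per_tag = {tag: [] for tag in tags}
        for tag, value in tagged_values:
            per_tag[tag].append(value)
        result[key] = per_tag
    return result


emails = [("amy", "amy@example.com"), ("bob", "bob@example.com")]
phones = [("amy", "555-0100"), ("amy", "555-0101")]
joined = co_group_by_key({"emails": emails, "phones": phones})
```

A key that appears in only one input still gets an (empty) list for every other tag, which is what makes the result usable as a join.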
On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles
<k...@apache.org> wrote:
On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský
<je...@seznam.cz> wrote:
Hi,
I have some missing pieces in my understanding
of the set of Beam's primitive transforms,
which I'd like to fill. First a quick recap of
what I think is the current state. We have
(basically) the following primitive transforms:
- DoFn (stateless, stateful, splittable)
- Window
- Impulse
- GroupByKey
- Combine
Not a primitive, just a well-defined transform that
runners can execute in special ways.
Yep, OK, agree. Performance is orthogonal to semantics.
- Flatten (PCollections)
The rest, yes.
Inside runners, we most often transform GBK
into ReduceFn (ReduceFnRunner), which does the
actual logic for both GBK and stateful DoFn.
ReduceFnRunner is for windowing / triggers and has
a special feature to use a CombineFn while doing it.
Nothing to do with stateful DoFn.
My bad, wrong wording. The point was that *all* of the
semantics of GBK and Combine can be defined in terms of
stateful DoFn. There are some changes needed to stateful
DoFn to support the Combine functionality. But as
mentioned above - optimization is orthogonal to semantics.
Not quite IMO. It is a subtle difference. Perhaps these
transforms can be *implemented* using stateful DoFn, but
defining their semantics directly at a high level is more
powerful. The higher level we can make transforms, the more
flexibility we have in the runners. You *could* suggest that
we take the same approach as we do with Combine: not a
primitive, but a special transform that we optimize. You
could say that "vanilla ParDo" is a composite that has a
stateful ParDo implementation, but a runner can implement
the composite more efficiently (without a shuffle). Same
with CoGBK. You could say that there is a default expansion
of CoGBK that uses stateful DoFn (which implies a shuffle)
but that smart runners will not use that expansion.
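
To make the "implemented using stateful DoFn" side of this argument concrete, here is a rough sketch in plain Python of GroupByKey expressed as per-key buffering state plus an end-of-window callback. The class, its methods and the single implicit window are assumptions made for illustration; nothing here is Beam API.

```python
from collections import defaultdict


class BufferingStatefulFn:
    """Toy stand-in for a stateful DoFn: per-key bag state plus one 'timer'."""

    def __init__(self):
        self._bag_state = defaultdict(list)  # key -> buffered values

    def process(self, key, value):
        # Called once per element; touches only the state of that element's key.
        self._bag_state[key].append(value)

    def on_timer(self):
        # Fires when the single, implicit window closes: emit one group per key.
        output = [(key, list(values)) for key, values in self._bag_state.items()]
        self._bag_state.clear()
        return output


def gbk_via_stateful_fn(pairs):
    fn = BufferingStatefulFn()
    for key, value in pairs:
        # Routing elements to per-key state is where a real runner would shuffle.
        fn.process(key, value)
    return dict(fn.on_timer())


grouped = gbk_via_stateful_fn([("a", 1), ("b", 2), ("a", 3)])
```

The sketch reproduces the output of a GroupByKey, but it also hard-codes one execution strategy (buffer everything, emit once), which is the kind of commitment a higher-level definition leaves to the runner.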
I'll compare this to the set of transforms we
used to use in Euphoria (currently a Java SDK
extension):
- FlatMap ~~ stateless DoFn
- Union ~~ Flatten
- ReduceStateByKey ~~ stateful DoFn, GBK,
Combine, Window
Stateful DoFn does not require an associative or
commutative operation, while reduce/combine does.
Windowing is really just a secondary key for
GBK/Combine that allows completion of unbounded
aggregations but has no computation associated with it.
Merging WindowFn contains some computation. The fact
that a stateful DoFn does not require a specific form of
reduce function is precisely what makes it the actual
primitive, no?
- (missing Impulse)
Then you must have some primitive sources with
splitting?
- (missing splittable DoFn)
Kind of the same question - SDF is the one and only
primitive that creates parallelism.
Original Euphoria had an analogy to (Un)boundedReader.
The SDK extension in Beam works on top of PCollections
and therefore does not deal with IOs.
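
The remark above that non-merging windowing is "really just a secondary key" can be sketched in a few lines of plain Python: the window is computed from the timestamp and the aggregation is keyed by (key, window). The function names, the fixed-window size and the sample events are hypothetical; merging WindowFns, which do involve computation, are not covered by this sketch.

```python
from collections import defaultdict


def fixed_window(timestamp, size):
    """Assign a timestamp to the fixed window [start, start + size)."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def sum_per_key_and_window(events, size):
    """events: iterable of (key, value, timestamp); aggregates by (key, window)."""
    sums = defaultdict(int)
    for key, value, timestamp in events:
        # The window acts as a second component of the grouping key.
        sums[(key, fixed_window(timestamp, size))] += value
    return dict(sums)


events = [("a", 1, 0), ("a", 2, 5), ("a", 4, 12), ("b", 8, 3)]
totals = sum_per_key_and_window(events, size=10)
```

Because each (key, window) pair is finite, the aggregation for it can complete even when the input as a whole is unbounded.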
The ReduceStateByKey is a transform that is a
"combinable stateful DoFn" - i.e. the state
might be created pre-shuffle, on trigger the
state is shuffled and then merged. In Beam we
already have CombiningState and MergingState
facility (sort of), which is what is needed, we
just do not have the ability to shuffle the
partial states and then combine them. This also
relates to the inability to run stateful DoFn
for merging windowFns, because that is needed
there as well. Is this something that is
fundamentally impossible to define for all
runners? It is worth noting that building the
state before the shuffle, then shuffling and
merging it, requires a compatible trigger (purely
based on the watermark); otherwise the transform
falls back to a "classical DoFn".
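
The "build state pre-shuffle, shuffle the partial states, then merge" flow described above might look roughly like this in plain Python. The method names mirror the shape of a CombineFn (create_accumulator, add_input, merge_accumulators, extract_output), but no Beam code is imported; the bundles, the helper functions and the choice of a mean are assumptions for illustration, and triggers are left out entirely.

```python
from collections import defaultdict


class MeanFn:
    """CombineFn-shaped mean: the accumulator is a (sum, count) pair."""

    def create_accumulator(self):
        return (0.0, 0)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs):
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float("nan")


def pre_shuffle(bundle, fn):
    """Build one partial state per key from a single worker's bundle."""
    partial = {}
    for key, value in bundle:
        acc = partial.get(key, fn.create_accumulator())
        partial[key] = fn.add_input(acc, value)
    return partial


def shuffle_and_merge(partials, fn):
    """Regroup the partial states by key, merge them, and extract the result."""
    by_key = defaultdict(list)
    for partial in partials:
        for key, acc in partial.items():
            by_key[key].append(acc)
    return {
        key: fn.extract_output(fn.merge_accumulators(accs))
        for key, accs in by_key.items()
    }


fn = MeanFn()
bundles = [[("a", 1.0), ("a", 3.0), ("b", 10.0)], [("a", 5.0), ("b", 20.0)]]
means = shuffle_and_merge([pre_shuffle(bundle, fn) for bundle in bundles], fn)
```

Only the small (sum, count) states cross the shuffle, not the raw elements, which is the benefit the "combinable" restriction buys.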
Stateful DoFn for merging windows can be defined.
You could require all state to be mergeable and
then it is automatic. Or you could have an
"onMerge" callback. These should both be fine. The
automatic version is less likely to have
nonsensical semantics, but allowing the callback to
do "whatever it wants" whether the result is good
or not is more consistent with the design of
stateful DoFn.
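
A rough sketch of the two options above for a single key, in plain Python. Overlapping session-style windows are merged and their states combined by a `merge_state` function, which stands in either for the state's own merge (the automatic version) or for a user-supplied "onMerge" callback. The function name, the strict-overlap rule and the sample data are assumptions, not Beam behaviour.

```python
import operator


def merge_sessions(states, merge_state):
    """Merge overlapping windows for one key and combine their states.

    states: list of ((start, end), state) pairs.
    merge_state: function taking two states and returning the merged state.
    Assumption: two windows merge only when they strictly overlap.
    """
    merged = []
    for window, state in sorted(states, key=lambda item: item[0]):
        if merged and window[0] < merged[-1][0][1]:
            (start, end), previous_state = merged[-1]
            merged[-1] = (
                (start, max(end, window[1])),
                merge_state(previous_state, state),
            )
        else:
            merged.append((window, state))
    return merged


# Automatic version: the state is a running sum, so its merge is addition.
counts = [((0, 10), 2), ((5, 15), 3), ((30, 40), 7)]
merged_counts = merge_sessions(counts, operator.add)

# Callback version: the user decides what merging means, sensible or not.
merged_keep_first = merge_sessions(counts, lambda first, second: first)
```

The second call shows the trade-off mentioned above: a callback can do anything, including silently dropping the state of one of the merged windows.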
Yes, but this is the same for CombineFn, right? The
merge (or combine) has to be correctly aligned with the
computation. The current situation is that we do not
support stateful DoFns for merging WindowFn [1].
Whether and where a shuffle takes place may vary.
Start with the maths.
Shuffle happens at least whenever there is a need to
regroup keys. I'm not sure which maths you refer to, can
you clarify please?
Jan
[1]
https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
Kenn
Bottom line: I'm thinking of proposing to drop
the Euphoria extension, because it has essentially
no users and actually no maintainers, but I
have a feeling there is value in the set of
operators that could be transferred to Beam
core, maybe. I'm pretty sure it would bring
value to users to have access to a "combining
stateful DoFn" primitive (even better would be
"combining splittable DoFn").
Looking forward to any comments on this.
Jan