On 10/22/22 21:47, Reuven Lax via dev wrote:
I think we stated that CoGroupByKey was also a primitive, though in
practice it's implemented in terms of GroupByKey today.
On Fri, Oct 21, 2022 at 3:05 PM Kenneth Knowles <k...@apache.org> wrote:
On Fri, Oct 21, 2022 at 5:24 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I have some missing pieces in my understanding of the set of
Beam's primitive transforms, which I'd like to fill. First a
quick recap of what I think is the current state. We have
(basically) the following primitive transforms:
- DoFn (stateless, stateful, splittable)
- Window
- Impulse
- GroupByKey
- Combine
Not a primitive, just a well-defined transform that runners can
execute in special ways.
Yep, OK, agree. Performance is orthogonal to semantics.
- Flatten (PCollections)
The rest, yes.
Inside runners, we most often transform GBK into ReduceFn
(ReduceFnRunner), which does the actual logic for both GBK and
stateful DoFn.
ReduceFnRunner is for windowing / triggers and has a special feature
to use a CombineFn while doing it. It has nothing to do with stateful DoFn.
My bad, wrong wording. The point was that *all* of the semantics of GBK
and Combine can be defined in terms of stateful DoFn. There are some
changes needed to stateful DoFn to support the Combine functionality.
But as mentioned above - optimization is orthogonal to semantics.
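To make the claim above concrete, here is a minimal sketch in plain Python of GBK semantics expressed as per-key buffering state plus a timer. It is not Beam API: the class `BufferingStatefulFn` and its methods are hypothetical stand-ins, with `process` playing the role of element processing with a bag state and `on_window_end` playing the role of an event-time timer firing at the end of the window. Windows are assumed to be non-merging and identified by a plain integer.

```python
from collections import defaultdict


class BufferingStatefulFn:
    """Toy model of a stateful per-key function that emulates GroupByKey.

    Hypothetical class, not Beam API.
    """

    def __init__(self):
        # (key, window) -> buffered values; stands in for a bag state.
        self._bag = defaultdict(list)

    def process(self, key, window, value):
        # Buffer the value in the state cell of its key and window.
        self._bag[(key, window)].append(value)

    def on_window_end(self, window):
        # Stands in for a timer: emit one (key, values) group per key
        # buffered in this window, then clear that state.
        fired = sorted(k for k in self._bag if k[1] == window)
        return [(key, self._bag.pop((key, w))) for key, w in fired]
```

Combine would need more than this: the buffered list would be replaced by an accumulator, and the partial accumulators would have to be mergeable across workers, which is the change to stateful DoFn mentioned above.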
I'll compare this to the set of transforms we used to use in
Euphoria (currently a Java SDK extension):
- FlatMap ~~ stateless DoFn
- Union ~~ Flatten
- ReduceStateByKey ~~ stateful DoFn, GBK, Combine, Window
Stateful DoFn does not require an associative or commutative
operation, while reduce/combine does. Windowing is really just a
secondary key for GBK/Combine that allows completion of unbounded
aggregations but has no computation associated with it.
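The "secondary key" view can be sketched for the simplest case, fixed (non-merging) windows. The snippet below is plain Python, not Beam API; the function names and the `(key, timestamp, value)` event layout are assumptions made for illustration. The aggregation is a sum, which is associative and commutative, so the order in which elements arrive does not matter.

```python
from collections import defaultdict


def fixed_window_start(timestamp, size):
    # Start of the fixed window of length `size` that contains `timestamp`.
    return timestamp - (timestamp % size)


def sum_per_key_and_window(events, size):
    # Group by the composite key (key, window start) and sum the values.
    totals = defaultdict(int)
    for key, timestamp, value in events:
        totals[(key, fixed_window_start(timestamp, size))] += value
    return dict(totals)
```

Here the window contributes nothing but an extra grouping component. This does not cover merging windows such as sessions, where the window of an element depends on other elements.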
Merging WindowFn contains some computation. The fact that stateful DoFn
does not require a specific form of reduce function is precisely what makes
it the actual primitive, no?
- (missing Impulse)
Then you must have some primitive sources with splitting?
- (missing splittable DoFn)
Kind of the same question - SDF is the one and only primitive that
creates parallelism.
The original Euphoria had an analog of (Un)boundedReader. The SDK extension
in Beam works on top of PCollections and therefore does not deal with IOs.
The ReduceStateByKey is a transform that is a "combinable
stateful DoFn" - i.e. the state might be created pre-shuffle,
on trigger the state is shuffled and then merged. In Beam we
already have the CombiningState and MergingState facilities (sort
of), which is what is needed; we just do not have the ability
to shuffle the partial states and then combine them. This also
relates to the inability to run stateful DoFn for merging
windowFns, because that is needed there as well. Is this
something that is fundamentally impossible to define for all
runners? What is worth noting is that building the state before
the shuffle, and then shuffling and merging it, requires a
compatible trigger (purely based on watermark); otherwise the
transform falls back to a "classical DoFn".
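The "combinable state" idea described above can be sketched as three steps: build partial state per key before the shuffle, shuffle the partial states by key, then merge them. The following is a plain-Python illustration, not Beam or Euphoria API. The accumulator function names are modeled on the shape of a CombineFn, and `pre_shuffle` and `shuffle_and_merge` are hypothetical helpers; the state is a (sum, count) pair used to compute a mean.

```python
from collections import defaultdict


def create_accumulator():
    return (0.0, 0)  # (running sum, element count)


def add_input(acc, value):
    return (acc[0] + value, acc[1] + 1)


def merge_accumulators(accs):
    return (sum(a[0] for a in accs), sum(a[1] for a in accs))


def extract_output(acc):
    return acc[0] / acc[1] if acc[1] else float("nan")


def pre_shuffle(partition):
    # Build partial state per key locally, before any data is moved.
    partial = {}
    for key, value in partition:
        partial[key] = add_input(partial.get(key, create_accumulator()), value)
    return partial


def shuffle_and_merge(partials):
    # Regroup the partial states by key (the "shuffle"), then merge them.
    grouped = defaultdict(list)
    for partial in partials:
        for key, acc in partial.items():
            grouped[key].append(acc)
    return {k: extract_output(merge_accumulators(v)) for k, v in grouped.items()}
```

Only one accumulator per key and partition crosses the shuffle, rather than every element. The sketch ignores triggers entirely, so it corresponds to the case where the state is emitted once, at the end of the input.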
Stateful DoFn for merging windows can be defined. You could
require all state to be mergeable and then it is automatic. Or you
could have an "onMerge" callback. These should both be fine. The
automatic version is less likely to have nonsensical semantics,
but allowing the callback to do "whatever it wants" whether the
result is good or not is more consistent with the design of
stateful DoFn.
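Both options mentioned above can be sketched with one function that merges session-like windows and delegates the state merge to a callback. This is plain Python, not Beam API: `merge_sessions` and the `on_merge` parameter are hypothetical names, windows are half-open `[start, end)` intervals, and two windows are assumed to merge only when they strictly overlap.

```python
def merge_sessions(windows, on_merge):
    """Merge overlapping (start, end, state) windows for a single key.

    `on_merge(left_state, right_state)` decides what the merged state is.
    Returns the merged windows sorted by start.
    """
    merged = []
    for start, end, state in sorted(windows, key=lambda w: (w[0], w[1])):
        if merged and start < merged[-1][1]:
            # Overlaps the previous window: extend it and merge the states.
            prev_start, prev_end, prev_state = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end), on_merge(prev_state, state))
        else:
            merged.append((start, end, state))
    return merged
```

Passing a well-behaved merge such as list concatenation or addition corresponds to the "all state is mergeable" option. Passing an arbitrary function corresponds to the "onMerge" callback, where the result is only as sensible as the callback.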
Yes, but this is the same for CombineFn, right? The merge (or combine)
has to be correctly aligned with the computation. The current situation
is that we do not support stateful DoFns for merging WindowFn [1].
Whether and where a shuffle takes place may vary. Start with the
maths.
Shuffle happens at least whenever there is a need to regroup keys. I'm
not sure which maths you refer to, can you clarify please?
Jan
[1]
https://github.com/apache/beam/blob/45b6ac71a87bb2ed83613c90d35ef2d0752266bf/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L106
Kenn
Bottom line: I'm thinking of proposing to drop the Euphoria
extension, because it has essentially no users and actually no
maintainers, but I have a feeling there is value in the set
of operators that could be transferred to Beam core, maybe.
I'm pretty sure it would bring value to users to have access
to a "combining stateful DoFn" primitive (even better would be
"combining splittable DoFn").
Looking forward to any comments on this.
Jan