@Amit: Yes, Flink is more "what you write is what you get". For example, in Flink we have a Fold function for windows which cannot be efficiently computed with merging windows (it would require using a "group by" window and then folding the iterable). We just don't allow this.
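The distinction can be shown with a toy sketch (plain Python, not the Flink or Beam APIs; the helper names are made up): partial results of an associative combine can safely be merged when windows merge, while partial fold results generally cannot be merged by re-applying the fold function.

```python
# Why merging windows work with an associative combine but not with fold:
# an associative combine lets a runner merge pre-computed partial results,
# while the fold partials of two windows cannot, in general, be merged
# correctly by re-applying the fold function.

def combine_all(op, elements):
    """Combine all elements with an associative binary op (e.g. +)."""
    acc = elements[0]
    for e in elements[1:]:
        acc = op(acc, e)
    return acc

def fold(f, seed, elements):
    """Left fold with a seed, as in a windowed Fold function."""
    acc = seed
    for e in elements:
        acc = f(acc, e)
    return acc

add = lambda a, b: a + b

# Associative combine: merging the partials of two windows is safe.
assert combine_all(add, [1, 2, 3, 4]) == add(
    combine_all(add, [1, 2]), combine_all(add, [3, 4]))  # 10 == 10

# Non-associative fold function: merging partials gives the wrong answer.
f = lambda acc, x: 2 * acc + x
assert fold(f, 0, [1, 2, 3, 4]) == 26                    # correct result
assert f(fold(f, 0, [1, 2]), fold(f, 0, [3, 4])) == 18   # != 26
```

This is why a fold over merging windows would have to fall back to grouping the full iterable and folding it once, as the message above notes.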
For Beam, I think it's ok if we clearly define Combine in terms of GroupByKey | CombineValues (which we do). With different runners it's hard to enforce common optimisation strategies.

On Sun, 23 Oct 2016 at 06:02 Robert Bradshaw <rober...@google.com.invalid> wrote:

> On Sat, Oct 22, 2016 at 2:38 AM, Amit Sela <amitsel...@gmail.com> wrote:
> > I understand the semantics, but I feel like there might be a different
> > point of view for open-source runners.
>
> It seems we're losing a major promise of the runner interchangeability
> story if different runners can give different results for a
> well-defined transformation. I strongly feel we should avoid that path
> whenever possible. Specifically, in this case Combine.perKey should
> mean the same thing on all runners (namely its composite definition),
> and only be executed differently when it's safe to do so.
>
> > Dataflow is a service, and it tries to do its best to optimize execution
> > while users don't have to worry about the internal implementation (they
> > are not aware of it).
> > I can assure
> > <https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>
> > you that for Spark users, applying groupByKey instead of combinePerKey is
> > an important note.
>
> For sure. Dataflow calls this out too. See the second star at
> https://cloud.google.com/dataflow/model/combine#using-combine-perkey
> (though it's not called out as prominently as it is for Spark
> users--likely it should be more). Beam documentation should make this
> point as well.
>
> > @Aljoscha do Flink users (working on the Flink native API) usually care
> > about this difference of implementation?
> > Any other runners that can provide input?
>
> IIRC, Flink and Dataflow (and, trivially, the direct runner) all avoid
> this unsafe optimization when merging windows are mixed with
> non-global side inputs.
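To make the composite definition concrete, here is a sketch in plain Python (not the Beam API; all function names here are made up) of Combine.perKey as GroupByKey | Combine.values, next to a "lifted" execution that pre-combines within each bundle before the shuffle. For an associative, commutative combine function the two strategies agree:

```python
# Sketch of Combine.perKey == GroupByKey | Combine.values, and of
# "combiner lifting": pre-combining per bundle, then combining the
# per-bundle partial results after the shuffle.

from collections import defaultdict
from functools import reduce

def group_by_key(kvs):
    """GroupByKey: list of (k, v) pairs -> {k: [v, ...]}."""
    grouped = defaultdict(list)
    for k, v in kvs:
        grouped[k].append(v)
    return dict(grouped)

def combine_values(grouped, combine_fn):
    """Combine.values: reduce each key's iterable with combine_fn."""
    return {k: reduce(combine_fn, vs) for k, vs in grouped.items()}

def combine_per_key(kvs, combine_fn):
    """The composite definition: GroupByKey | Combine.values."""
    return combine_values(group_by_key(kvs), combine_fn)

def lifted_combine_per_key(bundles, combine_fn):
    """Optimized execution: pre-combine each bundle, merge the partials."""
    partials = [kv for bundle in bundles
                for kv in combine_per_key(bundle, combine_fn).items()]
    return combine_per_key(partials, combine_fn)

kvs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)]
add = lambda x, y: x + y
assert combine_per_key(kvs, add) == {"a": 9, "b": 6}
# Same result regardless of how the input is split into bundles.
assert lifted_combine_per_key([kvs[:2], kvs[2:]], add) == {"a": 9, "b": 6}
```

The thread's point is that this equivalence only holds when the combine function does not observe anything bundle-dependent, such as a side input resolved against a not-yet-merged main-input window.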
> Note also that the user of the Combine.perKey transform may not know
> the choice of windowing of the main or side inputs, so can't make this
> determination of whether it's safe to use this optimization. (As a
> concrete example, suppose I created a TopNPercent transform that did a
> global count and passed that as a side input to the Top CombineFn.)
>
> On Sat, Oct 22, 2016 at 2:25 AM Robert Bradshaw
> <rober...@google.com.invalid> wrote:
>
> > Combine.perKey() is defined as GroupByKey() | Combine.values().
> >
> > A runner is free, in fact encouraged, to take advantage of the
> > associative properties of CombineFn to compute the result of
> > GroupByKey() | Combine.values() as cheaply as possible, but it is
> > incorrect to produce something that could not have been produced by
> > this composite implementation. (In the case of deterministic trigger
> > firing (e.g. the default trigger), plus assuming of course an
> > associative, deterministic CombineFn, there is exactly one correct
> > output for every input no matter the WindowFns.)
> >
> > A corollary to this is that we cannot apply combining operations that
> > inspect the main input window (including side inputs where the mapping
> > is anything but the constant map (like to GlobalWindow)) until the
> > main input window is known.
> >
> > On Fri, Oct 21, 2016 at 3:50 PM, Amit Sela <amitsel...@gmail.com> wrote:
> >> Please excuse my typos and apply "s/differ/defer/g" ;-).
> >> Amit.
> >>
> >> On Fri, Oct 21, 2016 at 2:59 PM Amit Sela <amitsel...@gmail.com> wrote:
> >>
> >>> I'd like to raise an issue that was discussed in BEAM-696
> >>> <https://issues.apache.org/jira/browse/BEAM-696>.
> >>> I won't recap here because it would be extensive (and probably
> >>> exhaustive), and I'd also like to restart the discussion here rather
> >>> than summarize it.
> >>>
> >>> *The problem*
> >>> In the case of (main) input in a merging window (e.g.
Sessions) with
> >>> sideInputs, pre-combining might lead to non-deterministic behaviour,
> >>> for example:
> >>> Main input: e1 (time: 3), e2 (time: 5)
> >>> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone
> >>> [5, 8), combined together the merging of their windows yields [3, 8).
> >>> Matching sideInputs with FixedWindows of size 2 should yield - e1
> >>> matching sideInput window [4, 6), e2 [6, 8), merged [6, 8).
> >>> Now, if the sideInput is used in a merging step of the combine, and
> >>> both elements are part of the same bundle, the sideInput accessed will
> >>> correspond to [6, 8), which is the expected behaviour; but if e1 is
> >>> pre-combined in a separate bundle, it will access the sideInput for
> >>> [4, 6), which is wrong.
> >>> ** this tends to be a bit confusing, so any clarifications/corrections
> >>> are most welcome.*
> >>>
> >>> *Solutions*
> >>> The optimal solution would be to defer until trigger in case of
> >>> merging windows with sideInputs that are not "agnostic" to such
> >>> behaviour, but this is clearly not feasible since the nature and use
> >>> of sideInputs in CombineFns are opaque.
> >>> Second best would be to defer until trigger *only* if sideInputs are
> >>> used for merging windows - pretty sure this is how the Flink and
> >>> Dataflow (soon Spark) runners do that.
> >>>
> >>> *Tradeoffs*
> >>> This seems like a very user-friendly way to apply authored pipelines
> >>> correctly, but it also means that users who called for a Combine
> >>> transformation will get a Grouping transformation instead (sort of the
> >>> opposite of combiner lifting? a combiner unwrapping?).
> >>> For the SDK, Combine is simply a composite transform, but keep in mind
> >>> that this affects runner optimization.
> >>> The price to pay here is (1) shuffling all elements into a single
> >>> bundle (the cost varies according to a runner's typical bundle size);
> >>> (2) state can grow as processing is deferred and not compacted until
> >>> triggered.
> >>>
> >>> IMHO, the execution should remain faithful to what the pipeline
> >>> states, and if this results in errors, well... it happens.
> >>> There are many legitimate use cases where an actual GroupByKey should
> >>> be used (regardless of sideInputs), such as sequencing of events in a
> >>> window, and I don't see the difference here.
> >>>
> >>> As stated above, I'm (almost) not recapping anyone's notes as they are
> >>> persisted in BEAM-696, so if you had something to say please provide
> >>> your input here.
> >>> I will note that Ben Chambers and Pei He mentioned that even with
> >>> deferring, this could still run into some non-determinism if there are
> >>> triggers controlling when we extract output, because non-merging
> >>> windows' trigger firing is non-deterministic.
> >>>
> >>> Thanks,
> >>> Amit
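The numbers in Amit's example can be checked with a short sketch (plain Python, not the Beam API), assuming integer timestamps and the convention that a main-input window is mapped to the side-input window containing its maximum timestamp (end - 1):

```python
# Reproduces the session/side-input example from the thread:
# e1 at time 3 and e2 at time 5, sessions with a gap of 3,
# side input in FixedWindows of size 2.

def session_window(ts, gap):
    """The proto-session window for a single element: [ts, ts + gap)."""
    return (ts, ts + gap)

def merge_sessions(w1, w2):
    """Sessions merge when their windows overlap."""
    assert w1[0] < w2[1] and w2[0] < w1[1], "windows do not overlap"
    return (min(w1[0], w2[0]), max(w1[1], w2[1]))

def side_input_window(main_window, size):
    """Map the main window's max timestamp into a fixed window of `size`."""
    max_ts = main_window[1] - 1
    start = (max_ts // size) * size
    return (start, start + size)

w1 = session_window(3, 3)        # e1 -> [3, 6)
w2 = session_window(5, 3)        # e2 -> [5, 8)
merged = merge_sessions(w1, w2)  # the windows overlap, so they merge
assert merged == (3, 8)

# What a pre-combined e1 would see vs. what the merged window should see:
assert side_input_window(w1, 2) == (4, 6)      # e1 alone: wrong side input
assert side_input_window(w2, 2) == (6, 8)      # e2 alone
assert side_input_window(merged, 2) == (6, 8)  # the expected behaviour
```

Under this mapping, pre-combining e1 in its own bundle reads the side input for [4, 6), while the merged session [3, 8) should read [6, 8), which is exactly the non-determinism the thread describes.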