On Fri, Aug 11, 2017 at 10:52 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:

> On Thu, Aug 10, 2017 at 1:53 PM, Reuven Lax <re...@google.com.invalid> wrote:
> > On Thu, Aug 10, 2017 at 1:07 PM, Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> >> > > >    - Does it also imply fixed length and content for value iterators?
> >> > > >
> >>
> >> The concept of "value iterator" brings up a nit.
> >>
> >> First, there is no such concept in the Beam model, and I don't think there
> >> should be. I don't think we should special-case GBK if we can avoid it. If
> >> a PCollection contains elements of type KV<K, Iterable<V>> we should have
> >> the same definition of "stable" whether or not it came from GBK. So I
> >> think, for some definition of "fixed content", it should imply fixed length
> >> and content.
> >
> > I agree. Of course it might take longer to implement this well. It's
> > perfectly acceptable for this to temporarily not work in this scenario, as
> > long as we know how to make it work.
>
> The question here is whether the ordering is part of the "content" of
> an iterable.
>

My initial instinct was to say yes, but maybe it should not be, at least until
Beam has a first-class notion of sorted values after a GBK?
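To make the ordering question concrete: in Java, whether two Iterables have
the "same content" depends on which equality you pick. A minimal sketch in
plain JDK Java (no Beam APIs; `sameContentIgnoringOrder` is a hypothetical
helper, not part of Beam) contrasting order-sensitive List.equals() with an
order-insensitive multiset comparison of the same values seen in two orders:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IterableContent {
  // Hypothetical helper (not a Beam API): two Iterables are "the same" if they
  // hold the same elements with the same multiplicities, in any order.
  static <V> boolean sameContentIgnoringOrder(Iterable<V> a, Iterable<V> b) {
    Map<V, Integer> counts = new HashMap<>();
    for (V v : a) {
      counts.merge(v, 1, Integer::sum);
    }
    for (V v : b) {
      Integer c = counts.get(v);
      if (c == null) {
        return false; // b has an element a lacks (or has it more often)
      }
      if (c == 1) {
        counts.remove(v);
      } else {
        counts.put(v, c - 1);
      }
    }
    return counts.isEmpty(); // false if a had elements that b lacked
  }

  public static void main(String[] args) {
    // The same values for one key, observed in two orders (e.g. on replay).
    List<String> firstRun = Arrays.asList("a", "b", "b");
    List<String> replay = Arrays.asList("b", "a", "b");

    // List.equals() is order-sensitive: the replay counts as "different".
    System.out.println(firstRun.equals(replay)); // false
    // The multiset comparison ignores order: the replay counts as "the same".
    System.out.println(sameContentIgnoringOrder(firstRun, replay)); // true
  }
}
```

Which of the two comparisons defines "stable" is exactly the open question:
without a first-class notion of sorted values, only the order-insensitive one
is something a runner could reasonably promise.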


>
> >> But actually what is the definition of whether two Iterable<V> values have
> >> the same content, since we don't require a deterministic coder for the
> >> values? I think this question applies to this proposal in general.
> >> Obviously we expect that if the runner is replaying from persisted
> >> serialized data, then it suffices, but that is too operational for a good
> >> definition IMO.
> >
> > I don't quite understand what you mean here. The user has a definition of
> > equality which is independent of coders (e.g. equals() in Java).
>
> I agree with this definition; the difficulty here arises from the fact
> that the Iterable interface does not specify what equals() should
> return for collections that differ only in order. (E.g. Set and List
> both implement Iterable.) Beam places no constraint on the ordering of
> values, so is the ordering part of the equivalence class, i.e. are two
> iterables that differ only in order equivalent according to the user? In
> other words, if the ordering differs on replay, is the output still
> considered "stable"?
>
> > Even non-deterministic coders are expected to deserialize to objects that
> > the user finds equivalent. Or, if you want to word things a bit more
> > formally: the set of encoded objects can be partitioned into equivalence
> > classes, based on the user's notion of equality. A deterministic coder is
> > simply one in which each equivalence class has cardinality one.
> >
> >> > > > the KV<K, Iterable<V>> has no good way of being deterministic if
> >> > > > there is late data. We could do so by forcing the Iterable to be
> >> > > > materialized into a single element, but that would also mean that
> >> > > > the entire Iterable must fit in memory (which at least the Dataflow
> >> > > > runner does not require).
> >> >
> >>
> >> As I mentioned above, the iterable is semantically [part of] a single
> >> element. So just to unpack this, to make sure we are talking about the same
> >> thing: I think you are talking about GBK as implemented via GroupByKeyOnly
> >> (GBKO) + GroupAlsoByWindow (GABW).
> >>
> >> When the output of GABW is required to be stable but the output of GBKO is
> >> not stable, we don't get stability for free in all cases by inserting a
> >> GBK; we require something more to make the output of GABW stable, in the
> >> worst case a full materialization.
> >>
> >
> > Correct. My point is that there are alternate, cheaper ways of doing this.
> > If GABW stores state in an ordered list, it can simply checkpoint a marker
> > into that list to ensure that the output is stable.
>
> In the presence of non-trivial triggering and/or late data, I'm not so
> sure this is "easy." E.g. a bundle may fail, and more data may come in
> from upstream (and get appended to the buffer) before it is retried.
>

That will still work. If the subsequent ParDo has processed the Iterable,
that means we'll have successfully checkpointed a marker to the list (using
whatever technique the runner supports). More data coming in will get
appended after the marker, so we can ensure that the retry still sees the
same elements in the Iterable.
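A minimal sketch of the marker idea, in plain Java with no Beam or runner
APIs. `MarkedBuffer`, `fire` and `pane` are hypothetical names; the in-memory
fields stand in for the runner's durable state, and the sketch assumes
accumulating panes (each pane sees everything up to its marker). It shows
that data appended after a marker, e.g. late data arriving before a failed
bundle is retried, does not change what a replay of an already-fired pane
sees:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of GABW's per-key, per-window buffer (not a Beam API).
// A real runner would persist the list and the markers in durable state and
// commit each marker atomically with the firing that produced it.
public class MarkedBuffer<V> {
  private final List<V> elements = new ArrayList<>(); // append-only, ordered
  private final List<Integer> markers = new ArrayList<>(); // end offset per fired pane

  // New input, including late data, is only ever appended after all markers.
  public void append(V value) {
    elements.add(value);
  }

  // Fire a pane: checkpoint a marker at the current end of the list and
  // return the index of the new pane.
  public int fire() {
    markers.add(elements.size());
    return markers.size() - 1;
  }

  // Contents of a fired pane: everything up to its marker (accumulating mode).
  // A retry of the downstream bundle reads the same range, so sees the same data.
  public List<V> pane(int paneIndex) {
    int end = markers.get(paneIndex);
    return Collections.unmodifiableList(new ArrayList<>(elements.subList(0, end)));
  }

  public static void main(String[] args) {
    MarkedBuffer<String> buffer = new MarkedBuffer<>();
    buffer.append("a");
    buffer.append("b");
    int pane = buffer.fire(); // marker checkpointed at offset 2
    List<String> firstAttempt = buffer.pane(pane);

    buffer.append("late"); // arrives before the failed bundle is retried
    List<String> retry = buffer.pane(pane);

    System.out.println(firstAttempt); // [a, b]
    System.out.println(retry); // [a, b]
    System.out.println(firstAttempt.equals(retry)); // true
  }
}
```

In discarding mode the same idea would apply, except a pane would read the
range between the previous marker and its own. The cost is one offset per
firing rather than a full materialization of the Iterable.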
