It means that single element replay is stable.

On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi <rang...@google.com.invalid> wrote:
> Can we define what exactly is meant by deterministic/stable/replayable
> etc?
>
>    - Does it imply a fixed order? If yes, it implies fixed order of
>      processElement() invocations, right? Are there any qualifiers
>      (within a window+key etc)?

No, no ordering guarantee.

>    - Does it also imply fixed length and content for value iterators?

Good point. With our current runner API, it does not. The
KV<K, Iterable<V>> has no good way of being deterministic if there is
late data. We could do so by forcing the Iterable to be materialized
into a single element, but that would also mean that the entire
Iterable must fit in memory (which at least the Dataflow runner does
not require).

>    - Some examples to clarify nuances would be very useful.
>
> State durability semantics for timers that Reuven proposed seem to be
> unrelated to stable input (at model level). It might be simpler to add
> these semantics first. A lot of deterministic side-effects issues can
> be handled by durable state in timers. One thing I like about the
> timers approach is that it makes the cost more transparent to the user
> since the state is explicitly stored.
>
> On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers
> <bchamb...@google.com.invalid> wrote:
>
> > I think it only makes sense in places where a user might reasonably
> > require stable input to ensure idempotency of side-effects. It also
> > only makes sense in places where a runner could reasonably provide
> > such a guarantee.
> >
> > A given Combine is unlikely to have side effects so it is less likely
> > to benefit from stability of the input. Further, the reason writing a
> > Combine is desirable is because its execution can be split up and
> > moved to the mapper-side (before the GroupByKey). But this division
> > is inherently non-deterministic, and so it seems unlikely to benefit
> > from stability.
> > And many cases where I could see wanting side-effects would end up
> > in extractOutput, for which there is an easy (arguably better)
> > solution -- have extractOutput return the accumulators and do the
> > side-effects in a DoFn afterwards.
> >
> > For composites, it is a bit trickier. I could see a case for
> > supporting it on composites, but it would need to make it very clear
> > that it only affected the input to the composite. If any of the
> > operations within the composite were non-deterministic, then the
> > outputs of that could be unstable, leading to instability in later
> > parts of the composite. Further, it doesn't seem to offer much. The
> > composite itself doesn't perform side-effects, so there is no benefit
> > to having the annotation there -- instead, we allow the annotation to
> > be put where it is relevant and important -- on the DoFn's that
> > actually have side-effects that require stability.
> >
> > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> > > I don't think it really makes sense to do this on Combine. And I
> > > agree with you, it doesn't make sense on composites either.
> > >
> > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner
> > > <sweg...@google.com.invalid> wrote:
> > >
> > > > Does requires-stable-input only apply to ParDo transforms?
> > > >
> > > > I don't think it would make sense to annotate a composite,
> > > > because checkpointing should happen as close to the
> > > > side-effecting operation as possible, since upstream transforms
> > > > within a composite could introduce non-determinism. So it's the
> > > > primitive transform that should own the requirement.
> > > >
> > > > Are there other primitives that should be annotated? 'Combine' is
> > > > interesting because it is optimized in Dataflow (and perhaps
> > > > other runners) to partially apply before a GroupByKey.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau
> > > > <taki...@google.com.invalid> wrote:
> > > >
> > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > >
> > > > > -Tyler
> > > > >
> > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek
> > > > > <aljos...@apache.org> wrote:
> > > > >
> > > > > > +1 to the annotation approach. I outlined how implementing
> > > > > > this would work in the Flink runner in the thread about the
> > > > > > exactly-once Kafka Sink.
> > > > > >
> > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax
> > > > > > > <re...@google.com.INVALID> wrote:
> > > > > > >
> > > > > > > Yes - I don't think we should try and make any
> > > > > > > deterministic guarantees about what is in a bundle.
> > > > > > > Stability guarantees are per element only.
> > > > > > >
> > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh
> > > > > > > <tg...@google.com.invalid> wrote:
> > > > > > >
> > > > > > >> +1 to the annotation-on-ProcessElement approach.
> > > > > > >> ProcessElement is the minimum implementation requirement
> > > > > > >> of a DoFn, and should be where the processing logic which
> > > > > > >> depends on characteristics of the inputs lies. It's a good
> > > > > > >> way of signalling the requirements of the Fn, and letting
> > > > > > >> the runner decide.
> > > > > > >>
> > > > > > >> I have a minor concern that this may not work as expected
> > > > > > >> for users that try to batch remote calls in `FinishBundle`
> > > > > > >> - we should make sure we document that it is explicitly
> > > > > > >> the input elements that will be replayed, and bundles and
> > > > > > >> other operational details are still arbitrary.
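
To make "signalling the requirements of the Fn, and letting the runner decide" concrete, here is a minimal sketch in plain Java. It is not Beam code: the annotation, the two example Fn classes and the `mustStabilizeInput` helper are all hypothetical names made up for illustration, and a real runner would inspect a DoFn's signature in its own way. The only point is that the requirement lives on the method and the runner discovers it by inspection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Illustrative only: every name in this class is hypothetical, none is a Beam API. */
class AnnotationSketch {

  /** Marker a user puts on the processing method that performs side effects. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  /** A function with no side effects; replaying it with different input is harmless. */
  static class PureFn {
    public String process(String element) {
      return element.toUpperCase();
    }
  }

  /** A function that would write to an external system, so it asks for stable input. */
  static class SideEffectingFn {
    @RequiresStableInput
    public String process(String element) {
      return "wrote:" + element;
    }
  }

  /** Runner-side check: does any method declared on this class ask for stable input? */
  static boolean mustStabilizeInput(Class<?> fnClass) {
    for (Method method : fnClass.getDeclaredMethods()) {
      if (method.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("PureFn needs stable input: " + mustStabilizeInput(PureFn.class));
    System.out.println(
        "SideEffectingFn needs stable input: " + mustStabilizeInput(SideEffectingFn.class));
  }
}
```

With a check like this, the runner pays for checkpointing or materialization only in front of the functions that asked for it. The check is per method, so it says nothing about bundle boundaries, which matches the per-element guarantee discussed above.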
> > > > > > >>
> > > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax
> > > > > > >> <re...@google.com.invalid> wrote:
> > > > > > >>
> > > > > > >>> I think deterministic here means deterministically
> > > > > > >>> replayable, i.e. no matter how many times the element is
> > > > > > >>> retried, it will always be the same.
> > > > > > >>>
> > > > > > >>> I think we should also allow specifying this on
> > > > > > >>> processTimer. This would mean that any keyed state
> > > > > > >>> written in a previous processElement must be guaranteed
> > > > > > >>> durable before processTimer is called.
> > > > > > >>>
> > > > > > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers
> > > > > > >>> <bchamb...@apache.org> wrote:
> > > > > > >>>
> > > > > > >>>> I strongly agree with this proposal. I think moving away
> > > > > > >>>> from "just insert a GroupByKey for one of the 3
> > > > > > >>>> different reasons you may want it" towards APIs that
> > > > > > >>>> allow code to express the requirements they have and the
> > > > > > >>>> runner to choose the best way to meet this is a major
> > > > > > >>>> step forwards in terms of portability.
> > > > > > >>>>
> > > > > > >>>> I think "deterministic" may be misleading. The actual
> > > > > > >>>> contents of the collection aren't deterministic if
> > > > > > >>>> upstream computations aren't. The property we really
> > > > > > >>>> need is that once an input may have been observed by the
> > > > > > >>>> side-effecting code it will never be observed with a
> > > > > > >>>> different value.
> > > > > > >>>>
> > > > > > >>>> I would propose something like RequiresStableInput, to
> > > > > > >>>> indicate that the input must be stable as observed by
> > > > > > >>>> the function.
> > > > > > >>>> I could also see something hinting at the fact we don't
> > > > > > >>>> recompute the input, such as RequiresMemoizedInput or
> > > > > > >>>> RequiresNoRecomputation.
> > > > > > >>>>
> > > > > > >>>> -- Ben
> > > > > > >>>>
> > > > > > >>>> P.S. For anyone interested, other uses of GroupByKey
> > > > > > >>>> that we may want to discuss APIs for would be preventing
> > > > > > >>>> retry across steps (e.g., preventing fusion) and
> > > > > > >>>> redistributing inputs across workers.
> > > > > > >>>>
> > > > > > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles
> > > > > > >>>> <k...@google.com.invalid> wrote:
> > > > > > >>>>
> > > > > > >>>>> This came up again, so I wanted to push it along by
> > > > > > >>>>> proposing a specific API for Java that could have a
> > > > > > >>>>> derived API in Python. I am writing this quickly to get
> > > > > > >>>>> something out there, so I welcome suggestions for
> > > > > > >>>>> revision.
> > > > > > >>>>>
> > > > > > >>>>> Today a DoFn has a @ProcessElement annotated method
> > > > > > >>>>> with various automated parameters, but most
> > > > > > >>>>> fundamentally this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> public void process(ProcessContext ctx) {
> > > > > > >>>>>   ctx.element() // to access the current input element
> > > > > > >>>>>   ctx.output(something) // to write to default output collection
> > > > > > >>>>>   ctx.output(tag, something) // to write to other output collections
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> For some time, we have hoped to unpack the context - it
> > > > > > >>>>> is a backwards-compatibility pattern made obsolete by
> > > > > > >>>>> the new DoFn design.
> > > > > > >>>>> So instead it would look like this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > >>>>>   element.get() // to access the current input element
> > > > > > >>>>>   mainOutput.output(something) // to write to the default output collection
> > > > > > >>>>>   other.output(something) // to write to other output collection
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> I've deliberately left out the undecided syntax for
> > > > > > >>>>> side outputs. But it would be nice for the tag to be
> > > > > > >>>>> built in to the parameter so it doesn't have to be used
> > > > > > >>>>> when calling output().
> > > > > > >>>>>
> > > > > > >>>>> One way to enhance this to deterministic input would
> > > > > > >>>>> just be this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> @RequiresDeterministicInput
> > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > >>>>>   element.get() // to access the current input element
> > > > > > >>>>>   mainOutput.output(something) // to write to the default output collection
> > > > > > >>>>>   other.output(something) // to write to other output collection
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> There are really a lot of places where we could put an
> > > > > > >>>>> annotation or change a type to indicate that the input
> > > > > > >>>>> PCollection should be
> > > > > > >>>>> well-defined/deterministically-replayable. I don't have
> > > > > > >>>>> a really strong opinion.
> > > > > > >>>>>
> > > > > > >>>>> Kenn
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> > > > > > >>>>> <bchamb...@google.com.invalid> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Allowing an annotation on DoFn's that produce
> > > > > > >>>>>> deterministic output could be added in the future, but
> > > > > > >>>>>> doesn't seem like a great option.
> > > > > > >>>>>>
> > > > > > >>>>>> 1. It is a correctness issue to assume a DoFn is
> > > > > > >>>>>> deterministic and be wrong, so we would need to assume
> > > > > > >>>>>> all transform outputs are non-deterministic unless
> > > > > > >>>>>> annotated. Getting this correct is difficult (for
> > > > > > >>>>>> example, GBK is surprisingly non-deterministic except
> > > > > > >>>>>> in specific cases).
> > > > > > >>>>>>
> > > > > > >>>>>> 2. It is unlikely to be a major performance
> > > > > > >>>>>> improvement, given that any non-deterministic
> > > > > > >>>>>> transform prior to a sink (which are most likely to
> > > > > > >>>>>> require deterministic input) will cause additional
> > > > > > >>>>>> work to be needed.
> > > > > > >>>>>>
> > > > > > >>>>>> Based on this, it seems like the risk of allowing an
> > > > > > >>>>>> annotation is high while the potential for performance
> > > > > > >>>>>> improvements is low. The current proposal (not
> > > > > > >>>>>> allowing an annotation) makes sense for now, until we
> > > > > > >>>>>> can demonstrate that the impact on performance is high
> > > > > > >>>>>> in cases that could be avoided with an annotation (in
> > > > > > >>>>>> real-world use).
> > > > > > >>>>>>
> > > > > > >>>>>> -- Ben
> > > > > > >>>>>>
> > > > > > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk
> > > > > > >>>>>> <vikky...@gmail.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> +1 for the general idea of runners handling it over
> > > > > > >>>>>> hard-coded implementation strategy.
> > > > > > >>>>>>
> > > > > > >>>>>> For the Write transform I believe you are talking
> > > > > > >>>>>> about ApplyShardingKey
> > > > > > >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > > > > >>>>>> which introduces non-deterministic behavior when
> > > > > > >>>>>> retried?
> > > > > > >>>>>>
> > > > > > >>>>>> *Let a DoFn declare (mechanism not important right
> > > > > > >>>>>> now) that it "requires deterministic input"*
> > > > > > >>>>>>
> > > > > > >>>>>> *Each runner will need a way to induce deterministic
> > > > > > >>>>>> input - the obvious choice being a materialization.*
> > > > > > >>>>>>
> > > > > > >>>>>> Does this mean that a runner will always materialize
> > > > > > >>>>>> (or whatever the strategy is) an input PCollection to
> > > > > > >>>>>> this DoFn even though the PCollection might have been
> > > > > > >>>>>> produced by deterministic transforms? Would it make
> > > > > > >>>>>> sense to also let DoFns declare if they produce
> > > > > > >>>>>> non-deterministic output?
> > > > > > >>>>>>
> > > > > > >>>>>> -Vikas
> > > > > > >>>>>>
> > > > > > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk
> > > > > > >>>>>> <s...@google.com.invalid> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hey Kenn-
> > > > > > >>>>>>>
> > > > > > >>>>>>> this seems important, but I don't have all the
> > > > > > >>>>>>> context on what the problem is.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Can you explain this sentence "Specifically, there is
> > > > > > >>>>>>> pseudorandom data generated and once it has been
> > > > > > >>>>>>> observed and used to produce a side effect, it cannot
> > > > > > >>>>>>> be regenerated without erroneous results." ?
> > > > > > >>>>>>>
> > > > > > >>>>>>> Where is the pseudorandom data coming from? Perhaps a
> > > > > > >>>>>>> concrete example would help?
> > > > > > >>>>>>>
> > > > > > >>>>>>> S
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles
> > > > > > >>>>>>> <k...@google.com.invalid> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Problem:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I will drop all nuance and say that the `Write`
> > > > > > >>>>>>>> transform as it exists in the SDK is incorrect until
> > > > > > >>>>>>>> we add some specification and APIs. We can't keep
> > > > > > >>>>>>>> shipping an SDK with an unsafe transform in it, and
> > > > > > >>>>>>>> IMO this certainly blocks a stable release.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Specifically, there is pseudorandom data generated
> > > > > > >>>>>>>> and once it has been observed and used to produce a
> > > > > > >>>>>>>> side effect, it cannot be regenerated without
> > > > > > >>>>>>>> erroneous results.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> This generalizes: For some side-effecting
> > > > > > >>>>>>>> user-defined functions, it is vital that even across
> > > > > > >>>>>>>> retries/replays they have a consistent view of the
> > > > > > >>>>>>>> contents of their input PCollection, because their
> > > > > > >>>>>>>> effect on the outside world cannot be retracted
> > > > > > >>>>>>>> if/when they fail and are retried. Once the runner
> > > > > > >>>>>>>> ensures a consistent view of the input, it is then
> > > > > > >>>>>>>> their own responsibility to be idempotent.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Ideally we should specify this requirement for the
> > > > > > >>>>>>>> user-defined function without imposing any
> > > > > > >>>>>>>> particular implementation strategy on Beam runners.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Proposal:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 1. Let a DoFn declare (mechanism not important right
> > > > > > >>>>>>>> now) that it "requires deterministic input".
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 2. Each runner will need a way to induce
> > > > > > >>>>>>>> deterministic input - the obvious choice being a
> > > > > > >>>>>>>> materialization.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I want to keep the discussion focused, so I'm
> > > > > > >>>>>>>> leaving out any possibilities of taking this
> > > > > > >>>>>>>> further.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Regarding performance: Today places that require
> > > > > > >>>>>>>> this tend to be already paying the cost via
> > > > > > >>>>>>>> GroupByKey / Reshuffle operations, since that was a
> > > > > > >>>>>>>> simple way to induce determinism in batch Dataflow*
> > > > > > >>>>>>>> (doesn't work for most other runners nor for
> > > > > > >>>>>>>> streaming Dataflow). This change will replace a
> > > > > > >>>>>>>> hard-coded implementation strategy with a
> > > > > > >>>>>>>> requirement that may be fulfilled in the most
> > > > > > >>>>>>>> efficient way available.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thoughts?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially
> > > > > > >>>>>>>> Ben)
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> * There is some overlap with the
> > > > > > >>>>>>>> reshuffle/redistribute discussion because of this
> > > > > > >>>>>>>> historical situation, but I would like to leave that
> > > > > > >>>>>>>> broader discussion out of this correctness issue.
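
Since a concrete example was asked for further down the thread, here is a minimal plain-Java simulation of the original problem and of the "materialization" option from the proposal. None of it is Beam code: `StableInputSketch`, `ExternalSink` and `rowsAfterRetry` are hypothetical names, the shard key format is made up, and a counter stands in for the pseudorandom generator so that the behaviour is reproducible. It assumes the external write is idempotent per key, which is the user's side of the contract.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Plain-Java simulation of retrying a side-effecting step; no Beam APIs are used. */
class StableInputSketch {

  /** Stand-in for an external system whose writes are idempotent per key. */
  static final class ExternalSink {
    final Map<String, String> rows = new HashMap<>();

    void write(String key, String payload) {
      rows.put(key, payload);
    }
  }

  /**
   * Processes one element twice (a first attempt that "fails" after its write, then a
   * retry) and returns how many rows the sink ends up holding.
   */
  static int rowsAfterRetry(boolean materializeInput) {
    AtomicInteger draws = new AtomicInteger();
    // Upstream step that attaches a fresh shard key every time it is recomputed.
    // The counter stands in for a pseudorandom generator to keep the sketch deterministic.
    Supplier<String> upstream = () -> "shard-" + draws.incrementAndGet() + "/record-42";

    ExternalSink sink = new ExternalSink();
    // With materialization, the upstream value is computed once and checkpointed.
    String checkpoint = materializeInput ? upstream.get() : null;

    for (int attempt = 0; attempt < 2; attempt++) {
      // Without materialization, the retry recomputes the upstream step.
      String input = materializeInput ? checkpoint : upstream.get();
      sink.write(input, "payload"); // the side effect is visible even if the attempt fails
    }
    return sink.rows.size();
  }

  public static void main(String[] args) {
    System.out.println("recomputed input:   " + rowsAfterRetry(false) + " rows");
    System.out.println("materialized input: " + rowsAfterRetry(true) + " rows");
  }
}
```

When the input is recomputed, the retry sees a different shard key and the sink ends up with two rows for one record. When the input is materialized first, the retry replays the same value and the idempotent write collapses to one row. Materialization is only one way to get there; the proposal leaves the strategy to each runner.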