It means that single-element replay is stable.

On Thu, Aug 10, 2017 at 10:56 AM, Raghu Angadi <rang...@google.com.invalid> wrote:

> Can we define what exactly is meant by deterministic/stable/replayable
> etc?
>
>    - Does it imply a fixed order? If yes, it implies fixed order of
>    processElement() invocations, right? Are there any qualifiers (within a
>    window+key etc)?
>

No, no ordering guarantee.


>    - Does it also imply fixed length and content for value iterators?
>

Good point. With our current runner API, it does not. The
KV<K, Iterable<V>> has no good way of being deterministic if there is late
data. We could do so by forcing the Iterable to be materialized into a
single element, but that would also mean that the entire Iterable must fit
in memory (which at least the Dataflow runner does not require).
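Reuven's caveat about late data can be pictured with a small plain-Java model. This is a sketch under assumptions, not Beam code: it assumes an accumulating trigger that re-emits every value buffered so far for one key and window, and `AccumulatingPane`, `fire()` and `liveView()` are hypothetical names. The materialized snapshot stays fixed once taken, while a lazily read view changes when late data arrives; the price of the snapshot is that the whole group must fit in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model (not Beam code) of one key and window under an
 * accumulating trigger: every firing re-emits all values buffered so far.
 */
class AccumulatingPane {
    private final List<String> buffered = new ArrayList<>();

    /** Buffers a value that arrived for this key and window (on time or late). */
    void add(String value) {
        buffered.add(value);
    }

    /** Materializes the current contents as one immutable snapshot element. */
    List<String> fire() {
        return Collections.unmodifiableList(new ArrayList<>(buffered));
    }

    /** A lazy, read-only view that keeps reflecting later (late) arrivals. */
    List<String> liveView() {
        return Collections.unmodifiableList(buffered);
    }

    public static void main(String[] args) {
        AccumulatingPane pane = new AccumulatingPane();
        pane.add("a");
        pane.add("b");
        List<String> snapshot = pane.fire();
        List<String> live = pane.liveView();
        pane.add("c"); // late data for the same key and window
        System.out.println(snapshot + " " + live); // [a, b] [a, b, c]
    }
}
```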


>    - Some examples to clarify nuances would be very useful.
>
> State durability semantics for timers that Reuven proposed seem to be
> unrelated to stable input (at the model level). It might be simpler to add
> these semantics first. A lot of deterministic side-effect issues can be
> handled by durable state in timers. One thing I like about the timers
> approach is that it makes the cost more transparent to the user, since the
> state is explicitly stored.
>
>
> On Thu, Aug 10, 2017 at 10:02 AM, Ben Chambers <bchamb...@google.com.invalid> wrote:
>
> > I think it only makes sense in places where a user might reasonably
> > require stable input to ensure idempotency of side-effects. It also only
> > makes sense in places where a runner could reasonably provide such a
> > guarantee.
> >
> > A given Combine is unlikely to have side effects, so it is less likely to
> > benefit from stability of the input. Further, the reason writing a
> > Combine is desirable is that its execution can be split up and moved to
> > the mapper side (before the GroupByKey). But this division is inherently
> > non-deterministic, so it seems unlikely to benefit from stability. And
> > many cases where I could see wanting side-effects would end up in
> > extractOutput, for which there is an easy (arguably better) solution:
> > have extractOutput return the accumulators and do the side-effects in a
> > DoFn afterwards.
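Ben's argument about Combine can be sketched in plain Java, assuming a simple sum. However the runner splits the input into partial accumulators, the merged result is the same, and the side effect lives in a separate downstream step. `SumCombiner` and `PublishStep` are hypothetical; the method names only mirror the shape of Beam's `CombineFn`, and nothing here calls Beam.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sum combiner whose method names mirror the shape of a CombineFn
 * (createAccumulator, addInput, mergeAccumulators, extractOutput). Plain Java only.
 */
class SumCombiner {
    long createAccumulator() { return 0L; }

    long addInput(long accumulator, long input) { return accumulator + input; }

    long mergeAccumulators(List<Long> accumulators) {
        long total = 0L;
        for (long partial : accumulators) {
            total += partial;
        }
        return total;
    }

    /** Pure: no side effects here, so how the work was split does not matter. */
    long extractOutput(long accumulator) { return accumulator; }

    /** Runs partial combines over arbitrary "mapper-side" partitions, then merges. */
    long combine(List<List<Long>> partitions) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> partition : partitions) {
            long accumulator = createAccumulator();
            for (long value : partition) {
                accumulator = addInput(accumulator, value);
            }
            partials.add(accumulator);
        }
        return extractOutput(mergeAccumulators(partials));
    }
}

/** Hypothetical downstream step: the side effect happens here, after the combine. */
class PublishStep {
    static void publish(long result, Consumer<Long> externalSystem) {
        externalSystem.accept(result);
    }

    public static void main(String[] args) {
        SumCombiner combiner = new SumCombiner();
        long split1 = combiner.combine(Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(3L, 4L)));
        long split2 = combiner.combine(Arrays.asList(Arrays.asList(1L), Arrays.asList(2L, 3L, 4L)));
        publish(split1, r -> System.out.println("publishing " + r)); // publishing 10
        System.out.println(split1 == split2); // true
    }
}
```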
> >
> > For composites, it is a bit trickier. I could see a case for supporting
> > it on composites, but we would need to make it very clear that it only
> > affected the input to the composite. If any of the operations within the
> > composite were non-deterministic, then their outputs could be unstable,
> > leading to instability in later parts of the composite. Further, it
> > doesn't seem to offer much. The composite itself doesn't perform
> > side-effects, so there is no benefit to having the annotation there.
> > Instead, we allow the annotation to be put where it is relevant and
> > important: on the DoFns that actually have side-effects that require
> > stability.
> >
> > On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax <re...@google.com.invalid> wrote:
> >
> > > I don't think it really makes sense to do this on Combine. And I agree
> > > with you, it doesn't make sense on composites either.
> > >
> > > On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner <sweg...@google.com.invalid> wrote:
> > >
> > > > Does requires-stable-input only apply to ParDo transforms?
> > > >
> > > > I don't think it would make sense to annotate a composite, because
> > > > checkpointing should happen as close to the side-effecting operation
> > > > as possible, since upstream transforms within a composite could
> > > > introduce non-determinism. So it's the primitive transform that should
> > > > own the requirement.
> > > >
> > > > Are there other primitives that should be annotated? 'Combine' is
> > > > interesting because it is optimized in Dataflow (and perhaps other
> > > > runners) to partially apply before a GroupByKey.
> > > >
> > > > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid> wrote:
> > > >
> > > > > +1 to the annotation idea, and to having it on processTimer.
> > > > >
> > > > > -Tyler
> > > > >
> > > > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> > > > >
> > > > > > +1 to the annotation approach. I outlined how implementing this
> > > > > > would work in the Flink runner in the thread about the exactly-once
> > > > > > Kafka sink.
> > > > > >
> > > > > > > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> > > > > > >
> > > > > > > Yes, I don't think we should try to make any deterministic
> > > > > > > guarantees about what is in a bundle. Stability guarantees are
> > > > > > > per element only.
> > > > > > >
> > > > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid> wrote:
> > > > > > >
> > > > > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement
> > > > > > >> is the minimum implementation requirement of a DoFn, and should
> > > > > > >> be where the processing logic that depends on characteristics of
> > > > > > >> the inputs lies. It's a good way of signalling the requirements
> > > > > > >> of the Fn and letting the runner decide.
> > > > > > >>
> > > > > > >> I have a minor concern that this may not work as expected for
> > > > > > >> users who try to batch remote calls in `FinishBundle`. We should
> > > > > > >> make sure we document that it is explicitly the input elements
> > > > > > >> that will be replayed, and that bundles and other operational
> > > > > > >> details are still arbitrary.
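Thomas's caveat can be illustrated with a plain-Java model, assuming the runner is free to cut bundles differently on a retry. The replayed elements are identical, yet an ID built from a bundle's contents (as a `FinishBundle` batch might use) changes, whereas a key built from each element does not. `BundleBatcher` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical model (not a Beam API) of a DoFn that batches remote calls per bundle.
 * A replay delivers the same elements, but the runner may cut bundles differently,
 * so anything derived from bundle contents is unstable; per-element keys are not.
 */
class BundleBatcher {
    /** Splits the same input into bundles of a given size, standing in for the runner's choice. */
    static List<List<String>> bundle(List<String> elements, int bundleSize) {
        List<List<String>> bundles = new ArrayList<>();
        for (int i = 0; i < elements.size(); i += bundleSize) {
            int end = Math.min(i + bundleSize, elements.size());
            bundles.add(new ArrayList<>(elements.subList(i, end)));
        }
        return bundles;
    }

    /** Unstable: an ID for the batch flushed at bundle end, built from whatever shared the bundle. */
    static List<String> batchIds(List<List<String>> bundles) {
        List<String> ids = new ArrayList<>();
        for (List<String> bundle : bundles) {
            ids.add(String.join("+", bundle));
        }
        return ids;
    }

    /** Stable: one key per element, independent of bundling. */
    static TreeSet<String> elementKeys(List<List<String>> bundles) {
        TreeSet<String> keys = new TreeSet<>();
        for (List<String> bundle : bundles) {
            for (String element : bundle) {
                keys.add("write-" + element);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d");
        System.out.println(batchIds(bundle(input, 2))); // [a+b, c+d]
        System.out.println(batchIds(bundle(input, 3))); // [a+b+c, d]
    }
}
```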
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid> wrote:
> > > > > > >>
> > > > > > >>> I think deterministic here means deterministically replayable,
> > > > > > >>> i.e. no matter how many times the element is retried, it will
> > > > > > >>> always be the same.
> > > > > > >>>
> > > > > > >>> I think we should also allow specifying this on processTimer.
> > > > > > >>> This would mean that any keyed state written in a previous
> > > > > > >>> processElement must be guaranteed durable before processTimer
> > > > > > >>> is called.
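A minimal sketch of the proposed processTimer guarantee, modeled in plain Java rather than with Beam's state and timer APIs. It assumes the runner persists pending keyed-state writes before invoking the timer; `DurableKeyedState`, `fireTimer()` and `crash()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model (not a Beam API) of the proposed processTimer guarantee:
 * keyed state written by processElement is made durable before the timer runs,
 * so a side-effecting timer never acts on state that a crash could later erase.
 */
class DurableKeyedState {
    private final List<String> durable = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    /** processElement writes to keyed state; the write is not yet durable. */
    void processElement(String value) {
        pending.add(value);
    }

    /** Simulates a worker failure: writes that were never persisted are lost. */
    void crash() {
        pending.clear();
    }

    /** Runner side: persist pending writes first, and only then call the timer. */
    List<String> fireTimer() {
        durable.addAll(pending);
        pending.clear();
        return processTimer();
    }

    /** User callback: whatever it reads here is already durable. */
    private List<String> processTimer() {
        return Collections.unmodifiableList(new ArrayList<>(durable));
    }

    public static void main(String[] args) {
        DurableKeyedState state = new DurableKeyedState();
        state.processElement("a");
        System.out.println(state.fireTimer()); // [a]
        state.processElement("b");
        state.crash();             // "b" was never durable, and no timer saw it
        state.processElement("b"); // the element is replayed
        System.out.println(state.fireTimer()); // [a, b]
    }
}
```

A crash between processElement and the timer simply causes the element to be replayed; the timer never acts on a write that later disappeared.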
> > > > > > >>>
> > > > > > >>>
> > > > > > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:
> > > > > > >>>
> > > > > > >>>> I strongly agree with this proposal. I think moving away from
> > > > > > >>>> "just insert a GroupByKey for one of the 3 different reasons you
> > > > > > >>>> may want it" towards APIs that allow code to express the
> > > > > > >>>> requirements it has, and the runner to choose the best way to
> > > > > > >>>> meet them, is a major step forward in terms of portability.
> > > > > > >>>>
> > > > > > >>>> I think "deterministic" may be misleading. The actual contents
> > > > > > >>>> of the collection aren't deterministic if upstream computations
> > > > > > >>>> aren't. The property we really need is that once an input may
> > > > > > >>>> have been observed by the side-effecting code it will never be
> > > > > > >>>> observed with a different value.
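The property Ben describes can be sketched as a memoizing checkpoint, assuming elements carry a stable ID. The upstream function may be non-deterministic (here a random number), but each element's value is computed at most once, so every later observation sees the same value. `MemoizedInput` is a hypothetical name, and an in-memory map stands in for the durable storage a real runner would need.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;

/**
 * Hypothetical checkpoint in front of a side-effecting step (not a Beam API).
 * The upstream computation may be non-deterministic; memoizing its first result
 * per element ID guarantees later observations never see a different value.
 * The HashMap stands in for durable storage.
 */
class MemoizedInput {
    private final Map<String, Integer> checkpoint = new HashMap<>();
    private final Function<String, Integer> upstream;

    MemoizedInput(Function<String, Integer> upstream) {
        this.upstream = upstream;
    }

    /** Computes the value for this element at most once, then replays the stored value. */
    int observe(String elementId) {
        return checkpoint.computeIfAbsent(elementId, upstream);
    }

    public static void main(String[] args) {
        Random random = new Random();
        MemoizedInput input = new MemoizedInput(id -> random.nextInt(1000));
        int firstAttempt = input.observe("e1");
        int retry = input.observe("e1");
        System.out.println(firstAttempt == retry); // true
    }
}
```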
> > > > > > >>>>
> > > > > > >>>> I would propose something like RequiresStableInput, to indicate
> > > > > > >>>> that the input must be stable as observed by the function. I
> > > > > > >>>> could also see something hinting at the fact that we don't
> > > > > > >>>> recompute the input, such as RequiresMemoizedInput or
> > > > > > >>>> RequiresNoRecomputation.
> > > > > > >>>>
> > > > > > >>>> -- Ben
> > > > > > >>>>
> > > > > > >>>> P.S. For anyone interested, other uses of GroupByKey that we may
> > > > > > >>>> want to discuss APIs for would be preventing retry across steps
> > > > > > >>>> (e.g., preventing fusion) and redistributing inputs across
> > > > > > >>>> workers.
> > > > > > >>>>
> > > > > > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > > > > >>>>
> > > > > > >>>>> This came up again, so I wanted to push it along by proposing
> > > > > > >>>>> a specific API for Java that could have a derived API in
> > > > > > >>>>> Python. I am writing this quickly to get something out there,
> > > > > > >>>>> so I welcome suggestions for revision.
> > > > > > >>>>>
> > > > > > >>>>> Today a DoFn has a @ProcessElement annotated method with
> > > > > > >>>>> various automated parameters, but most fundamentally this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> public void process(ProcessContext ctx) {
> > > > > > >>>>>   ctx.element(); // to access the current input element
> > > > > > >>>>>   ctx.output(something); // to write to the default output collection
> > > > > > >>>>>   ctx.output(tag, something); // to write to other output collections
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> For some time, we have hoped to unpack the context; it is a
> > > > > > >>>>> backwards-compatibility pattern made obsolete by the new DoFn
> > > > > > >>>>> design. So instead it would look like this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > >>>>>   element.get(); // to access the current input element
> > > > > > >>>>>   mainOutput.output(something); // to write to the default output collection
> > > > > > >>>>>   other.output(something); // to write to another output collection (a parameter elided as "...")
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> I've deliberately left out the undecided syntax for side
> > > > > > >>>>> outputs. But it would be nice for the tag to be built into the
> > > > > > >>>>> parameter so it doesn't have to be used when calling output().
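The idea of building the tag into the output parameter can be sketched in plain Java. `Tag`, `TaggedOutput` and `Outputs` are hypothetical stand-ins, not Beam types; the sketch only shows that binding the tag once removes it from every `output()` call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tag naming an output collection (plain Java, not Beam's TupleTag). */
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

/** Hypothetical output parameter with its tag built in. */
interface TaggedOutput<T> {
    void output(T value);
}

/** Hypothetical collector of outputs by tag, standing in for the runner's output collections. */
class Outputs {
    final Map<String, List<Object>> byTag = new HashMap<>();

    /** Context style: the tag must be passed on every call. */
    <T> void output(Tag<T> tag, T value) {
        byTag.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    /** Parameter style: bind the tag once, then callers only pass the value. */
    <T> TaggedOutput<T> bind(Tag<T> tag) {
        return value -> output(tag, value);
    }

    public static void main(String[] args) {
        Outputs outputs = new Outputs();
        Tag<String> errors = new Tag<>("errors");
        outputs.output(errors, "bad-1"); // context style
        TaggedOutput<String> errorOutput = outputs.bind(errors);
        errorOutput.output("bad-2"); // parameter style, no tag needed
        System.out.println(outputs.byTag); // {errors=[bad-1, bad-2]}
    }
}
```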
> > > > > > >>>>>
> > > > > > >>>>> One way to extend this to require deterministic input would
> > > > > > >>>>> just be this:
> > > > > > >>>>>
> > > > > > >>>>> @ProcessElement
> > > > > > >>>>> @RequiresDeterministicInput
> > > > > > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > > > > > >>>>>   element.get(); // to access the current input element
> > > > > > >>>>>   mainOutput.output(something); // to write to the default output collection
> > > > > > >>>>>   other.output(something); // to write to another output collection (a parameter elided as "...")
> > > > > > >>>>> }
> > > > > > >>>>>
> > > > > > >>>>> There are really a lot of places where we could put an
> > > > > > >>>>> annotation or change a type to indicate that the input
> > > > > > >>>>> PCollection should be well-defined/deterministically-replayable.
> > > > > > >>>>> I don't have a really strong opinion.
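One of the many possible placements, an annotation on the processing method, can be sketched in plain Java, assuming a runner that discovers the requirement by reflection and then chooses how to satisfy it. The annotation types and `Planner.needsMaterialization` are hypothetical and defined here only so the example compiles without Beam.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotations defined only for this sketch; they are not Beam's definitions.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresDeterministicInput {}

/** Hypothetical user function that declares the requirement on its processing method. */
class SideEffectingFn {
    @ProcessElement
    @RequiresDeterministicInput
    public void process(String element) {
        // e.g. write the element to an external system
    }
}

/** Hypothetical user function with no such requirement. */
class PlainFn {
    @ProcessElement
    public void process(String element) {
    }
}

/** Hypothetical runner side: read the declaration, then pick a strategy (e.g. materialize). */
class Planner {
    static boolean needsMaterialization(Class<?> fnClass) {
        for (Method method : fnClass.getDeclaredMethods()) {
            if (method.isAnnotationPresent(ProcessElement.class)
                    && method.isAnnotationPresent(RequiresDeterministicInput.class)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(needsMaterialization(SideEffectingFn.class)); // true
        System.out.println(needsMaterialization(PlainFn.class)); // false
    }
}
```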
> > > > > > >>>>>
> > > > > > >>>>> Kenn
> > > > > > >>>>>
> > > > > > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> > > > > > >>>>>
> > > > > > >>>>>> Allowing an annotation on DoFns that produce deterministic
> > > > > > >>>>>> output could be added in the future, but doesn't seem like a
> > > > > > >>>>>> great option.
> > > > > > >>>>>>
> > > > > > >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic
> > > > > > >>>>>> and be wrong, so we would need to assume all transform outputs
> > > > > > >>>>>> are non-deterministic unless annotated. Getting this correct is
> > > > > > >>>>>> difficult (for example, GBK is surprisingly non-deterministic
> > > > > > >>>>>> except in specific cases).
> > > > > > >>>>>>
> > > > > > >>>>>> 2. It is unlikely to be a major performance improvement, given
> > > > > > >>>>>> that any non-deterministic transform prior to a sink (sinks
> > > > > > >>>>>> being the transforms most likely to require deterministic
> > > > > > >>>>>> input) will cause additional work to be needed.
> > > > > > >>>>>>
> > > > > > >>>>>> Based on this, it seems like the risk of allowing an annotation
> > > > > > >>>>>> is high while the potential for performance improvements is
> > > > > > >>>>>> low. The current proposal (not allowing an annotation) makes
> > > > > > >>>>>> sense for now, until we can demonstrate that the impact on
> > > > > > >>>>>> performance is high in cases that could be avoided with an
> > > > > > >>>>>> annotation (in real-world use).
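Ben's first two points can be sketched as a conservative planning rule, assuming every upstream step is treated as non-deterministic unless explicitly declared otherwise. `Step` and `ConservativePlanner` are hypothetical; the point is that one undeclared step is enough to force the materialization, which limits the payoff of such an annotation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical description of one upstream step (not a Beam type). */
class Step {
    final String name;
    // False unless the author explicitly declared the step deterministic.
    final boolean declaredDeterministic;

    Step(String name, boolean declaredDeterministic) {
        this.name = name;
        this.declaredDeterministic = declaredDeterministic;
    }
}

/** Hypothetical planner applying the conservative default. */
class ConservativePlanner {
    /**
     * Materialization may be skipped only if every upstream step is declared
     * deterministic. A single undeclared step (for example a GroupByKey, which
     * is non-deterministic except in specific cases) forces the extra work.
     */
    static boolean mustMaterialize(List<Step> upstream) {
        for (Step step : upstream) {
            if (!step.declaredDeterministic) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Step> upstream = Arrays.asList(new Step("Read", true), new Step("GroupByKey", false));
        System.out.println(mustMaterialize(upstream)); // true
    }
}
```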
> > > > > > >>>>>>
> > > > > > >>>>>> -- Ben
> > > > > > >>>>>>
> > > > > > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> +1 for the general idea of runners handling it, rather than a
> > > > > > >>>>>> hard-coded implementation strategy.
> > > > > > >>>>>>
> > > > > > >>>>>> For the Write transform, I believe you are talking about
> > > > > > >>>>>> ApplyShardingKey
> > > > > > >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>,
> > > > > > >>>>>> which introduces non-deterministic behavior when retried?
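The retry hazard vikas asks about can be illustrated with a plain-Java model of random shard assignment. It does not reproduce Beam's `ApplyShardingKey`; `RandomSharding` and its methods are hypothetical, and a `LinkedHashMap` copy stands in for the materialization Kenn proposes. Recomputing the assignment on retry moves elements between shards, while replaying the materialized copy does not.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical illustration (not Beam's Write code): an upstream step assigns each
 * element a pseudorandom shard. Re-running it on retry can move elements between
 * shards, so a writer that already produced files for the first attempt would see
 * inconsistent input. Replaying a materialized copy moves nothing.
 */
class RandomSharding {
    static Map<String, Integer> assignShards(List<String> elements, Random random, int numShards) {
        Map<String, Integer> assignment = new LinkedHashMap<>();
        for (String element : elements) {
            assignment.put(element, random.nextInt(numShards));
        }
        return assignment;
    }

    /** Counts elements whose shard differs between two attempts. */
    static int moved(Map<String, Integer> first, Map<String, Integer> retry) {
        int count = 0;
        for (Map.Entry<String, Integer> entry : first.entrySet()) {
            if (!entry.getValue().equals(retry.get(entry.getKey()))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("r1", "r2", "r3", "r4", "r5", "r6");
        Map<String, Integer> firstAttempt = assignShards(elements, new Random(), 4);
        Map<String, Integer> recomputed = assignShards(elements, new Random(), 4);
        Map<String, Integer> materialized = new LinkedHashMap<>(firstAttempt);
        System.out.println("moved after recomputing: " + moved(firstAttempt, recomputed));
        System.out.println("moved after replaying the materialized copy: " + moved(firstAttempt, materialized)); // 0
    }
}
```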
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> *Let a DoFn declare (mechanism not important right now) that it
> > > > > > >>>>>> "requires deterministic input"*
> > > > > > >>>>>>
> > > > > > >>>>>> *Each runner will need a way to induce deterministic input - the
> > > > > > >>>>>> obvious choice being a materialization.*
> > > > > > >>>>>>
> > > > > > >>>>>> Does this mean that a runner will always materialize (or
> > > > > > >>>>>> whatever the strategy is) an input PCollection to this DoFn even
> > > > > > >>>>>> though the PCollection might have been produced by
> > > > > > >>>>>> deterministic transforms? Would it make sense to also let DoFns
> > > > > > >>>>>> declare if they produce non-deterministic output?
> > > > > > >>>>>>
> > > > > > >>>>>> -Vikas
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > > > >>>>>>
> > > > > > >>>>>>> Hey Kenn-
> > > > > > >>>>>>>
> > > > > > >>>>>>> this seems important, but I don't have all the context on what
> > > > > > >>>>>>> the problem is.
> > > > > > >>>>>>>
> > > > > > >>>>>>> Can you explain this sentence: "Specifically, there is
> > > > > > >>>>>>> pseudorandom data generated and once it has been observed and
> > > > > > >>>>>>> used to produce a side effect, it cannot be regenerated without
> > > > > > >>>>>>> erroneous results."?
> > > > > > >>>>>>>
> > > > > > >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
> > > > > > >>>>>>> example would help?
> > > > > > >>>>>>>
> > > > > > >>>>>>> S
> > > > > > >>>>>>>
> > > > > > >>>>>>>
> > > > > > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid> wrote:
> > > > > > >>>>>>>
> > > > > > >>>>>>>> Problem:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I will drop all nuance and say that the `Write` transform as
> > > > > > >>>>>>>> it exists in the SDK is incorrect until we add some
> > > > > > >>>>>>>> specification and APIs. We can't keep shipping an SDK with an
> > > > > > >>>>>>>> unsafe transform in it, and IMO this certainly blocks a stable
> > > > > > >>>>>>>> release.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Specifically, there is pseudorandom data generated and once it
> > > > > > >>>>>>>> has been observed and used to produce a side effect, it cannot
> > > > > > >>>>>>>> be regenerated without erroneous results.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> This generalizes: for some side-effecting user-defined
> > > > > > >>>>>>>> functions, it is vital that even across retries/replays they
> > > > > > >>>>>>>> have a consistent view of the contents of their input
> > > > > > >>>>>>>> PCollection, because their effect on the outside world cannot
> > > > > > >>>>>>>> be retracted if/when they fail and are retried. Once the
> > > > > > >>>>>>>> runner ensures a consistent view of the input, it is then
> > > > > > >>>>>>>> their own responsibility to be idempotent.
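The split of responsibilities described here (the runner provides a consistent view, the function stays idempotent) can be sketched in plain Java, assuming each element carries a stable ID. `IdempotentSink` is a hypothetical stand-in for an external system: a write keyed by that ID absorbs retries, a plain append does not, and a changed value on replay is reported as an error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical external system (not a Beam API). With stable input, a retried
 * element arrives with the same ID and value, so a write keyed by that ID is
 * idempotent, while a plain append duplicates the record on every attempt.
 */
class IdempotentSink {
    private final Map<String, String> byId = new HashMap<>();
    private final List<String> appended = new ArrayList<>();

    /** Idempotent write; fails loudly if a replay ever shows a different value. */
    void write(String id, String value) {
        String previous = byId.putIfAbsent(id, value);
        if (previous != null && !previous.equals(value)) {
            throw new IllegalStateException("input for " + id + " changed between attempts");
        }
    }

    /** Non-idempotent contrast: every attempt adds another record. */
    void append(String value) {
        appended.add(value);
    }

    int distinctRecords() { return byId.size(); }

    int appendedRecords() { return appended.size(); }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        for (int attempt = 0; attempt < 3; attempt++) { // one stable element, retried three times
            sink.write("e1", "payload");
            sink.append("payload");
        }
        System.out.println(sink.distinctRecords() + " vs " + sink.appendedRecords()); // 1 vs 3
    }
}
```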
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Ideally we should specify this requirement for the
> > > > > > >>>>>>>> user-defined function without imposing any particular
> > > > > > >>>>>>>> implementation strategy on Beam runners.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Proposal:
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that
> > > > > > >>>>>>>> it "requires deterministic input".
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> 2. Each runner will need a way to induce deterministic input -
> > > > > > >>>>>>>> the obvious choice being a materialization.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> > > > > > >>>>>>>> possibilities of taking this further.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Regarding performance: today, places that require this tend to
> > > > > > >>>>>>>> already be paying the cost via GroupByKey / Reshuffle
> > > > > > >>>>>>>> operations, since that was a simple way to induce determinism
> > > > > > >>>>>>>> in batch Dataflow* (it doesn't work for most other runners nor
> > > > > > >>>>>>>> for streaming Dataflow). This change will replace a hard-coded
> > > > > > >>>>>>>> implementation strategy with a requirement that may be
> > > > > > >>>>>>>> fulfilled in the most efficient way available.
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Thoughts?
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > >>>>>>>>
> > > > > > >>>>>>>> * There is some overlap with the reshuffle/redistribute
> > > > > > >>>>>>>> discussion because of this historical situation, but I would
> > > > > > >>>>>>>> like to leave that broader discussion out of this correctness
> > > > > > >>>>>>>> issue.
> > > > > > >>>>>>>>
