Does requires-stable-input only apply to ParDo transforms?

I don't think it would make sense to annotate a composite, because
checkpointing should happen as close to the side-effecting operation as
possible, since upstream transforms within a composite could introduce
non-determinism. So it's the primitive transform that should own the
requirement.
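
To make that concrete, here is a rough sketch of what "the primitive owns
the requirement" could look like from a runner's point of view. It is only
an illustration: the two annotations are hypothetical stand-ins declared
locally (not the SDK's), and WriteFn, FormatFn and requiresStableInput are
names I made up for the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class StableInputSketch {
  // Hypothetical stand-ins for the annotations discussed in this thread.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElement {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  // A side-effecting function that declares the requirement on its own method.
  static class WriteFn {
    @ProcessElement
    @RequiresStableInput
    public void process(String element) {
      // A non-retractable side effect (e.g. a remote write) would go here.
    }
  }

  // A pure function with no such requirement.
  static class FormatFn {
    @ProcessElement
    public void process(String element) {}
  }

  // What a runner might do: look only at the primitive's own user function
  // and decide whether its input must be checkpointed first.
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)
          && m.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println("WriteFn: " + requiresStableInput(WriteFn.class));   // WriteFn: true
    System.out.println("FormatFn: " + requiresStableInput(FormatFn.class)); // FormatFn: false
  }
}
```

The check never looks at an enclosing composite, so the runner is free to
place the checkpoint immediately before the annotated function.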

Are there other primitives that should be annotated? 'Combine' is
interesting because it is optimized in Dataflow (and perhaps other runners)
to partially apply before a GroupByKey.
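
A toy illustration of why that matters, assuming a simple integer sum and
made-up bundle boundaries (this is not how any runner actually implements
the optimization; partialSums and merge are hypothetical names): the final
result is the same on retry, but the partial values that cross the
GroupByKey depend on how elements happened to be bundled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombinerLiftingSketch {
  // Pre-combine each bundle into a partial sum (the step applied before the GroupByKey).
  static List<Integer> partialSums(List<List<Integer>> bundles) {
    List<Integer> partials = new ArrayList<>();
    for (List<Integer> bundle : bundles) {
      int sum = 0;
      for (int v : bundle) {
        sum += v;
      }
      partials.add(sum);
    }
    return partials;
  }

  // Merge the partial sums (the step applied after the GroupByKey).
  static int merge(List<Integer> partials) {
    int total = 0;
    for (int p : partials) {
      total += p;
    }
    return total;
  }

  public static void main(String[] args) {
    // Same four elements, bundled differently on the first attempt and on a retry.
    List<List<Integer>> firstAttempt = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
    List<List<Integer>> retry = Arrays.asList(Arrays.asList(1), Arrays.asList(2, 3, 4));

    System.out.println(partialSums(firstAttempt) + " -> " + merge(partialSums(firstAttempt))); // [3, 7] -> 10
    System.out.println(partialSums(retry) + " -> " + merge(partialSums(retry)));               // [1, 9] -> 10
  }
}
```

So anything that observes the partially combined values sees
bundle-dependent input, even when the merged output is identical.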

On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau <taki...@google.com.invalid>
wrote:

> +1 to the annotation idea, and to having it on processTimer.
>
> -Tyler
>
> On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > +1 to the annotation approach. I outlined how implementing this would work
> > in the Flink runner in the thread about the exactly-once Kafka Sink.
> >
> > > On 9. Aug 2017, at 23:03, Reuven Lax <re...@google.com.INVALID> wrote:
> > >
> > > Yes - I don't think we should try and make any deterministic guarantees
> > > about what is in a bundle. Stability guarantees are per element only.
> > >
> > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
> > > wrote:
> > >
> > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> > > >> minimum implementation requirement of a DoFn, and should be where the
> > > >> processing logic which depends on characteristics of the inputs lies.
> > > >> It's a good way of signalling the requirements of the Fn, and letting
> > > >> the runner decide.
> > >>
> > > >> I have a minor concern that this may not work as expected for users that
> > > >> try to batch remote calls in `FinishBundle` - we should make sure we
> > > >> document that it is explicitly the input elements that will be replayed,
> > > >> and that bundles and other operational details are still arbitrary.
> > >>
> > >>
> > >>
> > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid
> >
> > >> wrote:
> > >>
> > >>> I think deterministic here means deterministically replayable. i.e.
> no
> > >>> matter how many times the element is retried, it will always be the
> > same.
> > >>>
> > >>> I think we should also allow specifying this on processTimer. This
> > would
> > >>> mean that any keyed state written in a previous processElement must
> be
> > >>> guaranteed durable before processTimer is called.
> > >>>
> > >>>
> > >>> On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> > >>> wrote:
> > >>>
> > >>>> I strongly agree with this proposal. I think moving away from "just
> > >>> insert
> > >>>> a GroupByKey for one of the 3 different reasons you may want it"
> > >> towards
> > >>>> APIs that allow code to express the requirements they have and the
> > >> runner
> > >>>> to choose the best way to meet this is a major step forwards in
> terms
> > >> of
> > >>>> portability.
> > >>>>
> > >>>> I think "deterministic" may be misleading. The actual contents of
> the
> > >>>> collection aren't deterministic if upstream computations aren't. The
> > >>>> property we really need is that once an input may have been observed
> > by
> > >>> the
> > >>>> side-effecting code it will never be observed with a different
> value.
> > >>>>
> > > >>>> I would propose something like RequiresStableInput, to indicate that
> > > >>>> the input must be stable as observed by the function. I could also see
> > > >>>> something hinting at the fact we don't recompute the input, such as
> > > >>>> RequiresMemoizedInput or RequiresNoRecomputation.
> > >>>>
> > >>>> -- Ben
> > >>>>
> > > >>>> P.S. For anyone interested, other uses of GroupByKey that we may want
> > > >>>> to discuss APIs for would be preventing retry across steps (e.g.,
> > > >>>> preventing fusion) and redistributing inputs across workers.
> > >>>>
> > >>>> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles
> <k...@google.com.invalid
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> This came up again, so I wanted to push it along by proposing a
> > >>> specific
> > >>>>> API for Java that could have a derived API in Python. I am writing
> > >> this
> > >>>>> quickly to get something out there, so I welcome suggestions for
> > >>>> revision.
> > >>>>>
> > >>>>> Today a DoFn has a @ProcessElement annotated method with various
> > >>>> automated
> > >>>>> parameters, but most fundamentally this:
> > >>>>>
> > >>>>> @ProcessElement
> > >>>>> public void process(ProcessContext ctx) {
> > >>>>>  ctx.element() // to access the current input element
> > >>>>>  ctx.output(something) // to write to default output collection
> > >>>>>  ctx.output(tag, something) // to write to other output collections
> > >>>>> }
> > >>>>>
> > >>>>> For some time, we have hoped to unpack the context - it is a
> > >>>>> backwards-compatibility pattern made obsolete by the new DoFn
> design.
> > >>> So
> > >>>>> instead it would look like this:
> > >>>>>
> > >>>>> @ProcessElement
> > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > >>>>>  element.get() // to access the current input element
> > > >>>>>  mainOutput.output(something) // to write to the default output collection
> > >>>>>  other.output(something) // to write to other output collection
> > >>>>> }
> > >>>>>
> > >>>>> I've deliberately left out the undecided syntax for side outputs.
> But
> > >>> it
> > >>>>> would be nice for the tag to be built in to the parameter so it
> > >> doesn't
> > >>>>> have to be used when calling output().
> > >>>>>
> > >>>>> One way to enhance this to deterministic input would just be this:
> > >>>>>
> > >>>>> @ProcessElement
> > >>>>> @RequiresDeterministicInput
> > >>>>> public void process(Element element, MainOutput mainOutput, ...) {
> > >>>>>  element.get() // to access the current input element
> > > >>>>>  mainOutput.output(something) // to write to the default output collection
> > >>>>>  other.output(something) // to write to other output collection
> > >>>>> }
> > >>>>>
> > >>>>> There are really a lot of places where we could put an annotation
> or
> > >>>> change
> > >>>>> a type to indicate that the input PCollection should be
> > >>>>> well-defined/deterministically-replayable. I don't have a really
> > >>> strong
> > >>>>> opinion.
> > >>>>>
> > >>>>> Kenn
> > >>>>>
> > >>>>> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> > >>>> <bchamb...@google.com.invalid
> > >>>>>>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Allowing an annotation on DoFn's that produce deterministic output
> > >>>> could
> > >>>>> be
> > >>>>>> added in the future, but doesn't seem like a great option.
> > >>>>>>
> > >>>>>> 1. It is a correctness issue to assume a DoFn is deterministic and
> > >> be
> > >>>>>> wrong, so we would need to assume all transform outputs are
> > >>>>>> non-deterministic unless annotated. Getting this correct is
> > >> difficult
> > >>>>> (for
> > >>>>>> example, GBK is surprisingly non-deterministic except in specific
> > >>>> cases).
> > >>>>>>
> > >>>>>> 2. It is unlikely to be a major performance improvement, given
> that
> > >>> any
> > >>>>>> non-deterministic transform prior to a sink (which are most likely
> > >> to
> > >>>>>> require deterministic input) will cause additional work to be
> > >> needed.
> > >>>>>>
> > >>>>>> Based on this, it seems like the risk of allowing an annotation is
> > >>> high
> > >>>>>> while the potential for performance improvements is low. The
> > >> current
> > >>>>>> proposal (not allowing an annotation) makes sense for now, until
> we
> > >>> can
> > >>>>>> demonstrate that the impact on performance is high in cases that
> > >>> could
> > >>>> be
> > >>>>>> avoided with an annotation (in real-world use).
> > >>>>>>
> > >>>>>> -- Ben
> > >>>>>>
> > >>>>>> On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com>
> > >> wrote:
> > >>>>>>
> > > >>>>>> +1 for the general idea of runners handling it over a hard-coded
> > > >>>>>> implementation strategy.
> > > >>>>>>
> > > >>>>>> For the Write transform I believe you are talking about
> > > >>>>>> ApplyShardingKey
> > > >>>>>> <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > >>>>>> which introduces non-deterministic behavior when retried?
> > > >>>>>>
> > >>>>>>
> > > >>>>>> *Let a DoFn declare (mechanism not important right now) that it
> > > >>>>>> "requires deterministic input"*
> > > >>>>>>
> > > >>>>>> *Each runner will need a way to induce deterministic input - the
> > > >>>>>> obvious choice being a materialization.*
> > >>>>>>
> > >>>>>> Does this mean that a runner will always materialize (or whatever
> > >> the
> > >>>>>> strategy is) an input PCollection to this DoFn even though the
> > >>>>> PCollection
> > >>>>>> might have been produced by deterministic transforms? Would it
> make
> > >>>> sense
> > >>>>>> to also let DoFns declare if they produce non-deterministic
> output?
> > >>>>>>
> > >>>>>> -Vikas
> > >>>>>>
> > >>>>>>
> > >>>>>> On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid>
> > >>>> wrote:
> > >>>>>>
> > >>>>>>> Hey Kenn-
> > >>>>>>>
> > >>>>>>> this seems important, but I don't have all the context on what
> > >> the
> > >>>>>> problem
> > >>>>>>> is.
> > >>>>>>>
> > >>>>>>> Can you explain this sentence "Specifically, there is
> > >> pseudorandom
> > >>>> data
> > >>>>>>> generated and once it has been observed and used to produce a
> > >> side
> > >>>>>> effect,
> > >>>>>>> it cannot be regenerated without erroneous results." ?
> > >>>>>>>
> > >>>>>>> Where is the pseudorandom data coming from? Perhaps a concrete
> > >>>> example
> > >>>>>>> would help?
> > >>>>>>>
> > >>>>>>> S
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles
> > >>>> <k...@google.com.invalid
> > >>>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Problem:
> > >>>>>>>>
> > >>>>>>>> I will drop all nuance and say that the `Write` transform as it
> > >>>>> exists
> > >>>>>> in
> > >>>>>>>> the SDK is incorrect until we add some specification and APIs.
> > >> We
> > >>>>> can't
> > >>>>>>>> keep shipping an SDK with an unsafe transform in it, and IMO
> > >> this
> > >>>>>>> certainly
> > >>>>>>>> blocks a stable release.
> > >>>>>>>>
> > >>>>>>>> Specifically, there is pseudorandom data generated and once it
> > >>> has
> > >>>>> been
> > >>>>>>>> observed and used to produce a side effect, it cannot be
> > >>>> regenerated
> > >>>>>>>> without erroneous results.
> > >>>>>>>>
> > >>>>>>>> This generalizes: For some side-effecting user-defined
> > >> functions,
> > >>>> it
> > >>>>> is
> > >>>>>>>> vital that even across retries/replays they have a consistent
> > >>> view
> > >>>> of
> > >>>>>> the
> > >>>>>>>> contents of their input PCollection, because their effect on
> > >> the
> > >>>>>> outside
> > >>>>>>>> world cannot be retracted if/when they fail and are retried.
> > >> Once
> > >>>> the
> > >>>>>>>> runner ensures a consistent view of the input, it is then their
> > >>> own
> > >>>>>>>> responsibility to be idempotent.
> > >>>>>>>>
> > >>>>>>>> Ideally we should specify this requirement for the user-defined
> > >>>>>> function
> > >>>>>>>> without imposing any particular implementation strategy on Beam
> > >>>>>> runners.
> > >>>>>>>>
> > >>>>>>>> Proposal:
> > >>>>>>>>
> > >>>>>>>> 1. Let a DoFn declare (mechanism not important right now) that
> > >> it
> > >>>>>>> "requires
> > >>>>>>>> deterministic input".
> > >>>>>>>>
> > >>>>>>>> 2. Each runner will need a way to induce deterministic input -
> > >>> the
> > >>>>>>> obvious
> > >>>>>>>> choice being a materialization.
> > >>>>>>>>
> > >>>>>>>> I want to keep the discussion focused, so I'm leaving out any
> > >>>>>>> possibilities
> > >>>>>>>> of taking this further.
> > >>>>>>>>
> > >>>>>>>> Regarding performance: Today places that require this tend to
> > >> be
> > >>>>>> already
> > >>>>>>>> paying the cost via GroupByKey / Reshuffle operations, since
> > >> that
> > >>>>> was a
> > >>>>>>>> simple way to induce determinism in batch Dataflow* (doesn't
> > >> work
> > >>>> for
> > >>>>>>> most
> > >>>>>>>> other runners nor for streaming Dataflow). This change will
> > >>>> replace a
> > >>>>>>>> hard-coded implementation strategy with a requirement that may
> > >> be
> > >>>>>>> fulfilled
> > >>>>>>>> in the most efficient way available.
> > >>>>>>>>
> > >>>>>>>> Thoughts?
> > >>>>>>>>
> > >>>>>>>> Kenn (w/ lots of consult from colleagues, especially Ben)
> > >>>>>>>>
> > >>>>>>>> * There is some overlap with the reshuffle/redistribute
> > >>> discussion
> > >>>>>>> because
> > >>>>>>>> of this historical situation, but I would like to leave that
> > >>>> broader
> > >>>>>>>> discussion out of this correctness issue.
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
