Yes - I don't think we should try to make any determinism guarantees
about what is in a bundle. Stability guarantees are per element only.
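
To make the per-element point concrete, here is a minimal sketch. It is not the Beam SDK: the two annotations, the WriteFn/UpperFn classes and the toy inputForAttempts helper are all hypothetical stand-ins, written in plain Java so they run on their own. The idea is only that a runner could read the annotation on the process method and, when it is present, materialize the element once so that every retry observes the same value. How elements are grouped into bundles is deliberately left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for this sketch only; these are NOT the Beam SDK types.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ProcessElement {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresStableInput {}

class StableInputSketch {
  /** A side-effecting function that declares it needs stable input. */
  static class WriteFn {
    final List<String> sideEffects = new ArrayList<>();

    @ProcessElement
    @RequiresStableInput
    public void process(String element) {
      sideEffects.add(element); // stands in for an external write
    }
  }

  /** A function with no side effects and no stability requirement. */
  static class UpperFn {
    @ProcessElement
    public void process(String element) {
      element.toUpperCase();
    }
  }

  /** Runner-side check: does the @ProcessElement method ask for stable input? */
  static boolean requiresStableInput(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)
          && m.isAnnotationPresent(RequiresStableInput.class)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Toy "runner" (an assumption of this sketch): returns the value the function
   * would observe on each attempt. If stable input is required, the upstream
   * value is computed once and replayed; otherwise it is recomputed per attempt.
   */
  static List<String> inputForAttempts(
      Class<?> fnClass, Supplier<String> upstream, int attempts) {
    boolean stable = requiresStableInput(fnClass);
    String materialized = stable ? upstream.get() : null;
    List<String> observed = new ArrayList<>();
    for (int i = 0; i < attempts; i++) {
      observed.add(stable ? materialized : upstream.get());
    }
    return observed;
  }
}
```

With an upstream that yields a different value on every call, three attempts against WriteFn all observe the first value, while three attempts against UpperFn observe three different values. Nothing here says which elements share a bundle.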

On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh <tg...@google.com.invalid>
wrote:

> +1 to the annotation-on-ProcessElement approach. ProcessElement is the
> minimum implementation requirement of a DoFn, and should be where the
> processing logic that depends on characteristics of the inputs lies. It's a
> good way of signalling the requirements of the Fn, and letting the runner
> decide.
>
> I have a minor concern that this may not work as expected for users who
> try to batch remote calls in `FinishBundle` - we should make sure we
> document that it is explicitly the input elements that will be replayed,
> and that bundles and other operational details are still arbitrary.
>
>
>
> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax <re...@google.com.invalid>
> wrote:
>
> > I think deterministic here means deterministically replayable, i.e. no
> > matter how many times the element is retried, it will always be the same.
> >
> > I think we should also allow specifying this on processTimer. This would
> > mean that any keyed state written in a previous processElement must be
> > guaranteed durable before processTimer is called.
> >
> >
> > On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org>
> > wrote:
> >
> > > I strongly agree with this proposal. I think moving away from "just insert
> > > a GroupByKey for one of the 3 different reasons you may want it" towards
> > > APIs that allow code to express the requirements it has and the runner
> > > to choose the best way to meet them is a major step forwards in terms of
> > > portability.
> > >
> > > I think "deterministic" may be misleading. The actual contents of the
> > > collection aren't deterministic if upstream computations aren't. The
> > > property we really need is that once an input may have been observed by the
> > > side-effecting code it will never be observed with a different value.
> > >
> > > I would propose something like RequiresStableInput, to indicate that the input
> > > must be stable as observed by the function. I could also see something
> > > hinting at the fact we don't recompute the input, such as
> > > RequiresMemoizedInput or RequiresNoRecomputation.
> > >
> > > -- Ben
> > >
> > > P.S. For anyone interested, other uses of GroupByKey that we may want to
> > > discuss APIs for would be preventing retry across steps (e.g., preventing
> > > fusion) and redistributing inputs across workers.
> > >
> > > On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> > > wrote:
> > >
> > > > This came up again, so I wanted to push it along by proposing a specific
> > > > API for Java that could have a derived API in Python. I am writing this
> > > > quickly to get something out there, so I welcome suggestions for revision.
> > > >
> > > > Today a DoFn has a @ProcessElement annotated method with various automated
> > > > parameters, but most fundamentally this:
> > > >
> > > > @ProcessElement
> > > > public void process(ProcessContext ctx) {
> > > >   ctx.element() // to access the current input element
> > > >   ctx.output(something) // to write to default output collection
> > > >   ctx.output(tag, something) // to write to other output collections
> > > > }
> > > >
> > > > For some time, we have hoped to unpack the context - it is a
> > > > backwards-compatibility pattern made obsolete by the new DoFn design. So
> > > > instead it would look like this:
> > > >
> > > > @ProcessElement
> > > > public void process(Element element, MainOutput mainOutput, ...) {
> > > >   element.get() // to access the current input element
> > > >   mainOutput.output(something) // to write to the default output collection
> > > >   other.output(something) // to write to other output collection
> > > > }
> > > >
> > > > I've deliberately left out the undecided syntax for side outputs. But it
> > > > would be nice for the tag to be built in to the parameter so it doesn't
> > > > have to be used when calling output().
> > > >
> > > > One way to extend this to require deterministic input would be just this:
> > > >
> > > > @ProcessElement
> > > > @RequiresDeterministicInput
> > > > public void process(Element element, MainOutput mainOutput, ...) {
> > > >   element.get() // to access the current input element
> > > >   mainOutput.output(something) // to write to the default output collection
> > > >   other.output(something) // to write to other output collection
> > > > }
> > > >
> > > > There are really a lot of places where we could put an annotation or change
> > > > a type to indicate that the input PCollection should be
> > > > well-defined/deterministically-replayable. I don't have a really strong
> > > > opinion.
> > > >
> > > > Kenn
> > > >
> > > > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers <bchamb...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > An annotation on DoFns that produce deterministic output could be
> > > > > added in the future, but it doesn't seem like a great option.
> > > > >
> > > > > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > > > wrong, so we would need to assume all transform outputs are
> > > > > non-deterministic unless annotated. Getting this correct is difficult (for
> > > > > example, GBK is surprisingly non-deterministic except in specific cases).
> > > > >
> > > > > 2. It is unlikely to be a major performance improvement, given that any
> > > > > non-deterministic transform prior to a sink (which is most likely to
> > > > > require deterministic input) will cause additional work to be needed.
> > > > >
> > > > > Based on this, it seems like the risk of allowing an annotation is high
> > > > > while the potential for performance improvements is low. The current
> > > > > proposal (not allowing an annotation) makes sense for now, until we can
> > > > > demonstrate that the impact on performance is high in cases that could be
> > > > > avoided with an annotation (in real-world use).
> > > > >
> > > > > -- Ben
> > > > >
> > > > > On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > > > >
> > > > > +1 for the general idea of runners handling it over a hard-coded
> > > > > implementation strategy.
> > > > >
> > > > > For the Write transform I believe you are talking about ApplyShardingKey
> > > > > <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > > > which introduces non-deterministic behavior when retried?
> > > > >
> > > > >
> > > > > *Let a DoFn declare (mechanism not important right now) that it
> > > > > "requires deterministic input"*
> > > > >
> > > > > *Each runner will need a way to induce deterministic input - the
> > > > > obvious choice being a materialization.*
> > > > >
> > > > > Does this mean that a runner will always materialize (or whatever the
> > > > > strategy is) an input PCollection to this DoFn even though the PCollection
> > > > > might have been produced by deterministic transforms? Would it make sense
> > > > > to also let DoFns declare if they produce non-deterministic output?
> > > > >
> > > > > -Vikas
> > > > >
> > > > >
> > > > > On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > > > >
> > > > > > Hey Kenn-
> > > > > >
> > > > > > this seems important, but I don't have all the context on what the problem
> > > > > > is.
> > > > > >
> > > > > > Can you explain this sentence "Specifically, there is pseudorandom data
> > > > > > generated and once it has been observed and used to produce a side effect,
> > > > > > it cannot be regenerated without erroneous results." ?
> > > > > >
> > > > > > Where is the pseudorandom data coming from? Perhaps a concrete example
> > > > > > would help?
> > > > > >
> > > > > > S
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles <k...@google.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Problem:
> > > > > > >
> > > > > > > I will drop all nuance and say that the `Write` transform as it exists in
> > > > > > > the SDK is incorrect until we add some specification and APIs. We can't
> > > > > > > keep shipping an SDK with an unsafe transform in it, and IMO this certainly
> > > > > > > blocks a stable release.
> > > > > > >
> > > > > > > Specifically, there is pseudorandom data generated and once it has been
> > > > > > > observed and used to produce a side effect, it cannot be regenerated
> > > > > > > without erroneous results.
> > > > > > >
> > > > > > > This generalizes: For some side-effecting user-defined functions, it is
> > > > > > > vital that even across retries/replays they have a consistent view of the
> > > > > > > contents of their input PCollection, because their effect on the outside
> > > > > > > world cannot be retracted if/when they fail and are retried. Once the
> > > > > > > runner ensures a consistent view of the input, it is then their own
> > > > > > > responsibility to be idempotent.
> > > > > > >
> > > > > > > Ideally we should specify this requirement for the user-defined function
> > > > > > > without imposing any particular implementation strategy on Beam runners.
> > > > > > >
> > > > > > > Proposal:
> > > > > > >
> > > > > > > 1. Let a DoFn declare (mechanism not important right now) that it "requires
> > > > > > > deterministic input".
> > > > > > >
> > > > > > > 2. Each runner will need a way to induce deterministic input - the obvious
> > > > > > > choice being a materialization.
> > > > > > >
> > > > > > > I want to keep the discussion focused, so I'm leaving out any possibilities
> > > > > > > of taking this further.
> > > > > > >
> > > > > > > Regarding performance: Today places that require this tend to be already
> > > > > > > paying the cost via GroupByKey / Reshuffle operations, since that was a
> > > > > > > simple way to induce determinism in batch Dataflow* (doesn't work for most
> > > > > > > other runners nor for streaming Dataflow). This change will replace a
> > > > > > > hard-coded implementation strategy with a requirement that may be fulfilled
> > > > > > > in the most efficient way available.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > > > >
> > > > > > > * There is some overlap with the reshuffle/redistribute discussion because
> > > > > > > of this historical situation, but I would like to leave that broader
> > > > > > > discussion out of this correctness issue.
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
