I like "Stable" too. I can try to make up other scenarios to try out different vocabulary.
Here are a couple:

 - redundant processing to mitigate stragglers
 - duplication in the course of optimizations*

This expands the scope of the feature to be not just agreement on the
PCollection contents between retries, but between various other execution
strategies, etc.

Kenn

*actually the case where two consumers need to agree on contents is not
addressed directly, but I think can be implemented in userland

On Wed, Aug 9, 2017 at 10:10 AM, Ben Chambers <bchamb...@apache.org> wrote:

> I strongly agree with this proposal. I think moving away from "just insert
> a GroupByKey for one of the 3 different reasons you may want it" towards
> APIs that allow code to express the requirements they have and the runner
> to choose the best way to meet this is a major step forwards in terms of
> portability.
>
> I think "deterministic" may be misleading. The actual contents of the
> collection aren't deterministic if upstream computations aren't. The
> property we really need is that once an input may have been observed by the
> side-effecting code it will never be observed with a different value.
>
> I would propose something like RequiresStableInput, to indicate that the
> input must be stable as observed by the function. I could also see
> something hinting at the fact we don't recompute the input, such as
> RequiresMemoizedInput or RequiresNoRecomputation.
>
> -- Ben
>
> P.S. For anyone interested, other uses of GroupByKey that we may want to
> discuss APIs for would be preventing retry across steps (e.g., preventing
> fusion) and redistributing inputs across workers.
>
> On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > This came up again, so I wanted to push it along by proposing a specific
> > API for Java that could have a derived API in Python. I am writing this
> > quickly to get something out there, so I welcome suggestions for
> > revision.
> >
> > Today a DoFn has a @ProcessElement annotated method with various
> > automated parameters, but most fundamentally this:
> >
> > @ProcessElement
> > public void process(ProcessContext ctx) {
> >   ctx.element() // to access the current input element
> >   ctx.output(something) // to write to default output collection
> >   ctx.output(tag, something) // to write to other output collections
> > }
> >
> > For some time, we have hoped to unpack the context - it is a
> > backwards-compatibility pattern made obsolete by the new DoFn design. So
> > instead it would look like this:
> >
> > @ProcessElement
> > public void process(Element element, MainOutput mainOutput, ...) {
> >   element.get() // to access the current input element
> >   mainOutput.output(something) // to write to the default output collection
> >   other.output(something) // to write to other output collection
> > }
> >
> > I've deliberately left out the undecided syntax for side outputs. But it
> > would be nice for the tag to be built in to the parameter so it doesn't
> > have to be used when calling output().
> >
> > One way to enhance this to deterministic input would just be this:
> >
> > @ProcessElement
> > @RequiresDeterministicInput
> > public void process(Element element, MainOutput mainOutput, ...) {
> >   element.get() // to access the current input element
> >   mainOutput.output(something) // to write to the default output collection
> >   other.output(something) // to write to other output collection
> > }
> >
> > There are really a lot of places where we could put an annotation or
> > change a type to indicate that the input PCollection should be
> > well-defined/deterministically-replayable. I don't have a really strong
> > opinion.
> >
> > Kenn
> >
> > On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers
> > <bchamb...@google.com.invalid> wrote:
> >
> > > Allowing an annotation on DoFns that produce deterministic output
> > > could be added in the future, but doesn't seem like a great option.
> > >
> > > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > > wrong, so we would need to assume all transform outputs are
> > > non-deterministic unless annotated. Getting this correct is difficult
> > > (for example, GBK is surprisingly non-deterministic except in specific
> > > cases).
> > >
> > > 2. It is unlikely to be a major performance improvement, given that any
> > > non-deterministic transform prior to a sink (which are most likely to
> > > require deterministic input) will cause additional work to be needed.
> > >
> > > Based on this, it seems like the risk of allowing an annotation is high
> > > while the potential for performance improvements is low. The current
> > > proposal (not allowing an annotation) makes sense for now, until we can
> > > demonstrate that the impact on performance is high in cases that could
> > > be avoided with an annotation (in real-world use).
> > >
> > > -- Ben
> > >
> > > On Tue, Mar 21, 2017 at 2:05 PM vikas rk <vikky...@gmail.com> wrote:
> > >
> > > +1 for the general idea of runners handling it over hard-coded
> > > implementation strategy.
> > >
> > > For the Write transform I believe you are talking about ApplyShardingKey
> > > <https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304>
> > > which introduces non-deterministic behavior when retried?
> > >
> > > *Let a DoFn declare (mechanism not important right now) that it
> > > "requires deterministic input"*
> > >
> > > *Each runner will need a way to induce deterministic input - the
> > > obvious choice being a materialization.*
> > >
> > > Does this mean that a runner will always materialize (or whatever the
> > > strategy is) an input PCollection to this DoFn even though the
> > > PCollection might have been produced by deterministic transforms? Would
> > > it make sense to also let DoFns declare if they produce
> > > non-deterministic output?
> > >
> > > -Vikas
> > >
> > > On 21 March 2017 at 13:52, Stephen Sisk <s...@google.com.invalid> wrote:
> > >
> > > > Hey Kenn-
> > > >
> > > > this seems important, but I don't have all the context on what the
> > > > problem is.
> > > >
> > > > Can you explain this sentence "Specifically, there is pseudorandom
> > > > data generated and once it has been observed and used to produce a
> > > > side effect, it cannot be regenerated without erroneous results." ?
> > > >
> > > > Where is the pseudorandom data coming from? Perhaps a concrete example
> > > > would help?
> > > >
> > > > S
> > > >
> > > > On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles
> > > > <k...@google.com.invalid> wrote:
> > > >
> > > > > Problem:
> > > > >
> > > > > I will drop all nuance and say that the `Write` transform as it
> > > > > exists in the SDK is incorrect until we add some specification and
> > > > > APIs. We can't keep shipping an SDK with an unsafe transform in it,
> > > > > and IMO this certainly blocks a stable release.
> > > > >
> > > > > Specifically, there is pseudorandom data generated and once it has
> > > > > been observed and used to produce a side effect, it cannot be
> > > > > regenerated without erroneous results.
> > > > >
> > > > > This generalizes: For some side-effecting user-defined functions,
> > > > > it is vital that even across retries/replays they have a consistent
> > > > > view of the contents of their input PCollection, because their
> > > > > effect on the outside world cannot be retracted if/when they fail
> > > > > and are retried. Once the runner ensures a consistent view of the
> > > > > input, it is then their own responsibility to be idempotent.
> > > > >
> > > > > Ideally we should specify this requirement for the user-defined
> > > > > function without imposing any particular implementation strategy on
> > > > > Beam runners.
> > > > >
> > > > > Proposal:
> > > > >
> > > > > 1. Let a DoFn declare (mechanism not important right now) that it
> > > > > "requires deterministic input".
> > > > >
> > > > > 2. Each runner will need a way to induce deterministic input - the
> > > > > obvious choice being a materialization.
> > > > >
> > > > > I want to keep the discussion focused, so I'm leaving out any
> > > > > possibilities of taking this further.
> > > > >
> > > > > Regarding performance: Today places that require this tend to be
> > > > > already paying the cost via GroupByKey / Reshuffle operations, since
> > > > > that was a simple way to induce determinism in batch Dataflow*
> > > > > (doesn't work for most other runners nor for streaming Dataflow).
> > > > > This change will replace a hard-coded implementation strategy with a
> > > > > requirement that may be fulfilled in the most efficient way
> > > > > available.
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Kenn (w/ lots of consult from colleagues, especially Ben)
> > > > >
> > > > > * There is some overlap with the reshuffle/redistribute discussion
> > > > > because of this historical situation, but I would like to leave that
> > > > > broader discussion out of this correctness issue.
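
For readers who want something concrete: the two-part proposal in the thread (a function declares that it needs stable input, and the runner decides how to provide it) can be sketched in plain Java without the Beam SDK. This is only a toy illustration under stated assumptions. The annotation name is borrowed from the suggestion in the thread, but the annotation type defined here, along with `Fn`, `StableSink`, `PlainSink`, `unstableUpstream` and `run`, is hypothetical and is not Beam API. "Retry" is simulated by running the whole input more than once, and materialization is simulated by copying the input into a list.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class StableInputSketch {

  /** Hypothetical marker defined for this sketch; not the Beam SDK type. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  /** Toy stand-in for a DoFn: handles one element at a time. */
  interface Fn {
    void process(String element);
  }

  /** Side-effecting function that declares it needs stable input. */
  static class StableSink implements Fn {
    final List<String> observed = new ArrayList<>();

    @Override
    @RequiresStableInput
    public void process(String element) {
      observed.add(element); // stands in for an external, non-retractable effect
    }
  }

  /** Same side effect, but without the declaration. */
  static class PlainSink implements Fn {
    final List<String> observed = new ArrayList<>();

    @Override
    public void process(String element) {
      observed.add(element);
    }
  }

  /** Upstream whose output differs every time it is recomputed. */
  static Supplier<List<String>> unstableUpstream() {
    int[] calls = {0};
    return () -> {
      List<String> out = new ArrayList<>();
      out.add("record@shard-" + calls[0]++);
      return out;
    };
  }

  /** The "runner" inspects the declaration instead of hard-coding a strategy. */
  static boolean requiresStableInput(Fn fn) {
    try {
      return fn.getClass()
          .getMethod("process", String.class)
          .isAnnotationPresent(RequiresStableInput.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  /**
   * Runs fn over the upstream output `attempts` times, simulating retries.
   * Returns how many times the upstream was actually computed.
   */
  static int run(Supplier<List<String>> upstream, Fn fn, int attempts) {
    boolean stable = requiresStableInput(fn);
    List<String> snapshot = null;
    int upstreamCalls = 0;
    for (int attempt = 0; attempt < attempts; attempt++) {
      List<String> input;
      if (stable) {
        if (snapshot == null) {
          // Materialize once; every retry replays the same snapshot.
          snapshot = new ArrayList<>(upstream.get());
          upstreamCalls++;
        }
        input = snapshot;
      } else {
        // No requirement declared, so the runner is free to recompute.
        input = upstream.get();
        upstreamCalls++;
      }
      for (String element : input) {
        fn.process(element);
      }
    }
    return upstreamCalls;
  }

  public static void main(String[] args) {
    StableSink stable = new StableSink();
    int stableCalls = run(unstableUpstream(), stable, 2);
    System.out.println("annotated: upstream runs=" + stableCalls + " observed=" + stable.observed);

    PlainSink plain = new PlainSink();
    int plainCalls = run(unstableUpstream(), plain, 2);
    System.out.println("plain: upstream runs=" + plainCalls + " observed=" + plain.observed);
  }
}
```

Materializing a snapshot is just the simplest strategy to show here; the point made in the thread is that the declaration leaves a runner free to satisfy the requirement in whatever way is cheapest for it. The sketch also does nothing about idempotence: the annotated sink still records the element once per attempt, which remains the function's own responsibility.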