So, based on some offline discussion, the problem is more complex. There are several classes of ultimate user needs which are potentially orthogonal, even though the current Reshuffle transform, as implemented by the Dataflow runner, happens to satisfy all of them at the same time:
1. Checkpointing a collection.

Suppose you have something like:

    PCollection<Data> data = something.apply(ParDo.of(new GenerateFn()));
    data.apply(ParDo.of(new WriteFn()));

Suppose GenerateFn is non-deterministic - it can generate different output on the same input element. Such Fn's are not forbidden by the Beam model per se, and are obviously useful (e.g. it can be querying an external API, or it can be pairing each output element with a random key, etc).

The Beam model guarantees that the PCollection "data" will logically consist of elements produced by *some* sequential execution of GenerateFn on elements of the PCollection "something" - i.e., even if GenerateFn is invoked on the same element multiple times and produces different results, only one of those results will make it into the PCollection. The act of invoking a DoFn, and the DoFn's side effects, are not part of the Beam model - only PCollection contents are.

However, these factors can be important. E.g. imagine WriteFn writes data to a third-party service. Suppose it even does so in an idempotent way (e.g. suppose the data has a primary key, and WriteFn inserts or overwrites the row in a database with each primary key) - then, each element on which WriteFn is invoked will be written to the database exactly once. However, "each element on which WriteFn is invoked" is not the same as "each element in the PCollection data" - because, again, the Beam model does not make guarantees about what DoFn's are invoked on. In particular, in case of failures, WriteFn might be applied arbitrarily many times to arbitrarily many different results of GenerateFn.
Say, imagine a hypothetical runner that executes the whole pipeline and, on any failure, re-executes the whole pipeline. Over the course of that, even if "data" logically has just 1 element, this element may be produced multiple times, WriteFn might be applied to multiple different versions of it, and in the example above it may insert extra rows into the database.

Checkpointing can look like this:

    PCollection<Data> dataCheckpoint = data.apply(Checkpoint.create());

It limits the scope of non-determinism, and guarantees that an immediate consumer of the collection "dataCheckpoint" will be invoked *only* on the committed logical contents of the collection. It may still be invoked multiple times, but on the same element, so it is sufficient to have idempotency of side effects at the level of the consumer.

Note: the hypothetical "rerun full pipeline on failure" runner will not be able to implement this transform correctly.

Reshuffle is used in this capacity in BigQueryIO:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2718

2. Preventing a runner "producer-consumer fusion" optimization that would have limited parallelism.

Suppose "pc" is a collection with 3 elements, and "GenerateFn" generates 1000 elements for each of them. Then, consider running:

    pc.apply(ParDo.of(new GenerateFn())).apply(ParDo.of(new ProcessFn()));

Obviously, GenerateFn can at best be applied in parallel to the 3 elements - no more parallelism can be achieved at this level. However, ideally ProcessFn would be applied to the 3000 elements all in parallel - but some runners implement "fusion" where they collapse a sequence of ParDo's into a single ParDo whose DoFn is the composition of the component DoFn's. In that case, the pipeline will apply the composition of GenerateFn and ProcessFn in parallel to 3 elements, achieving a parallelism of only 3.
This behavior, too, is not part of the Beam model. But we need a transform that can disable this optimization - i.e. prevent fusion across a particular PCollection, and guarantee that it is processed with as much parallelism as if it had been read from a perfectly parallelizable location (say, an Avro file). We could call it "Redistribute":

    pc.apply(ParDo.of(new GenerateFn()))
      .apply(Redistribute.create())
      .apply(ParDo.of(new ProcessFn()));

3. Preventing a runner "sibling fusion" optimization.

Suppose FooFn and BarFn are DoFn<Integer, String>. Suppose the pipeline looks like:

    PCollection<String> foos = ints.apply(ParDo.of(new FooFn()));
    PCollection<String> bars = ints.apply(ParDo.of(new BarFn()));
    PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));

In this case, a runner might perform an optimization, fusing FooFn and BarFn into an Fn that takes an element "x" and produces the concatenation FooFn(x) + BarFn(x). In some cases, this can be undesirable - e.g. suppose that BarFn is much slower to compute than FooFn, but the results of FooFn need to be sent to pubsub as quickly as possible. In that case we don't want to wait for both FooFn and BarFn on a particular element to complete before sending the result of FooFn to pubsub.

It seems like a Redistribute transform should be the answer to this as well:

    PCollection<String> foos = ints.apply(Redistribute.create()).apply(ParDo.of(new FooFn()));
    PCollection<String> bars = ints.apply(Redistribute.create()).apply(ParDo.of(new BarFn()));
    PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));

Here, it would cause FooFn and BarFn to be applied to logically independent collections, whose contents are equivalent to "ints".
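
To make the failure scenario from (1) concrete, here is a toy model in plain Java. It is not Beam code and makes no claim about how any runner is implemented. All names in it are made up for illustration: CheckpointSketch, rowsWithoutCheckpoint and rowsWithCheckpoint are hypothetical, the nested GenerateFn is a stand-in that returns a fresh primary key on every call (in place of a random key or a changing external API response), and a HashMap stands in for the third-party database. Each loop iteration models one attempt by a runner that retries after a failure.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of retries with and without a checkpoint. Not Beam code. */
class CheckpointSketch {

    /** Stand-in for a non-deterministic GenerateFn: every call yields a new primary key. */
    static final class GenerateFn {
        private int invocations = 0;

        String apply(String input) {
            invocations++;
            return input + "-key" + invocations;
        }
    }

    /** Stand-in for an idempotent WriteFn: insert or overwrite the row for this primary key. */
    static void write(Map<String, String> database, String element) {
        database.put(element, "row for " + element);
    }

    /** Generate and write are retried together, so each attempt sees a different element. */
    static int rowsWithoutCheckpoint(int attempts) {
        GenerateFn generateFn = new GenerateFn();
        Map<String, String> database = new HashMap<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            // The write lands, then the attempt "fails" and everything is re-executed.
            write(database, generateFn.apply("x"));
        }
        return database.size();
    }

    /** The generated element is committed once; only the write is retried. */
    static int rowsWithCheckpoint(int attempts) {
        GenerateFn generateFn = new GenerateFn();
        Map<String, String> database = new HashMap<>();
        String committed = generateFn.apply("x"); // assumed to be durably stored here
        for (int attempt = 0; attempt < attempts; attempt++) {
            write(database, committed); // same element every time, so the upsert is a no-op
        }
        return database.size();
    }

    public static void main(String[] args) {
        System.out.println(rowsWithoutCheckpoint(3)); // prints 3: extra rows in the database
        System.out.println(rowsWithCheckpoint(3));    // prints 1
    }
}
```

In this model the idempotent write is only enough once the element it is applied to has been fixed, which is the property the hypothetical Checkpoint transform is meant to provide.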
***

The current Reshuffle transform happens to do all 3 at the same time, and does it in a potentially non-portable way: in particular it relies on the fact that GroupByKey provides all 3 semantics, but the Beam model does not require this (and in fact, in Spark it probably wouldn't provide #1). It is also potentially non-optimal: there can exist better implementations of each of these 3 semantics not involving a global group-by-key.

It's not clear yet what to do about all this. Thoughts welcome.

On Tue, Oct 11, 2016 at 11:35 AM Kenneth Knowles <k...@google.com.invalid> wrote:

> On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
> > public API - because it only makes sense for getting access to per-key
> > state, and 1) we don't have it yet and 2) runner should insert it
> > automatically - so there's no use case for it.
> >
> +1 to removing Redistribute.byKey() from the public API.
> >
> > The "checkpointing keys" use
> > case should be done via Redistribute.arbitrarily(), I believe.
> >
>
> Actually I think it really does have to be a GroupByKey followed by writing
> the groups without breaking them up:
>
> - With GBK each element must appear in exactly one output group, so you
> have to checkpoint or be able to retract groupings (nice easy explanation
> from Thomas Groh; any faults in my paraphrase are my own).
>
> - But GBK followed by "output the elements one by one" actually removes
> this property. Now you can replace the whole thing with a no-op and fuse
> with downstream and still get exactly once processing according to the
> model but not as observed via side effects to external systems. So sinks
> should really be doing that, and I'll retract this use case for
> Redistribute.
> > As for Redistribute.arbitrarily():
> > In a batch runner, we could describe it as "identity transform, but runner
> > is required to process the resulting PCollection with downstream transforms
> > as well as if it had been created from elements via Create.of(), in terms
> > of ability to parallelize processing and minimize amount of re-executed
> > work in case of failures" (which is a fancy way of saying "materialize"
> > without really saying "materialize" :) ).
> >
>
> How can you observe if the runner ignored you?