So, based on some offline discussion, the problem is more complex. There are
several classes of ultimate user needs which are potentially orthogonal,
even though the current Reshuffle transform, as implemented by the Dataflow
runner, happens to satisfy all of them at the same time:

1. Checkpointing a collection.

Suppose you have something like:

PCollection<Data> data = something.apply(ParDo.of(new GenerateFn()));
data.apply(ParDo.of(new WriteFn()));

Suppose GenerateFn is non-deterministic - it can generate different output
on the same input element.
Such Fn's are not forbidden by the Beam model per se, and are obviously
useful (e.g. it can be querying an external API, or it can be pairing each
output element with a random key, etc).

The Beam model guarantees that the PCollection "data" will logically consist of
elements produced by *some* sequential execution of GenerateFn on elements
of the PCollection "something" - i.e., even if GenerateFn is invoked on the
same element multiple times and produces different results, only one of
those results will make it into the PCollection. The act of invoking a
DoFn, and the DoFn's side effects, are not part of the Beam model - only
PCollection contents are.

However, these factors can be important. E.g. imagine WriteFn writes data
to a third-party service. Suppose it even does so in an idempotent way
(e.g. suppose the data has a primary key, and WriteFn inserts or overwrites
the row in a database with each primary key) - then, each element on which
WriteFn is invoked will be written to the database exactly once.

However, "each element on which WriteFn is invoked" is not the same as
"each element in the PCollection data" - because, again, the Beam model
does not make guarantees about what DoFn's are invoked on.
In particular, in case of failures, WriteFn might be applied arbitrarily
many times to arbitrarily many different results of GenerateFn. For example,
imagine a hypothetical runner that re-executes the whole pipeline on any
failure. Even if "data" logically has just 1 element, this element may be
produced multiple times, WriteFn may be applied to multiple different
versions of it, and in the example above it may insert extra rows into
the database.

Checkpointing can look like this:

  PCollection<Data> dataCheckpoint = data.apply(Checkpoint.create());

It limits the scope of non-determinism, and guarantees that an immediate
consumer of the collection "dataCheckpoint" will be invoked *only* on the
committed logical contents of the collection. It may still be invoked
multiple times, but on the same element, so it is sufficient to have
idempotency of side effects at the level of the consumer. Note: the
hypothetical "rerun full pipeline on failure" runner will not be able to
implement this transform correctly.
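
To make the difference concrete, here is a toy simulation in plain Java (not
Beam code). All names in it are hypothetical: an invocation counter stands in
for GenerateFn's non-determinism, a map keyed by primary key stands in for the
database, and "attempts" models a runner retrying after failures. It is only a
sketch of the argument above, under those assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation; none of these names are Beam APIs.
final class CheckpointSketch {
    // Stand-in for a non-deterministic GenerateFn: every invocation on the
    // same input yields a different output (here, a new version suffix).
    static String generate(String input, AtomicInteger invocation) {
        return input + "-v" + invocation.incrementAndGet();
    }

    // Stand-in for an idempotent WriteFn: an upsert keyed by the element.
    static void write(Map<String, String> db, String row) {
        db.put(row, row);
    }

    // Runner that re-executes GenerateFn and WriteFn on every attempt.
    static int rowsWithoutCheckpoint(int attempts) {
        AtomicInteger invocation = new AtomicInteger();
        Map<String, String> db = new HashMap<>();
        for (int i = 0; i < attempts; i++) {
            write(db, generate("x", invocation));
        }
        return db.size();
    }

    // Runner that commits GenerateFn's output once (the "checkpoint") and
    // only retries WriteFn on that committed element.
    static int rowsWithCheckpoint(int attempts) {
        AtomicInteger invocation = new AtomicInteger();
        Map<String, String> db = new HashMap<>();
        String committed = generate("x", invocation);
        for (int i = 0; i < attempts; i++) {
            write(db, committed);
        }
        return db.size();
    }
}
```

With 3 attempts, the first variant leaves 3 rows in the map even though the
collection logically has 1 element, while the second leaves exactly 1.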

Reshuffle is used in this capacity in BigQueryIO:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java#L2718

2. Preventing a runner "producer-consumer fusion" optimization that would
have limited parallelism.

Suppose "pc" is a collection with 3 elements, and "GenerateFn" generates
1000 elements for each of them.

Then, consider running pc.apply(ParDo.of(new
GenerateFn())).apply(ParDo.of(new ProcessFn())).

Obviously, GenerateFn can at best be applied in parallel to the 3 elements
- no more parallelism can be achieved at this level.
However, ideally ProcessFn would be applied to the 3000 elements all in
parallel - but some runners implement "fusion" where they collapse a
sequence of ParDo's into a single ParDo whose DoFn is the composition of
the component DoFn's. In that case, the pipeline will apply the composition
of GenerateFn and ProcessFn in parallel to 3 elements, achieving a
parallelism of only 3.
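
The arithmetic can be sketched in plain Java (again, not Beam code). The
class and method names are hypothetical, and a "task" here is just my
stand-in for a unit of work that a runner could schedule independently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of fused vs. unfused execution; not a Beam API.
final class FusionSketch {
    // Stand-in for GenerateFn: fans one element out into `fanout` elements.
    static List<String> generate(String element, int fanout) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < fanout; i++) {
            out.add(element + "/" + i);
        }
        return out;
    }

    // Fused: one task per input element; each task runs GenerateFn and then
    // ProcessFn (modeled as a counter increment) on everything it generated.
    static List<Runnable> fusedTasks(List<String> pc, int fanout, AtomicInteger processed) {
        List<Runnable> tasks = new ArrayList<>();
        for (String element : pc) {
            tasks.add(() -> {
                for (String generated : generate(element, fanout)) {
                    processed.incrementAndGet();
                }
            });
        }
        return tasks;
    }

    // Unfused: the generated elements are materialized first, so ProcessFn
    // gets one independently schedulable task per generated element.
    static List<Runnable> redistributedTasks(List<String> pc, int fanout, AtomicInteger processed) {
        List<Runnable> tasks = new ArrayList<>();
        for (String element : pc) {
            for (String generated : generate(element, fanout)) {
                tasks.add(processed::incrementAndGet);
            }
        }
        return tasks;
    }
}
```

For 3 input elements and a fanout of 1000, both variants process 3000
elements, but the fused one exposes only 3 tasks to schedule while the
unfused one exposes 3000.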

This behavior, too, is not part of the Beam model. But we need a transform
that can disable this optimization - i.e. prevent fusion across a
particular PCollection, and guarantee that it is processed with as much
parallelism as if it had been read from a perfectly parallelizable location
(say, an Avro file). We could call it "Redistribute".

pc.apply(ParDo.of(new
GenerateFn())).apply(Redistribute.create()).apply(ParDo.of(new ProcessFn()));

3. Preventing a runner "sibling fusion" optimization

Suppose FooFn and BarFn are DoFn<Integer, String>.
Suppose the pipeline looks like:

PCollection<String> foos = ints.apply(ParDo.of(new FooFn()));
PCollection<String> bars = ints.apply(ParDo.of(new BarFn()));
PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));

In this case, a runner might perform an optimization, fusing FooFn and
BarFn into an Fn that takes an element "x" and produces the concatenation
FooFn(x) + BarFn(x). In some cases, this can be undesirable - e.g. suppose
that BarFn is much slower to compute than FooFn, but the results of FooFn
need to be sent to pubsub as quickly as possible. In that case we don't
want to wait for both FooFn and BarFn on a particular element to complete
before sending the result of FooFn to pubsub.

It seems like a Redistribute transform should be the answer to this as well:

PCollection<String> foos =
ints.apply(Redistribute.create()).apply(ParDo.of(new FooFn()));
PCollection<String> bars =
ints.apply(Redistribute.create()).apply(ParDo.of(new BarFn()));
PCollectionList.of(foos).and(bars).apply(PubsubIO.Write.to(topic));

Here, it would cause FooFn and BarFn to be applied to logically independent
collections, whose contents are equivalent to "ints".
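
A rough way to see the latency effect, as a plain-Java toy model (not Beam
code): the names and the cost model are hypothetical. It assumes a single
worker processing elements one after another on a logical clock, and that a
fused step emits FooFn's output only once BarFn has also finished on the same
element.

```java
// Hypothetical logical-clock model of sibling fusion; not a Beam API.
final class SiblingFusionSketch {
    // Fused: FooFn(x) is emitted only after both FooFn and BarFn finish on x.
    static long[] fusedFooEmitTimes(int n, long fooCost, long barCost) {
        long[] times = new long[n];
        long clock = 0;
        for (int i = 0; i < n; i++) {
            clock += fooCost + barCost;
            times[i] = clock;
        }
        return times;
    }

    // Independent: the FooFn branch runs on its own copy of the input, so
    // BarFn's cost no longer delays FooFn's output.
    static long[] independentFooEmitTimes(int n, long fooCost) {
        long[] times = new long[n];
        long clock = 0;
        for (int i = 0; i < n; i++) {
            clock += fooCost;
            times[i] = clock;
        }
        return times;
    }
}
```

With 3 elements, a FooFn cost of 1 and a BarFn cost of 10, the fused variant
emits FooFn results at times 11, 22, 33, and the independent one at 1, 2, 3.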

***

The current Reshuffle transform happens to do all 3 at the same time, and
does it in a potentially non-portable way: in particular it relies on the
fact that GroupByKey provides all 3 semantics, but the Beam model does not
require this (and in fact, in Spark it probably wouldn't provide #1). It is
also potentially non-optimal: there can exist better implementations of
each of these 3 semantics not involving a global group-by-key.

It's not clear yet what to do about all this. Thoughts welcome.

On Tue, Oct 11, 2016 at 11:35 AM Kenneth Knowles <k...@google.com.invalid>
wrote:

> On Tue, Oct 11, 2016 at 10:56 AM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
> > public API - because it only makes sense for getting access to per-key
> > state, and 1) we don't have it yet and 2) runner should insert it
> > automatically - so there's no use case for it.
>
>
> +1 to removing Redistribute.byKey() from the public API.
>
>
> > The "checkpointing keys" use
> > case should be done via Redistribute.arbitrarily(), I believe.
> >
>
> Actually I think it really does have to be a GroupByKey followed by writing
> the groups without breaking them up:
>
>  - With GBK each element must appear in exactly one output group, so you
> have to checkpoint or be able to retract groupings (nice easy explanation
> from Thomas Groh; any faults in my paraphrase are my own).
>
>  - But GBK followed by "output the elements one by one" actually removes
> this property. Now you can replace the whole thing with a no-op and fuse
> with downstream and still get exactly once processing according to the
> model but not as observed via side effects to external systems. So sinks
> should really be doing that, and I'll retract this use case for
> Redistribute.
>
> As for Redistribute.arbitrarily():
> > In a batch runner, we could describe it as "identity transform, but runner
> > is required to process the resulting PCollection with downstream transforms
> > as well as if it had been created from elements via Create.of(), in terms
> > of ability to parallelize processing and minimize amount of re-executed
> > work in case of failures" (which is a fancy way of saying "materialize"
> > without really saying "materialize" :) ).
> >
>
> How can you observe if the runner ignored you?
>
