Yeah, I'm starting to lean towards removing Redistribute.byKey() from the
public API - because it only makes sense for getting access to per-key
state, and 1) we don't have per-key state yet and 2) the runner should
insert the redistribution automatically - so there's no use case for it. The
"checkpointing keys" use case should be done via
Redistribute.arbitrarily(), I believe.

As for Redistribute.arbitrarily():
In a batch runner, we could describe it as "identity transform, but the
runner is required to process the resulting PCollection with downstream
transforms as well as if it had been created from elements via Create.of(),
in terms of ability to parallelize processing and minimize the amount of
re-executed work in case of failures" (which is a fancy way of saying
"materialize" without really saying "materialize" :) ).
However, this reduction to Create.of() makes less sense in the unified
model, even though the goal is again to get the best possible parallelism
and minimal re-execution.
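
To make the "minimize the amount of re-executed work" point concrete, here is
a toy sketch in plain Java. It is not Beam code and not how any runner is
implemented: the names MaterializeSketch, upstream, retry and run are all
hypothetical, and it assumes a fused stage is retried as a whole while a
materialized collection lets each element be retried on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntUnaryOperator;

public class MaterializeSketch {
    // Counts executions of the (hypothetical) expensive upstream step.
    static int upstreamCalls;

    static int upstream(int x) {
        upstreamCalls++;
        return x * 10;
    }

    // Re-runs a unit of work until it succeeds, giving up after 3 attempts.
    static void retry(Runnable unit) {
        for (int attempt = 1; ; attempt++) {
            try {
                unit.run();
                return;
            } catch (RuntimeException e) {
                if (attempt == 3) throw e;
            }
        }
    }

    // Returns how many times upstream() ran while processing the inputs.
    static int run(List<Integer> inputs, boolean materialize) {
        upstreamCalls = 0;
        boolean[] failedOnce = {false};
        // Downstream step that fails exactly once, the first time it sees 20.
        IntUnaryOperator downstream = v -> {
            if (v == 20 && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new RuntimeException("transient failure");
            }
            return v + 1;
        };
        if (materialize) {
            // Assumed "materialize" behavior: upstream output is stored once,
            // then each element is an independent unit of retry.
            List<Integer> checkpoint = new ArrayList<>();
            for (int x : inputs) checkpoint.add(upstream(x));
            for (int v : checkpoint) retry(() -> downstream.applyAsInt(v));
        } else {
            // Assumed "fused" behavior: upstream and downstream form one unit,
            // so a downstream failure re-runs upstream for the whole bundle.
            retry(() -> {
                for (int x : inputs) downstream.applyAsInt(upstream(x));
            });
        }
        return upstreamCalls;
    }

    public static void main(String[] args) {
        List<Integer> inputs = Arrays.asList(1, 2, 3);
        System.out.println("fused upstream calls:        " + run(inputs, false));
        System.out.println("materialized upstream calls: " + run(inputs, true));
    }
}
```

With inputs 1, 2, 3 the fused variant runs upstream five times (two before the
failure, three on the retry), while the materialized variant runs it three
times. A real runner may of course pick very different retry units.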

On Tue, Oct 11, 2016 at 10:30 AM Robert Bradshaw
<rober...@google.com.invalid> wrote:

> Actually, Redistribute.perKey seems a bit dangerous, as there's no
> guarantee the partitioning is persisted to any subsequent steps, and
> we don't have a concrete notion of key-partitioned elements outside of
> GBK in the model. I suspect it was only introduced because that's what
> Redistribute.arbitrarily() is built on.
>
> On Tue, Oct 11, 2016 at 10:16 AM, Ben Chambers <bchamb...@apache.org>
> wrote:
> > As Kenn points out, I think the nature of the Redistribute operation is to
> > act as a hint (or requirement) to the runner that a certain distribution of
> > the elements is desirable. In a perfect world this wouldn't be necessary
> > because every runner would be able to do exactly the right thing. Looking at
> > the different use cases may be helpful:
> >
> > 1. Redistribute.arbitrarily being used in IO as a fusion break and
> > checkpoint. We could express this as a hint saying that we'd like to
> > persist the PCollection at this point.
> > 2. Redistribute.perKey being used to checkpoint keys in a keyed
> > PCollection. I think this could be the same as the previous hint or a
> > variant thereof.
> > 3. Redistribute.perKey to ensure that the elements are distributed across
> > machines such that all elements with a specific key are on the same
> > machine. This should only be necessary for per-key processing (such as
> > state) and can be added by the runner when necessary (becomes easier once
> > we have a notion of transforms that preserve key-partitioning, etc.)
> >
> > Of these 1 and 2 seem to be the most interesting. The hint can be
> > implemented in various ways -- a transform that represents the hint (and
> > the runner can then implement as it sees fit) or via a method that sets
> > some property on the PCollection, to which the runner could choose to apply
> > a transform. I lean towards the former (keeping this as a transform) since
> > it fits more naturally into the codebase and doesn't require extending
> > PCollection (something we avoid).
> >
> > What if this was something like: ".apply(Hints.checkpoint())" or
> > ".apply(Hints.break())"? This makes it clearer that this is a hint to the
> > runner and not part of the semantics?
> >
> > On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> >> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
> >> <kirpic...@google.com.invalid> wrote:
> >>
> >> > The transform, the way it's implemented, actually does several things at
> >> > the same time and that's why it's tricky to document it.
> >>
> >> This thread has actually made me less sure about my thoughts on this
> >> transform. I do know what the transform is about and I do think we need it.
> >> But I don't know that it can be explained "within the model". Look at our
> >> classic questions about Redistribute.arbitrarily() and
> >> Redistribute.byKey():
> >>
> >>  - "what" is it computing? The identity on its input.
> >>  - "where" is the event time windowing? Same as its input.
> >>  - "when" is output produced? As fast as reasonable (runner-specific).
> >>  - "how" are refinements related? Same as its input (I think this might
> >>    actually be incorrect if accumulating fired panes)
> >>
> >> These points don't describe any of the real goals of Redistribute. Hence
> >> describing it in terms of fusion and checkpointing, which are quite
> >> runner-specific in their (optional) manifestations.
> >>
> >> > - Introduces a fusion barrier (in runners that have it), making sure that
> >> > the runner can fully parallelize processing the output PCollection with
> >> > DoFn's
> >>
> >> Can a runner introduce other fusion barriers whenever it wants? Yes.
> >> Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
> >> why not?)
> >>
> >> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
> >> > input PCollection (again, in runners where it makes sense) and making sure
> >> > that processing elements of the output PCollection with a DoFn, if the DoFn
> >> > fails, will redo only that processing, but not need to recompute the input
> >> > PCollection.
> >>
> >> Can a runner introduce a checkpoint whenever appropriate? Yes.
> >> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
> >> same result - it may not even conceive of checkpointing in a compatible
> >> way).
> >>
> >> > - All of the above and also makes the collection "key-partitioned", giving
> >> > access to per-key state to downstream key-preserving DoFns. However, this
> >> > is also runner-specific, because it's conceivable that a runner might not
> >> > need this "key-partitioned" property (in fact it's best if a runner
> >> > inserted such a "redistribute by key" automatically if it needs it...), and
> >> > it currently isn't exposed anyway.
> >>
> >> Agreed. The runner should insert the necessary keying wherever needed. One
> >> might say the same for other uses of Redistribute, but in practice hints
> >> are useful.
> >>
> >> > Still thinking about the best way to describe this in a way that's least
> >> > confusing to users.
> >>
> >> I think it isn't just about users. I don't think the transform is quite
> >> well-defined at the "what the runner must do" level. Here is a question I
> >> am considering: When is it _incorrect_ for a runner to replace a
> >> Redistribute with an identity transform? I have some thoughts, such as
> >> committing pseudorandomly generated data, but do you have some other ideas?
> >>
> >> Kenn
>
