Hi all,

Please take a look at the notes from a discussion a few of us had about this,
along with an updated proposal and a couple of demo PRs implementing it:
http://s.apache.org/context-fn

I hope this proposal is more agreeable.
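
For anyone skimming the thread below, here is a rough sketch of the
implicit-access idea being debated. It is not the Beam implementation: it is
plain Python with no Beam dependency, and SideInputView, side_input_context
and run_user_fn are hypothetical names made up for illustration. The "runner"
installs a per-thread context once per bundle, and view.get() reads from it,
failing with a descriptive error when the view was never wired up.

```python
import threading
from contextlib import contextmanager

# Hypothetical stand-ins for illustration only; none of these are Beam APIs.
_context = threading.local()


class SideInputView:
    """A handle whose value is only readable while a context is installed."""

    def __init__(self, name):
        self.name = name

    def get(self):
        values = getattr(_context, "values", None)
        if values is None or self not in values:
            raise RuntimeError(
                "Side input %r is not available here. "
                "Did you forget to wire it up?" % self.name)
        return values[self]


@contextmanager
def side_input_context(values):
    """Installs the wired side input values for the current thread only."""
    previous = getattr(_context, "values", None)
    _context.values = values
    try:
        yield
    finally:
        _context.values = previous


def run_user_fn(fn, elements, wired):
    """Plays the role of the runner: one context per bundle, not per element."""
    with side_input_context(wired):
        return [fn(e) for e in elements]


offset = SideInputView("offset")
result = run_user_fn(lambda x: x + offset.get(), [1, 2, 3], {offset: 10})
```

Calling offset.get() outside run_user_fn, or from a thread that the runner did
not set up, raises the RuntimeError. That is the failure mode several of the
replies below are concerned about.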

On Wed, Sep 13, 2017 at 3:46 PM Kenneth Knowles <k...@google.com.invalid>
wrote:

> ValueProvider is global, PCollectionView is per-window, state is
> per-step/key/window, etc.
>
> So my unhappiness increases as we move through that list, adding more and
> more constraints on correct use, none of which are reflected in the API.
> Your description of "its context is an execution of the pipeline" is
> accurate for ValueProvider. The question is not merely "which DoFn will
> need which side inputs" but in which methods the side input is accessed
> (forbidden in every DoFn method other than @ProcessElement and @OnTimer).
>
> As for lambdas being more universal - I agree! But the capabilities of
> ParDo are not. I don't think we should transparently make them available
> anywhere you have a lambda. For example, multiply triggered side inputs
> fundamentally alter the semantics of MapElements and Filter to vary over
> time. The only reason this isn't a showstopper is that multiply triggered
> side inputs have very loose consistency already, and you can write
> nondeterministic predicates and map functions anyhow. If either of those
> were better, we'd want to keep them that way.
>
> NewDoFn is somewhat tied to the alternative proposal, and there's the point
> that, since lambdas are cross-language, we might reconsider the
> ProcessContext (aka "pile of mud") style. But this universality - being the
> lowest common denominator across languages - is not a goal. Python already
> is quite different from Java, using | and >> and kwarg side inputs to good
> effect. And those two languages are quite similar. Go will look entirely
> different. For Java, annotation-driven APIs are common and offer important
> advantages for readability, validation, and forward/backward compatibility.
> And incidentally NewDoFn subsumes ProcessContext.
>
> On Wed, Sep 13, 2017 at 2:32 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Thanks!
> >
> > I think most of the issues you point out [validation, scheduling,
> > prefetching] are in the area of wiring. I reiterate that they can be
> > solved - both of the methods below will give the runner an answer to the
> > low-level question "which DoFn will need which side inputs":
> >
> > 1) Providing withSideInputs() builder methods on transforms that are
> > parameterized by user code. If only some side inputs should be made
> > available to particular bits of user code, provide more detailed
> > withBlahSideInputs() methods - this is up to the transform.
> >
> > 2) Inferring this from something annotation-driven as indicated in the
> > thread, e.g. capturing the PCollectionView in @SideInput-annotated public
> > fields. This can't be done on a lambda, because lambdas don't have fields
> > [so I think method #1 will keep being necessary], but it can be done on an
> > anonymous class.
> >
> > As for direct access being misleading: I'm not sure I agree. I think the
> > intuition for PCollectionView.get() is no more wrong than the intuition for
> > ValueProvider.get(): the return value is, logically, context-free [more
> > like: its context is an execution of the pipeline], so I have no issue with
> > it being accessed implicitly.
> >
> > On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles <k...@google.com.invalid>
> > wrote:
> >
> > > I made some comments on https://issues.apache.org/jira/browse/BEAM-2950
> > > which was filed to do a similar thing for State. Luke correctly pointed out
> > > that many of the points apply here as well. I said most of the same above,
> > > but I thought I'd pull them out again from that ticket and rephrase to
> > > apply to side inputs:
> > >
> > >  - Direct access at first appears "more intuitive" because to a newcomer it
> > > "looks like" normal [captured variable] access. But in fact it is nothing
> > > like normal [captured variable] access so this intuition is misleading and
> > > should not be encouraged. So it is actually less readable because your
> > > intuitive reading is wrong.
> > >
> > >  - This design would miss the validation aspect. One way it is different
> > > than normal [functional] programming is that there are many places it is
> > > illegal to reference [side inputs], such as StartBundle/FinishBundle, or
> > > passing to another object. This proposal would turn those into dynamic
> > > failures at best, or in the worst case data corruption (runner fails to
> > > catch illegal access, and permits some thread-global context to leak)
> > >
> > >  - It is actually mandatory that we are always able to detect [side inputs,
> > > or the user has to manually wire them], as it [must be scheduled
> > > differently]
> > >
> > >  - A runner can't automatically prefetch, because it doesn't know which
> > > [side input] is used by which methods.
> > >
> > >  - Magic by mutating stuff into place is just less readable / more error
> > > prone.
> > >
> > > State has even more compelling issues and none of the benefits so my +0.75
> > > for side inputs (now I am feeling more like +0.25) is a -1 for state. We
> > > should definitely not block one feature on all vaguely similar features.
> > >
> > > Kenn
> > >
> > >
> > >
> > > On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw
> > > > <rober...@google.com.invalid>
> > > > wrote:
> > > >
> > > > > On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
> > > > > <kirpic...@google.com.invalid> wrote:
> > > > > > Hi Robert,
> > > > > >
> > > > > > Given the anticipated usage of this proposal in Java, I'm not sure the
> > > > > > Python approach you quoted is the right one.
> > > > >
> > > > > Perhaps not, but does that mean it would be a Java-ism only or would
> > > > > we implement it in Python despite it being worse there?
> > > > >
> > > > I'm not sure, but I don't see why the proposed approach of view.get()
> > > > wouldn't work well, or be harder to implement in Python.
> > > >
> > > >
> > > > >
> > > > > > The main reason: I see how it works with Map/FlatMap, but what about cases
> > > > > > like FileIO.write(), parameterized by several lambdas (element ->
> > > > > > destination, destination -> filename policy, destination -> sink), where
> > > > > > different lambdas may want to access different side inputs? It feels
> > > > > > excessive to make each of the lambdas take all of the side inputs in the
> > > > > > same order; moreover, if the composite transform internally needs to pass
> > > > > > some more side inputs to the DoFn's executing these lambdas, it will need
> > > > > > to manipulate the argument lists in nontrivial ways to make sure it passes
> > > > > > them only the side inputs the user asked for, and in the proper order.
> > > > >
> > > > > In Python it would be trivial to "slice" the side input arguments
> > > > > across the lambdas in a natural way, but I can see that this would be
> > > > > more of a pain in Java, especially as lambdas are unnecessarily
> > > > > crippled during compilation.
> > > > >
> > > > > > Another reason is, I think with Java's type system it's impossible to have
> > > > > > a NewDoFn-style API for lambdas, because annotations on lambda arguments
> > > > > > are dropped when the lambda is converted to the respective single-method
> > > > > > interface - a lambda is subject to a lot more type erasure than an
> > > > > > anonymous class.
> > > > >
> > > > > Yeah, this is unfortunate. But, as mentioned, side inputs don't need
> > > > > to be annotated, just counted. For something like inspecting the
> > > > > window the NewDoFn has a lot of advantages over implicit access (and
> > > > > makes it so you can't "forget" to declare your dependency), but I do
> > > > > see advantages for the implicit way of doing things for delegating to
> > > > > other callables.
> > > > >
> > > > > On the other hand, there is a bit of precedent for this: metrics have
> > > > > the "implicit" api. If we do go this direction for side inputs, we
> > > > > should also consider it for state and side outputs.
> > > > >
> > > > I think Kenn is very strongly against using it for state, whereas I don't
> > > > have an opinion either way because I can't think of a use case for
> > > > accessing state from a lambda - we should probably discuss this separately,
> > > > with proposed code examples in front of us.
> > > >
> > > > For side outputs, yes, it might be nice to ".add()" to a PCollection, but
> > > > it would require bigger changes - e.g. creating intermediate PCollection's
> > > > and inserting an implicit Flatten in front of all steps that contribute to
> > > > this PCollection, because a PCollection currently can be produced only by 1
> > > > step. Maybe there's a different way to express implicit side outputs.
> > > > Either way I support the idea of looking for such a way because it would
> > > > simplify use cases such as error handling dead-letter collections.
> > > >
> > > > I guess the bigger point is: do we want to block the discussion of implicit
> > > > side inputs on making a decision about the implicitness of other things
> > > > (side outputs, state, PipelineOptions, window etc). I can see the argument
> > > > for a "yes, block", but can also see the argument for a "no, don't block" -
> > > > because this proposal is (as indicated earlier in the thread)
> > > > forward-compatible with annotation-based wiring, because we already have a
> > > > precedent for implicit access of something via ValueProvider, and because
> > > > of the advantages it offers.
> > > >
> > > > Want to mention another advantage: lambdas are likely to be much easier
> > > > than the NewDoFn approach to use from non-Java but JVM languages/SDKs (e.g.
> > > > Scio), which might have even more type erasure, or might have less
> > > > sophisticated annotation machinery, or NewDoFn-style anonymous classes
> > > > might be highly non-idiomatic in them. Lambdas are idiomatic in every
> > > > language that supports lambdas, which these days is basically every
> > > > language. [I might be opening a can of worms here, but I guess you can
> > > > consider this an argument against NewDoFn in general - though that's
> > > > certainly outside the scope of this thread].
> > > >
> > > >
> > > > >
> > > > > > On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw
> > > > > <rober...@google.com.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > >> +1 to reducing the amount of boilerplate for dealing with side inputs.
> > > > > >>
> > > > > >> I prefer the "NewDoFn" style of side inputs for consistency. The
> > > > > >> primary drawback seems to be lambda's incompatibility with
> > > > > >> annotations. This is solved in Python by letting all the first
> > > > > >> annotated argument of the process method be the main input, and
> > > > > >> subsequent ones be the side input. For example
> > > > > >>
> > > > > >> main_pcoll | beam.Map(
> > > > > >>     lambda main_input_elem, side_input_value: main_input_elem +
> > > > > >> side_input_value,
> > > > > >>     side_input_pvalue)
> > > > > >>
> > > > > > >> For multiple side inputs they are mapped positionally (though Python
> > > > > > >> has the advantage that arguments can be passed by keyword as well to
> > > > > > >> enhance readability when there are many of them, and we allow that for
> > > > > > >> side inputs). Note that side_input_pvalue is not referenced anywhere
> > > > > > >> else, so we don't even have to store it and pass it around (one
> > > > > > >> typically writes pvalue.AsList(some_pcoll) inline here). When the
> > > > > > >> concrete PCollectionView is used to access the value this means that
> > > > > > >> it must be passed separately to both the ParDo and the callback
> > > > > > >> (unless we can infer it, which I don't think we can do in all (many?)
> > > > > > >> cases).
> > > > > >>
> > > > > > >> There's no reason we couldn't do this, or something very similar, in
> > > > > > >> Java as well.
> > > > > >>
> > > > > >> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax
> > > <re...@google.com.invalid
> > > > >
> > > > > >> wrote:
> > > > > >> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
> > > > > >> > kirpic...@google.com.invalid> wrote:
> > > > > >> >
> > > > > >> >> Hi,
> > > > > >> >>
> > > > > > >> >> I agree with these concerns to an extent, however I think the advantage of
> > > > > > >> >> transparently letting any user code access side inputs, especially
> > > > > > >> >> including lambdas, is so great that we should find a way to address these
> > > > > > >> >> concerns within the constraints of the pattern I'm proposing. See more
> > > > > > >> >> below.
> > > > > >> >>
> > > > > >> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers
> > > > > >> <bchamb...@google.com.invalid
> > > > > >> >> >
> > > > > >> >> wrote:
> > > > > >> >>
> > > > > > >> >> > One possible issue with this is that updating a thread local is likely to
> > > > > > >> >> > be much more expensive than passing an additional argument.
> > > > > >> >>
> > > > > > >> >> This is an implementation detail that can be fixed - Luke made a suggestion
> > > > > > >> >> on the PR to set up the side input context once per bundle rather than once
> > > > > > >> >> per element.
> > > > > >> >>
> > > > > >> >
> > > > > > >> > However, remember that bundles might be small. The Dataflow streaming
> > > > > > >> > runner creates small bundles by design. The Flink runner creates
> > > > > > >> > single-element bundles.
> > > > > >> >
> > > > > >> >
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> > Also, not all
> > > > > >> >> > code called from within the DoFn will necessarily be in the
> > > same
> > > > > >> thread
> > > > > >> >> > (eg., sometimes we create a pool of threads for doing
> work).
> > > > > >> >>
> > > > > > >> >> I think we already require that c.output() can not be done from multiple
> > > > > > >> >> threads; and I don't think we document c.sideInput() to be thread-safe - it
> > > > > > >> >> may be reasonable to declare that it isn't and has to be accessed from the
> > > > > > >> >> same thread as the ProcessElement call. If we want to relax this, then
> > > > > > >> >> there might be ways to deal with that too, e.g. provide utilities for the
> > > > > > >> >> user to capture the "user code context" and restore it inside a thread.
> > > > > > >> >> This would likely be valuable for other purposes, such as making those
> > > > > > >> >> extra threads visible to our profiling utilities.
> > > > > >> >>
> > > > > >> >
> > > > > > >> > This seems fair, but we should be very careful about our documentation.
> > > > > > >> > And +1 to adding utilities to make multi-threaded work easier to manage.
> > > > > >> >
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> > It may be
> > > > > >> >> > *more* confusing for this to sometimes work magically and
> > > > sometimes
> > > > > >> fail
> > > > > >> >> > horribly. Also, requiring the PCollectionView to be passed
> to
> > > > user
> > > > > >> code
> > > > > >> >> > that accesses it is nice because it makes *very clear* that
> > the
> > > > > side
> > > > > >> >> input
> > > > > >> >> > needs to be provided from the DoFn to that particular
> > utility.
> > > If
> > > > > it
> > > > > >> is
> > > > > >> >> > accessed via "spooky action at a distance" we lose that
> piece
> > > of
> > > > > >> "free"
> > > > > >> >> > documentation, which may lead to extensive misuse of these
> > > > utility
> > > > > >> >> methods.
> > > > > >> >> >
> > > > > > >> >> I'd like to understand this concern better - from this description it's not
> > > > > > >> >> clear to me. The pattern I'm proposing is that, when you're authoring a
> > > > > > >> >> PTransform that is configured by any user callbacks, then:
> > > > > > >> >> - you should provide a builder method .withSideInputs(...)
> > > > > > >> >> - you should propagate those side inputs to all your internal DoFn's that
> > > > > > >> >>   invoke the user code
> > > > > > >> >> - in return the user callbacks will be allowed to access those particular
> > > > > > >> >>   side inputs
> > > > > > >> >> This seems like a simple enough model to me to understand, both from a
> > > > > > >> >> user's perspective and from a transform author's perspective. Steps 1 and 2
> > > > > > >> >> may eventually be automated by annotation analysis or other means (e.g. SDK
> > > > > > >> >> giving a way to provide given side inputs automatically to everything
> > > > > > >> >> inside a composite transform rather than to individual DoFn's).
> > > > > >> >>
> > > > > >> >>
> > > > > >> >> >
> > > > > >> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
> > > > > >> >> > <kirpic...@google.com.invalid> wrote:
> > > > > >> >> >
> > > > > >> >> > > Hi,
> > > > > >> >> > >
> > > > > >> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles
> > > > > >> <k...@google.com.invalid
> > > > > >> >> >
> > > > > >> >> > > wrote:
> > > > > >> >> > >
> > > > > >> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > > > > >> >> > > > kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > >
> > > > > >> >> > > > >
> > > > > >> >> > > > > The differences are:
> > > > > >> >> > > > > - The proposal in the doc allows wiring different
> side
> > > > > inputs to
> > > > > >> >> the
> > > > > >> >> > > same
> > > > > >> >> > > > > Supplier, but I'm not convinced that this is
> important
> > -
> > > > you
> > > > > can
> > > > > >> >> just
> > > > > >> >> > > as
> > > > > >> >> > > > > easily call the constructor of your DoFn passing
> > > different
> > > > > >> >> > > > > PCollectionView's for it to capture.
> > > > > >> >> > > > >
> > > > > >> >> > > >
> > > > > >> >> > > > I disagree with this bit about it being "just as easy".
> > > > Passing
> > > > > >> the
> > > > > >> >> > > needed
> > > > > >> >> > > > PCollectionViews to your constructor (or even having a
> > > > > >> constructor)
> > > > > >> >> is
> > > > > >> >> > a
> > > > > >> >> > > > pain. Every time I have to do it, it adds a ton of
> > > > boilerplate
> > > > > >> that
> > > > > >> >> > feels
> > > > > >> >> > > > like pure noise. To make a DoFn reusable it must be
> made
> > > > into a
> > > > > >> named
> > > > > >> >> > > class
> > > > > >> >> > > > with a constructor, versus inlined with no constructor.
> > > > > >> >> > >
> > > > > >> >> > > Hm, why? You can have the DoFn be an anonymous class
> > > capturing
> > > > > the
> > > > > >> >> > > PCollectionView into a @SideInput field as a closure.
> > > > > >> >> > >
> > > > > >> >> > >
> > > > > > >> >> > > > A generous analogy is that it is "just" manual closure
> > > > > > >> >> > > > conversion/currying, changing f(side, main) to f(side)(main).
> > > > > > >> >> > > > But in practice in Beam the second one has much more boilerplate.
> > > > > >> >> > > >
> > > > > >> >> > > > Also, Beam is worse. We present the user with
> > higher-order
> > > > > >> functions,
> > > > > >> >> > > which
> > > > > >> >> > > > is where the actual annoyance comes in. When you want
> to
> > > > > pardo(f)
> > > > > >> you
> > > > > >> >> > > have
> > > > > >> >> > > > to write pardo(f(side))(side, main). Your proposal is
> to
> > > > > support
> > > > > >> >> > > > pardo(f(side))(main) and mine is to support
> > pardo(f)(side,
> > > > > main).
> > > > > >> I
> > > > > >> >> > still
> > > > > >> >> > > > propose that we support both (as they get implemented).
> > If
> > > > you
> > > > > >> buy in
> > > > > >> >> > to
> > > > > >> >> > > my
> > > > > >> >> > > > analogy, then there's decades of precedent and the
> burden
> > > of
> > > > > proof
> > > > > >> >> > falls
> > > > > >> >> > > > heavily on whoever doesn't want to support both.
> > > > > >> >> > > >
> > > > > >> >> > > I see your point. I think the proposal is compatible with
> > > what
> > > > > >> you're
> > > > > >> >> > > suggesting too - in DoFn we could have @SideInput
> > > *parameters*
> > > > of
> > > > > >> type
> > > > > >> >> > > PCollectionView, with the same semantics as a field.
> > > > > >> >> > >
> > > > > >> >> > >
> > > > > >> >> > > >
> > > > > >> >> > > > - My proposal allows getting rid of .withSideInputs()
> > > > entirely,
> > > > > >> >> because
> > > > > >> >> > > the
> > > > > >> >> > > > > DoFn captures the PCollectionView so you don't need
> to
> > > > > specify
> > > > > >> it
> > > > > >> >> > > > > explicitly for wiring.
> > > > > >> >> > > > >
> > > > > >> >> > > >
> > > > > >> >> > > > I've decided to change to full +1 (whatever that means
> > > > > compared to
> > > > > >> >> 0.75
> > > > > >> >> > > :-)
> > > > > >> >> > > > to adding support for @SideInput fields, because the
> > > benefits
> > > > > >> >> outweigh
> > > > > >> >> > > this
> > > > > >> >> > > > failure mode:
> > > > > >> >> > > >
> > > > > >> >> > > > new DoFn {
> > > > > >> >> > > >   // forgot the annotation
> > > > > >> >> > > >   private final PCollectionView whatever;
> > > > > >> >> > > >
> > > > > >> >> > > >   @ProcessElement public void process(...) {
> > > > > >> >> > > >     whatever.get(); // crash during execution
> > > > > >> >> > > >   }
> > > > > >> >> > > > }
> > > > > >> >> > > >
> > > > > >> >> > > > But ideas to mitigate that would be cool.
> > > > > >> >> > >
> > > > > >> >> > > Hm, can't think of anything less hacky than "prohibit
> > having
> > > > > fields
> > > > > >> of
> > > > > >> >> > type
> > > > > >> >> > > PCollectionView that are not public, final, and annotated
> > > with
> > > > > >> >> > @SideInput"
> > > > > >> >> > > - not sure we'd want to go down this road. I suppose a
> good
> > > > error
> > > > > >> >> message
> > > > > >> >> > > in .get() would be sufficient, saying "Did you forget to
> > > > specify
> > > > > a
> > > > > >> >> > > requirement for this side input via .withSideInputs() or
> by
> > > > > >> annotating
> > > > > >> >> > the
> > > > > >> >> > > field as @SideInput" or something like that.
> > > > > >> >> > >
> > > > > >> >> > > >
> > > > > >> >> > >
> > > > > >> >> > >
> > > > > >> >> > > > Kenn
> > > > > >> >> > > >
> > > > > >> >> > > >
> > > > > >> >> > > > >
> > > > > >> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik
> > > > > >> >> <lc...@google.com.invalid
> > > > > >> >> > >
> > > > > >> >> > > > > wrote:
> > > > > >> >> > > > >
> > > > > > >> >> > > > > > My concern with the proposal is less about the specifics of how it will work
> > > > > > >> >> > > > > > and more about it being yet another way to use our API, even though we have
> > > > > > >> >> > > > > > a proposal [1] for an API style we were working towards in Java and Python.
> > > > > > >> >> > > > > > I would rather re-open that discussion now about what we want that API to
> > > > > > >> >> > > > > > look like for our major features and work towards consistency (or not, if
> > > > > > >> >> > > > > > there is a strong argument as to why some feature should have a different
> > > > > > >> >> > > > > > style).
> > > > > >> >> > > > > >
> > > > > >> >> > > > > > 1: https://s.apache.org/a-new-dofn
> > > > > >> >> > > > > >
> > > > > >> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles
> > > > > >> >> > > > <k...@google.com.invalid
> > > > > >> >> > > > > >
> > > > > >> >> > > > > > wrote:
> > > > > >> >> > > > > >
> > > > > >> >> > > > > > > +0.75 because I'd like to bring up invalid
> > pipelines.
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > I had proposed side inputs as parameters to DoFn
> in
> > > > > >> >> > > > > > > https://s.apache.org/a-new-dofn (specifically at
> > > [1])
> > > > so
> > > > > >> the
> > > > > >> >> > only
> > > > > >> >> > > > > place
> > > > > >> >> > > > > > > they are specified is in the graph construction,
> > > making
> > > > > the
> > > > > >> >> DoFn
> > > > > >> >> > > more
> > > > > >> >> > > > > > > reusable and errors impossible. I've actually
> been
> > > > > noodling
> > > > > >> my
> > > > > >> >> > way
> > > > > >> >> > > > > > towards
> > > > > >> >> > > > > > > this in a branch :-)
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > Eugene's proposal is a sort of converse, where
> the
> > > side
> > > > > >> inputs
> > > > > >> >> > are
> > > > > >> >> > > > > values
> > > > > >> >> > > > > > > captured in the closure and not parameters, yet
> the
> > > > only
> > > > > >> place
> > > > > >> >> > they
> > > > > >> >> > > > are
> > > > > >> >> > > > > > > specified is in the DoFn.
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > I see no conflict between these two. It is very
> > > natural
> > > > > to
> > > > > >> have
> > > > > >> >> > > both
> > > > > >> >> > > > > the
> > > > > >> >> > > > > > > capability to accept parameters and the ability
> to
> > > > > capture
> > > > > >> >> > > variables
> > > > > >> >> > > > in
> > > > > >> >> > > > > > the
> > > > > >> >> > > > > > > closure. Supporting both is totally standard in
> > > > > up-to-date
> > > > > >> >> > > > programming
> > > > > >> >> > > > > > > languages.
> > > > > >> >> > > > > > >
> > > > > > >> >> > > > > > > Today we have the worst of both worlds: PCollectionView behaves as
> > > > > > >> >> > > > > > > something captured in the closure/constructor, but must still be
> > > > > > >> >> > > > > > > explicitly wired up.
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > But if I use PCollectionView.get() and have not
> > wired
> > > > it
> > > > > up
> > > > > >> in
> > > > > >> >> > any
> > > > > >> >> > > > way,
> > > > > >> >> > > > > > > what happens? Just like today, you can try to
> > > > > >> .sideInput(...) a
> > > > > >> >> > > thing
> > > > > >> >> > > > > > that
> > > > > >> >> > > > > > > is not available. With side inputs as parameters,
> > > this
> > > > is
> > > > > >> not
> > > > > >> >> > > > possible.
> > > > > >> >> > > > > > If
> > > > > >> >> > > > > > > you want to treat them as captured in a closure,
> > > while
> > > > > >> avoiding
> > > > > >> >> > > > errors,
> > > > > >> >> > > > > > it
> > > > > >> >> > > > > > > seems like you might need to do some low-level
> > magic,
> > > > > like
> > > > > >> the
> > > > > >> >> > > > > > > serialization-based detection that Luke has
> > suggested
> > > > > before
> > > > > >> >> > (there
> > > > > >> >> > > > are
> > > > > >> >> > > > > > > known downsides that we haven't explored, like
> > > > > overcapture).
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > Kenn
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > [1]
> > > > > >> >> > > > > > >
> > > > > >> >> > https://docs.google.com/document/d/
> > > > 1ClmQ6LqdnfseRzeSw3SL68DAO1f8j
> > > > > >> >> > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene
> Kirpichov <
> > > > > >> >> > > > > > > kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > > > > Hm, I guess you're right - for outputs it could
> > be
> > > > > indeed
> > > > > >> >> quite
> > > > > >> >> > > > > > valuable
> > > > > >> >> > > > > > > to
> > > > > >> >> > > > > > > > output to them without plumbing (e.g.
> outputting
> > > > > errors).
> > > > > >> >> Could
> > > > > >> >> > > be
> > > > > >> >> > > > > done
> > > > > >> >> > > > > > > > perhaps via TupleTag.output()? (assuming the
> same
> > > > > TupleTag
> > > > > >> >> can
> > > > > >> >> > > not
> > > > > >> >> > > > be
> > > > > >> >> > > > > > > > reused to tag multiple PCollection's)
> > > > > >> >> > > > > > > >
> > > > > >> >> > > > > > > > For now I sent a PR for side input support
> > > > > >> >> > > > > > > > https://github.com/apache/beam/pull/3814 .
> > > > > >> >> > > > > > > >
> > > > > >> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik
> > > > > >> >> > > > <lc...@google.com.invalid
> > > > > >> >> > > > > >
> > > > > >> >> > > > > > > > wrote:
> > > > > >> >> > > > > > > >
> > > > > >> >> > > > > > > > > I disagree, state may not care where it is
> used
> > > as
> > > > > well
> > > > > >> >> > since a
> > > > > >> >> > > > > > person
> > > > > >> >> > > > > > > > may
> > > > > >> >> > > > > > > > > call a function which needs to store/retrieve
> > > state
> > > > > and
> > > > > >> >> > instead
> > > > > >> >> > > > of
> > > > > >> >> > > > > > > having
> > > > > >> >> > > > > > > > > the DoFn declare the StateSpec and then pass
> in
> > > the
> > > > > >> state
> > > > > >> >> > > > > > > implementation
> > > > > >> >> > > > > > > > > down into the function everywhere. Similarly
> > for
> > > > > >> outputs,
> > > > > >> >> the
> > > > > >> >> > > > > > internal
> > > > > >> >> > > > > > > > > functions could take the TupleTag and request
> > an
> > > > > output
> > > > > >> >> > manager
> > > > > >> >> > > > or
> > > > > >> >> > > > > > take
> > > > > >> >> > > > > > > > an
> > > > > >> >> > > > > > > > > "output" reference which give functions the
> > > ability
> > > > > to
> > > > > >> >> > produce
> > > > > >> >> > > > > output
> > > > > >> >> > > > > > > > > directly without needing to pass everything
> > that
> > > is
> > > > > >> needed
> > > > > >> >> to
> > > > > >> >> > > be
> > > > > >> >> > > > > > output
> > > > > >> >> > > > > > > > > back to the caller.
> > > > > >> >> > > > > > > > >
> > > > > >> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene
> > Kirpichov
> > > <
> > > > > >> >> > > > > > > > > kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > >
> > > > > >> >> > > > > > > > > > Hm, I think that of these things (state, side outputs, etc.), only
> > > > > >> >> > > > > > > > > > side inputs make sense to access in arbitrary user callbacks without
> > > > > >> >> > > > > > > > > > explicit knowledge of the surrounding transform - so only side inputs
> > > > > >> >> > > > > > > > > > can be implicit like this.
> > > > > >> >> > > > > > > > > >
> > > > > >> >> > > > > > > > > > Ultimately we'll probably end up removing ProcessContext and keeping
> > > > > >> >> > > > > > > > > > only annotations (on fields / methods / parameters). In that world, a
> > > > > >> >> > > > > > > > > > field annotation could be used (as in my previous email) to statically
> > > > > >> >> > > > > > > > > > specify which side inputs will be needed - while the value could still
> > > > > >> >> > > > > > > > > > be accessed via .get(), just like state cells are accessed via .read()
> > > > > >> >> > > > > > > > > > and .write(): i.e., #get() is not a new method of access.
> > > > > >> >> > > > > > > > > >
> > > > > >> >> > > > > > > > > > Overall, it seems like I should proceed with the idea. I filed
> > > > > >> >> > > > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844.
> > > > > >> >> > > > > > > > > >
> > > > > >> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > >
> > > > > >> >> > > > > > > > > > > For API consistency reasons, it would be good if we did this
> > > > > >> >> > > > > > > > > > > holistically and expanded this approach to state, side outputs, ...
> > > > > >> >> > > > > > > > > > > so that a person can always call Something.get() to return something
> > > > > >> >> > > > > > > > > > > that they can access implementation-wise. It will be confusing for
> > > > > >> >> > > > > > > > > > > our users to have many variations in our style of how all these
> > > > > >> >> > > > > > > > > > > concepts are used (ProcessContext / Annotations / #get()).
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > Also, I think my approach is compatible with annotations and future
> > > > > >> >> > > > > > > > > > > > removal of .withSideInputs if we annotate a field:
> > > > > >> >> > > > > > > > > > > > final PCollectionView<Foo> foo = ...;
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > class MyDoFn {
> > > > > >> >> > > > > > > > > > > >   @SideInput
> > > > > >> >> > > > > > > > > > > >   PCollectionView<Foo> foo = foo;
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > >   ...foo.get()...
> > > > > >> >> > > > > > > > > > > > }
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > We can extract the accessed views from the DoFn instance using
> > > > > >> >> > > > > > > > > > > > reflection. Still not compatible with lambdas, but compatible
> > > > > >> >> > > > > > > > > > > > automatically with all anonymous classes.
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > > Hi Luke,
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > > I know this (annotations) is the pattern we were considering for
> > > > > >> >> > > > > > > > > > > > > side inputs, but I no longer think it is the best way to access
> > > > > >> >> > > > > > > > > > > > > them. Annotations help get rid of the .withSideInputs() call, but
> > > > > >> >> > > > > > > > > > > > > this is where their advantage ends.
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > > The advantage of the proposed approach is that it automatically
> > > > > >> >> > > > > > > > > > > > > works with all existing callback or lambda code. No need to further
> > > > > >> >> > > > > > > > > > > > > develop the reflection machinery to support side input annotations -
> > > > > >> >> > > > > > > > > > > > > and especially to support arbitrary user interfaces, no need to
> > > > > >> >> > > > > > > > > > > > > change existing transforms, no need for transform authors to even
> > > > > >> >> > > > > > > > > > > > > know that the machinery exists to make side inputs usable in their
> > > > > >> >> > > > > > > > > > > > > transforms (and no need for authors to think about whether or not
> > > > > >> >> > > > > > > > > > > > > they should support side inputs).
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work with lambdas at
> > > > > >> >> > > > > > > > > > > > > all: creating a lambda with a flexible set of annotation arguments
> > > > > >> >> > > > > > > > > > > > > appears to be currently impossible, and even capturing the
> > > > > >> >> > > > > > > > > > > > > annotations on arguments of a lambda is, I believe, also impossible
> > > > > >> >> > > > > > > > > > > > > because the Java compiler drops them in the generated class or
> > > > > >> >> > > > > > > > > > > > > method handle.
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > > >> I believe we should follow the pattern that state uses and add a
> > > > > >> >> > > > > > > > > > > > >> type annotation to link the side input definition to its usage
> > > > > >> >> > > > > > > > > > > > >> directly. This would allow us to know that the side input was
> > > > > >> >> > > > > > > > > > > > >> definitely being accessed and perform validation during graph
> > > > > >> >> > > > > > > > > > > > >> construction for any used but unspecified side inputs.
> > > > > >> >> > > > > > > > > > > > >>
> > > > > >> >> > > > > > > > > > > > >> Code snippet:
> > > > > >> >> > > > > > > > > > > > >> final PCollectionView<String> foo = pipeline.apply("fooName",
> > > > > >> >> > > > > > > > > > > > >>     Create.of("foo")).apply(View.<String>asSingleton());
> > > > > >> >> > > > > > > > > > > > >> PCollection<String> output = pipeline
> > > > > >> >> > > > > > > > > > > > >>     .apply(Create.of(1, 2, 3))
> > > > > >> >> > > > > > > > > > > > >>     .apply(MapElements.via(
> > > > > >> >> > > > > > > > > > > > >>         new SimpleFunction<Integer, String>() {
> > > > > >> >> > > > > > > > > > > > >>           @Override
> > > > > >> >> > > > > > > > > > > > >>           public String apply(Integer input, @SideInput("fooName") String fooValue) {
> > > > > >> >> > > > > > > > > > > > >>             return fooValue + " " + input;
> > > > > >> >> > > > > > > > > > > > >>           }
> > > > > >> >> > > > > > > > > > > > >>         }).withSideInputs(foo));
> > > > > >> >> > > > > > > > > > > > >>
> > > > > >> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > > > > >>
> > > > > >> >> > > > > > > > > > > > >> > Sure, here's what a modified (passing) MapElements unit test looks
> > > > > >> >> > > > > > > > > > > > >> > like, with usage of side inputs:
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >> >   @Test
> > > > > >> >> > > > > > > > > > > > >> >   @Category(NeedsRunner.class)
> > > > > >> >> > > > > > > > > > > > >> >   public void testMapBasicWithSideInput() throws Exception {
> > > > > >> >> > > > > > > > > > > > >> >     final PCollectionView<String> foo =
> > > > > >> >> > > > > > > > > > > > >> >         pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> > > > > >> >> > > > > > > > > > > > >> >     PCollection<String> output = pipeline
> > > > > >> >> > > > > > > > > > > > >> >         .apply(Create.of(1, 2, 3))
> > > > > >> >> > > > > > > > > > > > >> >         .apply(MapElements.via(
> > > > > >> >> > > > > > > > > > > > >> >             new SimpleFunction<Integer, String>() {
> > > > > >> >> > > > > > > > > > > > >> >               @Override
> > > > > >> >> > > > > > > > > > > > >> >               public String apply(Integer input) {
> > > > > >> >> > > > > > > > > > > > >> >                 return foo.get() + " " + input;
> > > > > >> >> > > > > > > > > > > > >> >               }
> > > > > >> >> > > > > > > > > > > > >> >             })
> > > > > >> >> > > > > > > > > > > > >> >         .withSideInputs(foo));
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > > > > >> >> > > > > > > > > > > > >> >     pipeline.run();
> > > > > >> >> > > > > > > > > > > > >> >   }
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >> > > Can you provide a code snippet showing how this would look?
> > > > > >> >> > > > > > > > > > > > >> > >
> > > > > >> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > >> >> > > > > > > > > > > > >> > >
> > > > > >> >> > > > > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > > > > >> >> > > > > > > > > > > > >> > > > thread-local ProcessContext and do c.sideInput(this). As a result,
> > > > > >> >> > > > > > > > > > > > >> > > > any user lambdas such as MapElements can use side inputs.
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo
> > > > > >> >> > > > > > > > > > > > >> > > > (DoFn), Map/FlatMapElements, the DynamicDestinations classes in
> > > > > >> >> > > > > > > > > > > > >> > > > various IOs, combine fns, the PollFn callback in Watch, etc.
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in support for side
> > > > > >> >> > > > > > > > > > > > >> > > > inputs; for DynamicDestinations it is plumbed explicitly; others
> > > > > >> >> > > > > > > > > > > > >> > > > don't have access (e.g. you can't access side inputs from
> > > > > >> >> > > > > > > > > > > > >> > > > Map/FlatMapElements because they don't have a ProcessContext or any
> > > > > >> >> > > > > > > > > > > > >> > > > context for that matter).
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > I think it's important to solve this, especially as Java 8 becomes
> > > > > >> >> > > > > > > > > > > > >> > > > people's default choice: users will want to use side inputs in
> > > > > >> >> > > > > > > > > > > > >> > > > Map/FlatMapElements.
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > It also appears to be quite easy to implement:
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > Runner part:
> > > > > >> >> > > > > > > > > > > > >> > > > - introduce a SideInputAccessor interface
> > > > > >> >> > > > > > > > > > > > >> > > > - make .get() on a PCollectionView get it from a thread-local
> > > > > >> >> > > > > > > > > > > > >> > > >   SideInputAccessor
> > > > > >> >> > > > > > > > > > > > >> > > > - make runners set the thread-local SideInputAccessor any time the
> > > > > >> >> > > > > > > > > > > > >> > > >   runner is evaluating something in a context where side inputs are
> > > > > >> >> > > > > > > > > > > > >> > > >   available, e.g. a ProcessElement method, or applying a CombineFn -
> > > > > >> >> > > > > > > > > > > > >> > > >   the set of such places will be quite small. I believe only runners
> > > > > >> >> > > > > > > > > > > > >> > > >   (but not transforms) will ever need to set this thread-local.
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > Transform part:
> > > > > >> >> > > > > > > > > > > > >> > > > - Transforms that take user-code lambdas and want to let them access
> > > > > >> >> > > > > > > > > > > > >> > > >   side inputs will still need to be configurable with a method like
> > > > > >> >> > > > > > > > > > > > >> > > >   .withSideInputs(view1, view2...) and will need to plumb those down
> > > > > >> >> > > > > > > > > > > > >> > > >   to the primitive DoFn's or CombineFn's - information on *which*
> > > > > >> >> > > > > > > > > > > > >> > > >   side inputs will be accessed, of course, still needs to be in the
> > > > > >> >> > > > > > > > > > > > >> > > >   graph.
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > > > I quickly prototyped this in the direct runner and MapElements, and
> > > > > >> >> > > > > > > > > > > > >> > > > it worked with no issues - I'm wondering if there's something subtle
> > > > > >> >> > > > > > > > > > > > >> > > > that I'm missing, or is it actually a good idea?
> > > > > >> >> > > > > > > > > > > > >> > > >
> > > > > >> >> > > > > > > > > > > > >> > >
> > > > > >> >> > > > > > > > > > > > >> >
> > > > > >> >> > > > > > > > > > > > >>
> > > > > >> >> > > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > > >
> > > > > >> >> > > > > > > > > > >
> > > > > >> >> > > > > > > > > >
> > > > > >> >> > > > > > > > >
> > > > > >> >> > > > > > > >
> > > > > >> >> > > > > > >
> > > > > >> >> > > > > >
> > > > > >> >> > > > >
> > > > > >> >> > > >
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >>
> > > > > >>
> > > > >
> > > >
> > >
> >
>
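
For anyone skimming the thread from the bottom: the thread-local mechanism in the original TL;DR (a SideInputAccessor that the runner installs around user code, and a .get() on the view that reads from it) can be sketched in plain Java roughly as below. This is a toy under stated assumptions, not Beam code and not the prototype mentioned in the thread: View stands in for PCollectionView, and MapAccessor and withAccessor are hypothetical names made up for the illustration. Only the name SideInputAccessor comes from the proposal itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

final class ThreadLocalSideInputSketch {
  /** Stand-in for the proposed SideInputAccessor interface (shape is assumed). */
  interface SideInputAccessor {
    <T> T get(View<T> view);
  }

  /** Toy stand-in for PCollectionView<T>; not the Beam class. */
  static final class View<T> {
    // The accessor that is "current" for user code running on this thread.
    private static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();
    final String name;

    View(String name) {
      this.name = name;
    }

    /** Reads the value through whatever accessor the runner installed. */
    T get() {
      SideInputAccessor accessor = CURRENT.get();
      if (accessor == null) {
        throw new IllegalStateException(
            "Side input '" + name + "' accessed outside a context that provides side inputs");
      }
      return accessor.get(this);
    }

    /** Hypothetical runner hook: install the accessor, run user code, then restore. */
    static <R> R withAccessor(SideInputAccessor accessor, Supplier<R> body) {
      SideInputAccessor previous = CURRENT.get();
      CURRENT.set(accessor);
      try {
        return body.get();
      } finally {
        if (previous == null) {
          CURRENT.remove();
        } else {
          CURRENT.set(previous);
        }
      }
    }
  }

  /** Toy accessor backed by a map; a real runner would resolve the value per window. */
  static final class MapAccessor implements SideInputAccessor {
    private final Map<View<?>, Object> values = new HashMap<>();

    <T> MapAccessor put(View<T> view, T value) {
      values.put(view, value);
      return this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T get(View<T> view) {
      if (!values.containsKey(view)) {
        throw new IllegalArgumentException("Side input '" + view.name + "' was not declared");
      }
      return (T) values.get(view);
    }
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    // The user lambda has no context parameter; it simply calls foo.get().
    Function<Integer, String> fn = input -> foo.get() + " " + input;
    SideInputAccessor accessor = new MapAccessor().put(foo, "foo");
    for (int i = 1; i <= 3; i++) {
      final int input = i;
      System.out.println(View.withAccessor(accessor, () -> fn.apply(input)));
    }
  }
}
```

Running main prints "foo 1", "foo 2" and "foo 3", mirroring the MapElements test quoted above. Calling foo.get() outside withAccessor throws, which is the part of the contract that the API itself does not express.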

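The field-annotation variant discussed in the thread ("extract the accessed views from the DoFn instance using reflection") could look roughly like the following in plain Java. Again a sketch with assumptions: the @SideInput annotation here is a locally defined, hypothetical one with runtime retention, View is a toy stand-in for PCollectionView, and declaredSideInputs and MyFn are made-up names; none of this is existing Beam API.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

final class SideInputFieldScan {
  /** Hypothetical field annotation; RUNTIME retention makes it visible to reflection. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface SideInput {}

  /** Toy stand-in for PCollectionView<T>; not the Beam class. */
  static final class View<T> {
    final String name;

    View(String name) {
      this.name = name;
    }
  }

  /** Toy stand-in for a user DoFn with one declared side input and one plain field. */
  static class MyFn {
    @SideInput final View<String> foo;
    final View<String> notDeclared = new View<>("other");

    MyFn(View<String> foo) {
      this.foo = foo;
    }
  }

  /** Collects the values of all @SideInput fields on fn, including inherited ones. */
  static List<View<?>> declaredSideInputs(Object fn) {
    List<View<?>> views = new ArrayList<>();
    for (Class<?> c = fn.getClass(); c != null; c = c.getSuperclass()) {
      for (Field field : c.getDeclaredFields()) {
        if (!field.isAnnotationPresent(SideInput.class)) {
          continue;
        }
        field.setAccessible(true);
        try {
          views.add((View<?>) field.get(fn));
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read " + field, e);
        }
      }
    }
    return views;
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    for (View<?> view : declaredSideInputs(new MyFn(foo))) {
      System.out.println(view.name);
    }
  }
}
```

This works for named and anonymous classes because both have real fields; a lambda has no declared fields to annotate, which is the limitation pointed out in the thread.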