Hi all, Please take a look at some notes from a discussion we had about this with a few folks, and an updated proposal and a couple of demo PRs implementing the proposal. http://s.apache.org/context-fn
I hope this proposal is more agreeable. On Wed, Sep 13, 2017 at 3:46 PM Kenneth Knowles <k...@google.com.invalid> wrote: > ValueProvider is global, PCollectionView is per-window, state is > per-step/key/window, etc. > > So my unhappiness increases as we move through that list, adding more and > more constraints on correct use, none of which are reflected in the API. > Your description of "its context is an execution of the pipeline" is > accurate for ValueProvider. The question is not merely "which DoFn will > need which side inputs" but in which methods the side input is accessed > (forbidden in every DoFn method other than @ProcessElement and @OnTimer). > > As for lambdas being more universal - I agree! But the capabilities of > ParDo are not. I don't think we should transparently make them available > anywhere you have a lambda. For example, multiply triggered side inputs > fundamentally alter the semantics of MapElements and Filter to vary over > time. The only reason this isn't a showstopper is that multiply triggered > side inputs have very loose consistency already, and you can write > nondeterministic predicates and map functions anyhow. If either of those > were better, we'd want to keep them that way. > > Since NewDoFn is somewhat tied to the alternative proposal, and there's the > point that since lambdas are cross-language we might reconsider > ProcessContext (aka "pile of mud") style. But this universality - being the > lowest common denominator across languages - is not a goal. Python already > is quite different from Java, using | and >> and kwarg side inputs to good > effect. And those two languages are quite similar. Go will look entirely > different. For Java, annotation-driven APIs are common and offer important > advantages for readability, validation, and forward/backward compatibility. > And incidentally NewDoFn subsumes ProcessContext. > > On Wed, Sep 13, 2017 at 2:32 PM, Eugene Kirpichov < > kirpic...@google.com.invalid> wrote: > > > Thanks! > > > > I think most of the issues you point out [validation, scheduling, > > prefetching] are in the area of wiring. I reiterate that they can be > solved > > - both of the methods below will give the runner an answer to the > low-level > > question "which DoFn will need which side inputs": > > > > 1) Providing withSideInputs() builder methods on transforms that are > > parameterized by user code. If only some side inputs should be made > > available to particular bits of user code, provide more detailed > > withBlahSideInputs() methods - this is up to the transform. > > > > 2) Inferring this from something annotation-driven as indicated in the > > thread, e.g. capturing the PCollectionView in @SideInput-annotated public > > fields. This can't be done on a lambda, because lambdas don't have fields > > [so I think method #1 will keep being necessary], but it can be done on > an > > anonymous class. > > > > As for direct access being misleading: I'm not sure I agree. I think the > > intuition for PCollectionView.get() is no more wrong than the intuition > for > > ValueProvider.get(): the return value is, logically, context-free [more > > like: its context is an execution of the pipeline], so I have no issue > with > > it being accessed implicitly. > > > > On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles <k...@google.com.invalid> > > wrote: > > > > > I made some comments on > https://issues.apache.org/jira/browse/BEAM-2950 > > > which was filed to do a similar thing for State. Luke correctly pointed > > out > > > that many of the points apply here as well. I said most of the same > > above, > > > but I thought I'd pull them out again from that ticket and rephrase to > > > apply to side inputs: > > > > > > - Direct access at first appears "more intuitive" because to a > newcomer > > it > > > "looks like" normal [captured variable] access. But in fact it is > nothing > > > like normal [captured variable] access so this intuition is misleading > > and > > > should not be encouraged. So it is actually less readable because your > > > intuitive reading is wrong. > > > > > > - This design would miss the validation aspect. One way it is > different > > > than normal [functional] programming is that there are many places it > is > > > illegal to reference [side inputs], such as StartBundle/FinishBundle, > or > > > passing to another object. This proposal would turn those into dynamic > > > failures at best, or in the worst case data corruption (runner fails to > > > catch illegal access, and permits some thread-global context to leak) > > > > > > - It is actually mandatory that we are always able to detect [side > > inputs, > > > or the user has to manually wire them], as it [must be scheduled > > > differently] > > > > > > - A runner can't automatically prefetch, because it doesn't know which > > > [side input] is used by which methods. > > > > > > - Magic by mutating stuff into place is just less readable / more > error > > > prone. > > > > > > State has even more compelling issues and none of the benefits so my > > +0.75 > > > for side inputs (now I am feeling more like +0.25) is a -1 for state. > We > > > should definitely not block one feature on all vaguely similar > features. > > > > > > Kenn > > > > > > > > > > > > On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov < > > > kirpic...@google.com.invalid> wrote: > > > > > > > On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw > > > > <rober...@google.com.invalid> > > > > wrote: > > > > > > > > > On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov > > > > > <kirpic...@google.com.invalid> wrote: > > > > > > Hi Robert, > > > > > > > > > > > > Given the anticipated usage of this proposal in Java, I'm not > sure > > > the > > > > > > Python approach you quoted is the right one. > > > > > > > > > > Perhaps not, but does that mean it would be a Java-ism only or > would > > > > > we implement it in Python despite it being worse there? > > > > > > > > > I'm not sure, but I don't see why the proposed approach of view.get() > > > > wouldn't work well, or be harder to implement in Python. > > > > > > > > > > > > > > > > > > > The main reason: I see how it works with Map/FlatMap, but what > > about > > > > > cases > > > > > > like FileIO.write(), parameterized by several lambdas (element -> > > > > > > destination, destination -> filename policy, destination -> > sink), > > > > where > > > > > > different lambdas may want to access different side inputs? It > > feels > > > > > > excessive to make each of the lambdas take all of the side inputs > > in > > > > the > > > > > > same order; moreover, if the composite transform internally needs > > to > > > > pass > > > > > > some more side inputs to the DoFn's executing these lambdas, it > > will > > > > need > > > > > > to manipulate the argument lists in nontrivial ways to make sure > it > > > > > passes > > > > > > them only the side inputs the user asked for, and in the proper > > > order. > > > > > > > > > > In Python it would be trivial to "slice" the side input arguments > > > > > across the lambdas in a natural way, but I can see that this would > be > > > > > more of a pain in Java, especially as lambdas are unnecessarily > > > > > crippled during compilation. > > > > > > > > > > > Another reason is, I think with Java's type system it's > impossible > > to > > > > > have > > > > > > a NewDoFn-style API for lambdas, because annotations on lambda > > > > arguments > > > > > > are dropped when the lambda is converted to the respective > > > > single-method > > > > > > interface - a lambda is subject to a lot more type erasure than > > > > anonymous > > > > > > class. > > > > > > > > > > Yeah, this is unfortunate. But, as mentioned, side inputs don't > need > > > > > to be annotated, just counted. For something like inspecting the > > > > > window the NewDoFn has a lot of advantages over implicit access > (and > > > > > makes it so you can't "forget" to declare your dependency), but I > do > > > > > see advantages for the implicit way of doing things for delegating > to > > > > > other callables. > > > > > > > > > > On the other hand, there is a bit of precedence for this: metrics > > have > > > > > the "implicit" api. If we do go this direction for side inputs, we > > > > > should also consider it for state and side outputs. > > > > > > > > > I think Kenn is very strongly against using it for state, whereas I > > don't > > > > have an opinion either way because I can't think of a use case for > > > > accessing state from a lambda - we should probably discuss this > > > separately, > > > > with proposed code examples in front of us. > > > > > > > > For side outputs, yes, it might be nice to ".add()" to a PCollection, > > but > > > > it would require bigger changes - e.g. creating intermediate > > > PCollection's > > > > and inserting an implicit Flatten in front of all steps that > contribute > > > to > > > > this PCollection, because a PCollection currently can be produced > only > > > by 1 > > > > step. Maybe there's a different way to express implicit side outputs. > > > > Either way I support the idea of looking for such a way because it > > would > > > > simplify use cases such as error handling dead-letter collections. > > > > > > > > I guess the bigger point is: do we want to block the discussion of > > > implicit > > > > side inputs on making a decision about the implicitness of other > things > > > > (side outputs, state, PipelineOptions, window etc). I can see the > > > argument > > > > for a "yes, block", but can also see the argument for a "no, don't > > > block" - > > > > because this proposal is (as indicated earlier in the thread) > > > > forward-compatible with annotation-based wiring, because we already > > have > > > a > > > > precedent for implicit access of something via ValueProvider, and > > because > > > > of the advantages it offers. > > > > > > > > Want to mention another advantage: lambdas are likely to be much > easier > > > > than NewDoFn approach to use from non-Java but JVM languages/SDKs > (e.g. > > > > Scio), which might have even more type erasure, or might have less > > > > sophisticated annotation machinery, or NewDoFn-style anonymous > classes > > > > might be highly non-idiomatic in them. Lambdas are idiomatic in every > > > > language that supports lambdas, which these days is basically every > > > > language. [I might be opening a can of worms here, but I guess you > can > > > > consider this an argument against NewDoFn in general - though that's > > > > certainly outside the scope of this thread]. > > > > > > > > > > > > > > > > > > > On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw > > > > > <rober...@google.com.invalid> > > > > > > wrote: > > > > > > > > > > > >> +1 to reducing the amount of boilerplate for dealing with side > > > inputs. > > > > > >> > > > > > >> I prefer the "NewDoFn" style of side inputs for consistency. The > > > > > >> primary drawback seems to be lambda's incompatibility with > > > > > >> annotations. This is solved in Python by letting all the first > > > > > >> annotated argument of the process method be the main input, and > > > > > >> subsequent ones be the side input. For example > > > > > >> > > > > > >> main_pcoll | beam.Map( > > > > > >> lambda main_input_elem, side_input_value: main_input_elem + > > > > > >> side_input_value, > > > > > >> side_input_pvalue) > > > > > >> > > > > > >> For multiple side inputs they are mapped positionally (though > > Python > > > > > >> has the advantage that arguments can be passed by keyword as > well > > to > > > > > >> enhance readability when there are many of them, and we allow > that > > > for > > > > > >> side inputs). Note that side_input_pvalue is not referenced > > anywhere > > > > > >> else, so we don't even have to store it and pass it around (one > > > > > >> typically writes pvalue.AsList(some_pcoll) inline here). When > the > > > > > >> concrete PCollectionView is used to access the value this means > > that > > > > > >> it must be passed separately to both the ParDo and the callback > > > > > >> (unless we can infer it, which I don't think we can do in all > > > (many?) > > > > > >> cases). > > > > > >> > > > > > >> There's no reason we couldn't do this, or something very > similar, > > in > > > > > >> Java as well. > > > > > >> > > > > > >> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax > > > <re...@google.com.invalid > > > > > > > > > > >> wrote: > > > > > >> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov < > > > > > >> > kirpic...@google.com.invalid> wrote: > > > > > >> > > > > > > >> >> Hi, > > > > > >> >> > > > > > >> >> I agree with these concerns to an extent, however I think the > > > > > advantage > > > > > >> of > > > > > >> >> transparently letting any user code access side inputs, > > > especially > > > > > >> >> including lambdas, is so great that we should find a way to > > > address > > > > > >> these > > > > > >> >> concerns within the constraints of the pattern I'm proposing. > > See > > > > > more > > > > > >> >> below. > > > > > >> >> > > > > > >> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers > > > > > >> <bchamb...@google.com.invalid > > > > > >> >> > > > > > > >> >> wrote: > > > > > >> >> > > > > > >> >> > One possible issue with this is that updating a thread > local > > is > > > > > >> likely to > > > > > >> >> > be much more expensive than passing an additional argument. > > > > > >> >> > > > > > >> >> This is an implementation detail that can be fixed - Luke > made > > a > > > > > >> suggestion > > > > > >> >> on the PR to set up the side input context once per bundle > > rather > > > > > than > > > > > >> once > > > > > >> >> per element. > > > > > >> >> > > > > > >> > > > > > > >> > However remember that bundles might be small. Dataflow > streaming > > > > > runner > > > > > >> > creates small bundles by design. The Flink runner creates > > > > > single-element > > > > > >> > bundles. > > > > > >> > > > > > > >> > > > > > > >> >> > > > > > >> >> > > > > > >> >> > Also, not all > > > > > >> >> > code called from within the DoFn will necessarily be in the > > > same > > > > > >> thread > > > > > >> >> > (eg., sometimes we create a pool of threads for doing > work). > > > > > >> >> > > > > > >> >> I think we already require that c.output() can not be done > from > > > > > multiple > > > > > >> >> threads; and I don't think we document c.sideInput() to be > > > > > thread-safe > > > > > >> - it > > > > > >> >> may be reasonable to declare that it isn't and has to be > > accessed > > > > > from > > > > > >> the > > > > > >> >> same thread as the ProcessElement call. If we want to relax > > this, > > > > > then > > > > > >> >> there might be ways to deal with that too, e.g. provide > > utilities > > > > for > > > > > >> the > > > > > >> >> user to capture the "user code context" and restoring it > > inside a > > > > > >> thread. > > > > > >> >> This would likely be valuable for other purposes, such as > > making > > > > > those > > > > > >> >> extra threads visible to our profiling utilities. > > > > > >> >> > > > > > >> > > > > > > >> > This seems fair, but we should be be very careful about our > > > > > >> documentation. > > > > > >> > And +1 to adding utilities to make multi-threaded work easier > to > > > > > manage. > > > > > >> > > > > > > >> >> > > > > > >> >> > > > > > >> >> > It may be > > > > > >> >> > *more* confusing for this to sometimes work magically and > > > > sometimes > > > > > >> fail > > > > > >> >> > horribly. Also, requiring the PCollectionView to be passed > to > > > > user > > > > > >> code > > > > > >> >> > that accesses it is nice because it makes *very clear* that > > the > > > > > side > > > > > >> >> input > > > > > >> >> > needs to be provided from the DoFn to that particular > > utility. > > > If > > > > > it > > > > > >> is > > > > > >> >> > accessed via "spooky action at a distance" we lose that > piece > > > of > > > > > >> "free" > > > > > >> >> > documentation, which may lead to extensive misuse of these > > > > utility > > > > > >> >> methods. > > > > > >> >> > > > > > > >> >> I'd like to understand this concern better - from this > > > description > > > > > it's > > > > > >> not > > > > > >> >> clear to me. The pattern I'm proposing is that, when you're > > > > > authoring a > > > > > >> >> PTransform that is configured by any user callbacks, then: > > > > > >> >> - you should provide a builder method .withSideInputs(...) > > > > > >> >> - you should propagate those side inputs to all your internal > > > > DoFn's > > > > > >> that > > > > > >> >> invoke the user code > > > > > >> >> - in return the user callbacks will be allowed to access > those > > > > > >> particular > > > > > >> >> side inputs > > > > > >> >> This seems like a simple enough model to me to understand, > both > > > > from > > > > > a > > > > > >> >> user's perspective and from a transform author's perspective. > > > > Steps 1 > > > > > >> and 2 > > > > > >> >> may eventually be automated by annotation analysis or other > > means > > > > > (e.g. > > > > > >> SDK > > > > > >> >> giving a way to provide given side inputs automatically to > > > > everything > > > > > >> >> inside a composite transform rather than to individual > DoFn's). > > > > > >> >> > > > > > >> >> > > > > > >> >> > > > > > > >> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov > > > > > >> >> > <kirpic...@google.com.invalid> wrote: > > > > > >> >> > > > > > > >> >> > > Hi, > > > > > >> >> > > > > > > > >> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles > > > > > >> <k...@google.com.invalid > > > > > >> >> > > > > > > >> >> > > wrote: > > > > > >> >> > > > > > > > >> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov < > > > > > >> >> > > > kirpic...@google.com.invalid> wrote: > > > > > >> >> > > > > > > > > >> >> > > > > > > > > > >> >> > > > > The differences are: > > > > > >> >> > > > > - The proposal in the doc allows wiring different > side > > > > > inputs to > > > > > >> >> the > > > > > >> >> > > same > > > > > >> >> > > > > Supplier, but I'm not convinced that this is > important > > - > > > > you > > > > > can > > > > > >> >> just > > > > > >> >> > > as > > > > > >> >> > > > > easily call the constructor of your DoFn passing > > > different > > > > > >> >> > > > > PCollectionView's for it to capture. > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > I disagree with this bit about it being "just as easy". > > > > Passing > > > > > >> the > > > > > >> >> > > needed > > > > > >> >> > > > PCollectionViews to your constructor (or even having a > > > > > >> constructor) > > > > > >> >> is > > > > > >> >> > a > > > > > >> >> > > > pain. Every time I have to do it, it adds a ton of > > > > boilerplate > > > > > >> that > > > > > >> >> > feels > > > > > >> >> > > > like pure noise. To make a DoFn reusable it must be > made > > > > into a > > > > > >> named > > > > > >> >> > > class > > > > > >> >> > > > with a constructor, versus inlined with no constructor. > > > > > >> >> > > > > > > > >> >> > > Hm, why? You can have the DoFn be an anonymous class > > > capturing > > > > > the > > > > > >> >> > > PCollectionView into a @SideInput field as a closure. > > > > > >> >> > > > > > > > >> >> > > > > > > > >> >> > > > A generous analogy > > > > > >> >> > > > is is that it is "just" manual closure > > conversion/currying, > > > > > >> changing > > > > > >> >> > > > f(side, main) to f(side)(main). But in practice in Beam > > the > > > > > second > > > > > >> >> one > > > > > >> >> > > has > > > > > >> >> > > > much more boilerplate. > > > > > >> >> > > > > > > > > >> >> > > > Also, Beam is worse. We present the user with > > higher-order > > > > > >> functions, > > > > > >> >> > > which > > > > > >> >> > > > is where the actual annoyance comes in. When you want > to > > > > > pardo(f) > > > > > >> you > > > > > >> >> > > have > > > > > >> >> > > > to write pardo(f(side))(side, main). Your proposal is > to > > > > > support > > > > > >> >> > > > pardo(f(side))(main) and mine is to support > > pardo(f)(side, > > > > > main). > > > > > >> I > > > > > >> >> > still > > > > > >> >> > > > propose that we support both (as they get implemented). > > If > > > > you > > > > > >> buy in > > > > > >> >> > to > > > > > >> >> > > my > > > > > >> >> > > > analogy, then there's decades of precedent and the > burden > > > of > > > > > proof > > > > > >> >> > falls > > > > > >> >> > > > heavily on whoever doesn't want to support both. > > > > > >> >> > > > > > > > > >> >> > > I see your point. I think the proposal is compatible with > > > what > > > > > >> you're > > > > > >> >> > > suggesting too - in DoFn we could have @SideInput > > > *parameters* > > > > of > > > > > >> type > > > > > >> >> > > PCollectionView, with the same semantics as a field. > > > > > >> >> > > > > > > > >> >> > > > > > > > >> >> > > > > > > > > >> >> > > > - My proposal allows getting rid of .withSideInputs() > > > > entirely, > > > > > >> >> because > > > > > >> >> > > the > > > > > >> >> > > > > DoFn captures the PCollectionView so you don't need > to > > > > > specify > > > > > >> it > > > > > >> >> > > > > explicitly for wiring. > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > I've decided to change to full +1 (whatever that means > > > > > compared to > > > > > >> >> 0.75 > > > > > >> >> > > :-) > > > > > >> >> > > > to adding support for @SideInput fields, because the > > > benefits > > > > > >> >> outweigh > > > > > >> >> > > this > > > > > >> >> > > > failure mode: > > > > > >> >> > > > > > > > > >> >> > > > new DoFn { > > > > > >> >> > > > // forgot the annotation > > > > > >> >> > > > private final PCollectionView whatever; > > > > > >> >> > > > > > > > > >> >> > > > @ProcessElement public void process(...) { > > > > > >> >> > > > whatever.get(); // crash during execution > > > > > >> >> > > > } > > > > > >> >> > > > } > > > > > >> >> > > > > > > > > >> >> > > > But ideas to mitigate that would be cool. > > > > > >> >> > > > > > > > >> >> > > Hm, can't think of anything less hacky than "prohibit > > having > > > > > fields > > > > > >> of > > > > > >> >> > type > > > > > >> >> > > PCollectionView that are not public, final, and annotated > > > with > > > > > >> >> > @SideInput" > > > > > >> >> > > - not sure we'd want to go down this road. I suppose a > good > > > > error > > > > > >> >> message > > > > > >> >> > > in .get() would be sufficient, saying "Did you forget to > > > > specify > > > > > a > > > > > >> >> > > requirement for this side input via .withSideInputs() or > by > > > > > >> annotating > > > > > >> >> > the > > > > > >> >> > > field as @SideInput" or something like that. > > > > > >> >> > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > > >> >> > > > Kenn > > > > > >> >> > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > > > >> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik > > > > > >> >> <lc...@google.com.invalid > > > > > >> >> > > > > > > > >> >> > > > > wrote: > > > > > >> >> > > > > > > > > > >> >> > > > > > My concern with the proposal is not the specifics > of > > > how > > > > it > > > > > >> will > > > > > >> >> > work > > > > > >> >> > > > and > > > > > >> >> > > > > > more about it being yet another way on how our API > is > > > to > > > > be > > > > > >> used > > > > > >> >> > even > > > > > >> >> > > > > > though we have a proposal [1] of an API style we > were > > > > > working > > > > > >> >> > towards > > > > > >> >> > > > in > > > > > >> >> > > > > > Java and Python. I would rather re-open that > > discussion > > > > now > > > > > >> about > > > > > >> >> > > what > > > > > >> >> > > > we > > > > > >> >> > > > > > want that API to look like for our major features > and > > > > work > > > > > >> >> towards > > > > > >> >> > > > > > consistency (or not if there is a strong argument > as > > to > > > > why > > > > > >> some > > > > > >> >> > > > feature > > > > > >> >> > > > > > should have a different style). > > > > > >> >> > > > > > > > > > > >> >> > > > > > 1: https://s.apache.org/a-new-dofn > > > > > >> >> > > > > > > > > > > >> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles > > > > > >> >> > > > <k...@google.com.invalid > > > > > >> >> > > > > > > > > > > >> >> > > > > > wrote: > > > > > >> >> > > > > > > > > > > >> >> > > > > > > +0.75 because I'd like to bring up invalid > > pipelines. > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > I had proposed side inputs as parameters to DoFn > in > > > > > >> >> > > > > > > https://s.apache.org/a-new-dofn (specifically at > > > [1]) > > > > so > > > > > >> the > > > > > >> >> > only > > > > > >> >> > > > > place > > > > > >> >> > > > > > > they are specified is in the graph construction, > > > making > > > > > the > > > > > >> >> DoFn > > > > > >> >> > > more > > > > > >> >> > > > > > > reusable and errors impossible. I've actually > been > > > > > noodling > > > > > >> my > > > > > >> >> > way > > > > > >> >> > > > > > towards > > > > > >> >> > > > > > > this in a branch :-) > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > Eugene's proposal is a sort of converse, where > the > > > side > > > > > >> inputs > > > > > >> >> > are > > > > > >> >> > > > > values > > > > > >> >> > > > > > > captured in the closure and not parameters, yet > the > > > > only > > > > > >> place > > > > > >> >> > they > > > > > >> >> > > > are > > > > > >> >> > > > > > > specified is in the DoFn. > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > I see no conflict between these two. It is very > > > natural > > > > > to > > > > > >> have > > > > > >> >> > > both > > > > > >> >> > > > > the > > > > > >> >> > > > > > > capability to accept parameters and the ability > to > > > > > capture > > > > > >> >> > > variables > > > > > >> >> > > > in > > > > > >> >> > > > > > the > > > > > >> >> > > > > > > closure. Supporting both is totally standard in > > > > > up-to-date > > > > > >> >> > > > programming > > > > > >> >> > > > > > > languages. > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > Today we have the worse of both worlds: > > > PCollectionView > > > > > >> behaves > > > > > >> >> > as > > > > > >> >> > > > > > > something captured in the closure/constructor, > but > > > must > > > > > >> still > > > > > >> >> be > > > > > >> >> > > > > > explicitly > > > > > >> >> > > > > > > wired up. > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > But if I use PCollectionView.get() and have not > > wired > > > > it > > > > > up > > > > > >> in > > > > > >> >> > any > > > > > >> >> > > > way, > > > > > >> >> > > > > > > what happens? Just like today, you can try to > > > > > >> .sideInput(...) a > > > > > >> >> > > thing > > > > > >> >> > > > > > that > > > > > >> >> > > > > > > is not available. With side inputs as parameters, > > > this > > > > is > > > > > >> not > > > > > >> >> > > > possible. > > > > > >> >> > > > > > If > > > > > >> >> > > > > > > you want to treat them as captured in a closure, > > > while > > > > > >> avoiding > > > > > >> >> > > > errors, > > > > > >> >> > > > > > it > > > > > >> >> > > > > > > seems like you might need to do some low-level > > magic, > > > > > like > > > > > >> the > > > > > >> >> > > > > > > serialization-based detection that Luke has > > suggested > > > > > before > > > > > >> >> > (there > > > > > >> >> > > > are > > > > > >> >> > > > > > > known downsides that we haven't explored, like > > > > > overcapture). > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > Kenn > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > [1] > > > > > >> >> > > > > > > > > > > > >> >> > https://docs.google.com/document/d/ > > > > 1ClmQ6LqdnfseRzeSw3SL68DAO1f8j > > > > > >> >> > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene > Kirpichov < > > > > > >> >> > > > > > > kirpic...@google.com.invalid> wrote: > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > > Hm, I guess you're right - for outputs it could > > be > > > > > indeed > > > > > >> >> quite > > > > > >> >> > > > > > valuable > > > > > >> >> > > > > > > to > > > > > >> >> > > > > > > > output to them without plumbing (e.g. > outputting > > > > > errors). > > > > > >> >> Could > > > > > >> >> > > be > > > > > >> >> > > > > done > > > > > >> >> > > > > > > > perhaps via TupleTag.output()? (assuming the > same > > > > > TupleTag > > > > > >> >> can > > > > > >> >> > > not > > > > > >> >> > > > be > > > > > >> >> > > > > > > > reused to tag multiple PCollection's) > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > For now I sent a PR for side input support > > > > > >> >> > > > > > > > https://github.com/apache/beam/pull/3814 . > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik > > > > > >> >> > > > <lc...@google.com.invalid > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > wrote: > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > > I disagree, state may not care where it is > used > > > as > > > > > well > > > > > >> >> > since a > > > > > >> >> > > > > > person > > > > > >> >> > > > > > > > may > > > > > >> >> > > > > > > > > call a function which needs to store/retrieve > > > state > > > > > and > > > > > >> >> > instead > > > > > >> >> > > > of > > > > > >> >> > > > > > > having > > > > > >> >> > > > > > > > > the DoFn declare the StateSpec and then pass > in > > > the > > > > > >> state > > > > > >> >> > > > > > > implementation > > > > > >> >> > > > > > > > > down into the function everywhere. Similarly > > for > > > > > >> outputs, > > > > > >> >> the > > > > > >> >> > > > > > internal > > > > > >> >> > > > > > > > > functions could take the TupleTag and request > > an > > > > > output > > > > > >> >> > manager > > > > > >> >> > > > or > > > > > >> >> > > > > > take > > > > > >> >> > > > > > > > an > > > > > >> >> > > > > > > > > "output" reference which give functions the > > > ability > > > > > to > > > > > >> >> > produce > > > > > >> >> > > > > output > > > > > >> >> > > > > > > > > directly without needing to pass everything > > that > > > is > > > > > >> needed > > > > > >> >> to > > > > > >> >> > > be > > > > > >> >> > > > > > output > > > > > >> >> > > > > > > > > back to the caller. > > > > > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene > > Kirpichov > > > < > > > > > >> >> > > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > Hm, I think of these things (state, side > > > outputs > > > > > >> etc.), > > > > > >> >> > only > > > > > >> >> > > > side > > > > > >> >> > > > > > > > inputs > > > > > >> >> > > > > > > > > > make sense to access in arbitrary user > > > callbacks > > > > > >> without > > > > > >> >> > > > explicit > > > > > >> >> > > > > > > > > knowledge > > > > > >> >> > > > > > > > > > of the surrounding transform - so only side > > > > inputs > > > > > >> can be > > > > > >> >> > > > > implicit > > > > > >> >> > > > > > > like > > > > > >> >> > > > > > > > > > this. > > > > > >> >> > > > > > > > > > > > > > > >> >> > > > > > > > > > Ultimately we'll probably end up removing > > > > > >> ProcessContext, > > > > > >> >> > and > > > > > >> >> > > > > > keeping > > > > > >> >> > > > > > > > > only > > > > > >> >> > > > > > > > > > annotations (on fields / methods / > > parameters). > > > > In > > > > > >> that > > > > > >> >> > > world, > > > > > >> >> > > > a > > > > > >> >> > > > > > > field > > > > > >> >> > > > > > > > > > annotation could be used (like per my > > previous > > > > > email) > > > > > >> to > > > > > >> >> > > > > statically > > > > > >> >> > > > > > > > > specify > > > > > >> >> > > > > > > > > > which side inputs will be needed - while > the > > > > value > > > > > >> could > > > > > >> >> > > still > > > > > >> >> > > > be > > > > > >> >> > > > > > > > > accessed > > > > > >> >> > > > > > > > > > via .get(), just like state cells are > > accessed > > > > via > > > > > >> >> .read() > > > > > >> >> > > and > > > > > >> >> > > > > > > > .write(): > > > > > >> >> > > > > > > > > > i.e., #get() is not a new method of access. > > > > > >> >> > > > > > > > > > > > > > > >> >> > > > > > > > > > Overall, it seems like I should proceed > with > > > the > > > > > >> idea. I > > > > > >> >> > > filed > > > > > >> >> > > > > > > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844. > > > > > >> >> > > > > > > > > > > > > > > >> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik > > > > > >> >> > > > > > <lc...@google.com.invalid > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > wrote: > > > > > >> >> > > > > > > > > > > > > > > >> >> > > > > > > > > > > For API consistency reasons, it would be > > good > > > > if > > > > > we > > > > > >> did > > > > > >> >> > > this > > > > > >> >> > > > > > > > > holistically > > > > > >> >> > > > > > > > > > > and expanded this approach to state, side > > > > > outputs, > > > > > >> ... > > > > > >> >> so > > > > > >> >> > > > that > > > > > >> >> > > > > a > > > > > >> >> > > > > > > > person > > > > > >> >> > > > > > > > > > can > > > > > >> >> > > > > > > > > > > always call Something.get() to return > > > something > > > > > that > > > > > >> >> they > > > > > >> >> > > can > > > > > >> >> > > > > > > access > > > > > >> >> > > > > > > > > > > implementation wise. It will be confusing > > for > > > > our > > > > > >> users > > > > > >> >> > to > > > > > >> >> > > > have > > > > > >> >> > > > > > > many > > > > > >> >> > > > > > > > > > > variations in our style of how all these > > > > concepts > > > > > >> are > > > > > >> >> > used > > > > > >> >> > > > > > > > > > (ProcessContext > > > > > >> >> > > > > > > > > > > / Annotations / #get()) > > > > > >> >> > > > > > > > > > > > > > > > >> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene > > > > Kirpichov > > > > > < > > > > > >> >> > > > > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > >> >> > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > Also, I think my approach is compatible > > > with > > > > > >> >> > annotations > > > > > >> >> > > > and > > > > > >> >> > > > > > > future > > > > > >> >> > > > > > > > > > > removal > > > > > >> >> > > > > > > > > > > > of .withSideInputs if we annotate a > > field: > > > > > >> >> > > > > > > > > > > > final PCollectionView<Foo> foo = ...; > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > class MyDoFn { > > > > > >> >> > > > > > > > > > > > @SideInput > > > > > >> >> > > > > > > > > > > > PCollectionView<Foo> foo = foo; > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > ...foo.get()... > > > > > >> >> > > > > > > > > > > > } > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > We can extract the accessed views from > > the > > > > DoFn > > > > > >> >> > instance > > > > > >> >> > > > > using > > > > > >> >> > > > > > > > > > > reflection. > > > > > >> >> > > > > > > > > > > > Still not compatible with lambdas, but > > > > > compatible > > > > > >> >> > > > > automatically > > > > > >> >> > > > > > > > with > > > > > >> >> > > > > > > > > > all > > > > > >> >> > > > > > > > > > > > anonymous classes. > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene > > > > Kirpichov < > > > > > >> >> > > > > > > > kirpic...@google.com> > > > > > >> >> > > > > > > > > > > > wrote: > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > Hi Luke, > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > I know this (annotations) is the > > pattern > > > we > > > > > were > > > > > >> >> > > > > considering > > > > > >> >> > > > > > > for > > > > > >> >> > > > > > > > > side > > > > > >> >> > > > > > > > > > > > > inputs, but I no longer think it is > the > > > > best > > > > > >> way to > > > > > >> >> > > > access > > > > > >> >> > > > > > > them. > > > > > >> >> > > > > > > > > > > > > Annotations help getting rid of the > > > > > >> >> .withSideInputs() > > > > > >> >> > > > call, > > > > > >> >> > > > > > but > > > > > >> >> > > > > > > > > this > > > > > >> >> > > > > > > > > > is > > > > > >> >> > > > > > > > > > > > > where their advantage ends. > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > The advantages of the proposed > approach > > > are > > > > > >> that it > > > > > >> >> > > > > > > automatically > > > > > >> >> > > > > > > > > > works > > > > > >> >> > > > > > > > > > > > > with all existing callback or lambda > > > code. > > > > No > > > > > >> need > > > > > >> >> to > > > > > >> >> > > > > further > > > > > >> >> > > > > > > > > develop > > > > > >> >> > > > > > > > > > > the > > > > > >> >> > > > > > > > > > > > > reflection machinery to support side > > > input > > > > > >> >> > annotations > > > > > >> >> > > - > > > > > >> >> > > > > and > > > > > >> >> > > > > > > > > > especially > > > > > >> >> > > > > > > > > > > > to > > > > > >> >> > > > > > > > > > > > > support arbitrary user interfaces, no > > > need > > > > to > > > > > >> >> change > > > > > >> >> > > > > existing > > > > > >> >> > > > > > > > > > > transforms, > > > > > >> >> > > > > > > > > > > > > no need for transform authors to even > > > know > > > > > that > > > > > >> the > > > > > >> >> > > > > machinery > > > > > >> >> > > > > > > > > exists > > > > > >> >> > > > > > > > > > to > > > > > >> >> > > > > > > > > > > > > make side inputs usable in their > > > transforms > > > > > >> (and no > > > > > >> >> > > need > > > > > >> >> > > > > for > > > > > >> >> > > > > > > > > authors > > > > > >> >> > > > > > > > > > to > > > > > >> >> > > > > > > > > > > > > think about whether or not they > should > > > > > support > > > > > >> side > > > > > >> >> > > > > inputs). > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > Moreover, like Reuven says, > annotations > > > > don't > > > > > >> work > > > > > >> >> > with > > > > > >> >> > > > > > lambdas > > > > > >> >> > > > > > > > at > > > > > >> >> > > > > > > > > > all: > > > > > >> >> > > > > > > > > > > > > creating a lambda with a flexible set > > of > > > > > >> annotation > > > > > >> >> > > > > arguments > > > > > >> >> > > > > > > > > appears > > > > > >> >> > > > > > > > > > > to > > > > > >> >> > > > > > > > > > > > be > > > > > >> >> > > > > > > > > > > > > currently impossible, and even > > capturing > > > > the > > > > > >> >> > > annotations > > > > > >> >> > > > on > > > > > >> >> > > > > > > > > arguments > > > > > >> >> > > > > > > > > > > of > > > > > >> >> > > > > > > > > > > > a > > > > > >> >> > > > > > > > > > > > > lambda is I believe also impossible > > > because > > > > > the > > > > > >> >> Java > > > > > >> >> > > > > compiler > > > > > >> >> > > > > > > > drops > > > > > >> >> > > > > > > > > > > them > > > > > >> >> > > > > > > > > > > > in > > > > > >> >> > > > > > > > > > > > > the generated class or method handle. > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz > > > Cwik > > > > > >> >> > > > > > > > > <lc...@google.com.invalid > > > > > >> >> > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > wrote: > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > >> I believe we should follow the > pattern > > > > that > > > > > >> state > > > > > >> >> > uses > > > > > >> >> > > > and > > > > > >> >> > > > > > > add a > > > > > >> >> > > > > > > > > > type > > > > > >> >> > > > > > > > > > > > >> annotation to link the side input > > > > > definition to > > > > > >> >> its > > > > > >> >> > > > usage > > > > > >> >> > > > > > > > > directly. > > > > > >> >> > > > > > > > > > > This > > > > > >> >> > > > > > > > > > > > >> would allow us to know that the side > > > input > > > > > was > > > > > >> >> > > > definitely > > > > > >> >> > > > > > > being > > > > > >> >> > > > > > > > > > > accessed > > > > > >> >> > > > > > > > > > > > >> and perform validation during graph > > > > > >> construction > > > > > >> >> for > > > > > >> >> > > any > > > > > >> >> > > > > > used > > > > > >> >> > > > > > > > but > > > > > >> >> > > > > > > > > > > > >> unspecified side inputs. > > > > > >> >> > > > > > > > > > > > >> > > > > > >> >> > > > > > > > > > > > >> Code snippet: > > > > > >> >> > > > > > > > > > > > >> final PCollectionView<String> foo = > > > > > >> >> > > > > > pipeline.apply("fooName", > > > > > >> >> > > > > > > > > > > > >> Create.of("foo")).apply(View.< > > > > > >> >> String>asSingleton()); > > > > > >> >> > > > > > > > > > > > >> PCollection<String> output = > pipeline > > > > > >> >> > > > > > > > > > > > >> .apply(Create.of(1, 2, 3)) > > > > > >> >> > > > > > > > > > > > >> .apply(MapElements.via( > > > > > >> >> > > > > > > > > > > > >> new SimpleFunction<Integer, > > > > > String>() { > > > > > >> >> > > > > > > > > > > > >> @Override > > > > > >> >> > > > > > > > > > > > >> public String > apply(Integer > > > > input, > > > > > >> >> > > > > > > > @SideInput("fooName") > > > > > >> >> > > > > > > > > > > > String > > > > > >> >> > > > > > > > > > > > >> fooValue) { > > > > > >> >> > > > > > > > > > > > >> return fooValue + " " + > > > input; > > > > > >> >> > > > > > > > > > > > >> } > > > > > >> >> > > > > > > > > > > > >> }).withSideInputs(foo));* > > > > > >> >> > > > > > > > > > > > >> > > > > > >> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, > Eugene > > > > > >> Kirpichov < > > > > > >> >> > > > > > > > > > > > >> kirpic...@google.com.invalid> > wrote: > > > > > >> >> > > > > > > > > > > > >> > > > > > >> >> > > > > > > > > > > > >> > Sure, here's how a modified > > (passing) > > > > > >> >> MapElements > > > > > >> >> > > unit > > > > > >> >> > > > > > test > > > > > >> >> > > > > > > > > looks > > > > > >> >> > > > > > > > > > > > like, > > > > > >> >> > > > > > > > > > > > >> > with usage of side inputs: > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > @Test > > > > > >> >> > > > > > > > > > > > >> > @Category(NeedsRunner.class) > > > > > >> >> > > > > > > > > > > > >> > public void > > > > testMapBasicWithSideInput() > > > > > >> throws > > > > > >> >> > > > > > Exception { > > > > > >> >> > > > > > > > > > > > >> > * final PCollectionView<String> > > foo > > > > =* > > > > > >> >> > > > > > > > > > > > >> > * pipeline.apply("foo", > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > Create.of("foo")).apply(View.<String>asSingleton());* > > > > > >> >> > > > > > > > > > > > >> > PCollection<String> output = > > > > pipeline > > > > > >> >> > > > > > > > > > > > >> > .apply(Create.of(1, 2, 3)) > > > > > >> >> > > > > > > > > > > > >> > .apply(MapElements.via( > > > > > >> >> > > > > > > > > > > > >> > new > > > SimpleFunction<Integer, > > > > > >> >> String>() > > > > > >> >> > { > > > > > >> >> > > > > > > > > > > > >> > @Override > > > > > >> >> > > > > > > > > > > > >> > public String > > > > apply(Integer > > > > > >> >> input) { > > > > > >> >> > > > > > > > > > > > >> > return* foo.get() > > *+ " > > > > " + > > > > > >> >> input; > > > > > >> >> > > > > > > > > > > > >> > } > > > > > >> >> > > > > > > > > > > > >> > }) > > > > > >> >> > > > > > > > > > > > >> > *.withSideInputs(foo));* > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > PAssert.that(output). > > > > > >> >> containsInAnyOrder("foo > > > > > >> >> > 1", > > > > > >> >> > > > > "foo > > > > > >> >> > > > > > > 2", > > > > > >> >> > > > > > > > > > "foo > > > > > >> >> > > > > > > > > > > > 3"); > > > > > >> >> > > > > > > > > > > > >> > pipeline.run(); > > > > > >> >> > > > > > > > > > > > >> > } > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM > > Reuven > > > > Lax > > > > > >> >> > > > > > > > > > <re...@google.com.invalid > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > >> > wrote: > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > > Can you provide a code snippet > > > showing > > > > > how > > > > > >> >> this > > > > > >> >> > > > would > > > > > >> >> > > > > > > look? > > > > > >> >> > > > > > > > > > > > >> > > > > > > > >> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, > > > Eugene > > > > > >> >> > Kirpichov < > > > > > >> >> > > > > > > > > > > > >> > > kirpic...@google.com.invalid> > > > wrote: > > > > > >> >> > > > > > > > > > > > >> > > > > > > > >> >> > > > > > > > > > > > >> > > > TL;DR Introduce method > > > > > >> >> PCollectionView.get(), > > > > > >> >> > > > > > > implemented > > > > > >> >> > > > > > > > > as: > > > > > >> >> > > > > > > > > > > get > > > > > >> >> > > > > > > > > > > > >> > > > thread-local ProcessContext > and > > do > > > > > >> >> > > > > c.sideInput(this). > > > > > >> >> > > > > > > As a > > > > > >> >> > > > > > > > > > > result, > > > > > >> >> > > > > > > > > > > > >> any > > > > > >> >> > > > > > > > > > > > >> > > user > > > > > >> >> > > > > > > > > > > > >> > > > lambdas such as MapElements > can > > > use > > > > > side > > > > > >> >> > inputs. > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > Quite a few transforms have > > > > user-code > > > > > >> >> > callbacks > > > > > >> >> > > or > > > > > >> >> > > > > > > > lambdas: > > > > > >> >> > > > > > > > > > > ParDo > > > > > >> >> > > > > > > > > > > > >> > (DoFn), > > > > > >> >> > > > > > > > > > > > >> > > > Map/FlatMapElements, the > > > > > >> DynamicDestinations > > > > > >> >> > > > classes > > > > > >> >> > > > > > in > > > > > >> >> > > > > > > > > > various > > > > > >> >> > > > > > > > > > > > IOs, > > > > > >> >> > > > > > > > > > > > >> > > > combine fns, the PollFn > callback > > > in > > > > > >> Watch, > > > > > >> >> > etc. > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > Of these, only DoFn and > > CombineFn > > > > have > > > > > >> >> > built-in > > > > > >> >> > > > > > support > > > > > >> >> > > > > > > > for > > > > > >> >> > > > > > > > > > side > > > > > >> >> > > > > > > > > > > > >> > inputs; > > > > > >> >> > > > > > > > > > > > >> > > > for DynamicDestinations it is > > > > plumbed > > > > > >> >> > > explicitly; > > > > > >> >> > > > > > others > > > > > >> >> > > > > > > > > don't > > > > > >> >> > > > > > > > > > > > have > > > > > >> >> > > > > > > > > > > > >> > > access > > > > > >> >> > > > > > > > > > > > >> > > > (e.g. you can't access side > > inputs > > > > > from > > > > > >> >> > > > > > > > Map/FlatMapElements > > > > > >> >> > > > > > > > > > > > because > > > > > >> >> > > > > > > > > > > > >> > they > > > > > >> >> > > > > > > > > > > > >> > > > don't have a ProcessContext or > > any > > > > > >> context > > > > > >> >> for > > > > > >> >> > > > that > > > > > >> >> > > > > > > > matter). > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > I think it's important to > solve > > > > this, > > > > > >> >> > especially > > > > > >> >> > > > as > > > > > >> >> > > > > > > Java 8 > > > > > >> >> > > > > > > > > > > becomes > > > > > >> >> > > > > > > > > > > > >> > > people's > > > > > >> >> > > > > > > > > > > > >> > > > default choice: users will > want > > to > > > > use > > > > > >> side > > > > > >> >> > > inputs > > > > > >> >> > > > > in > > > > > >> >> > > > > > > > > > > > >> > > Map/FlatMapElements. > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > It also appears to be quite > easy > > > to > > > > > >> >> implement: > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > Runner part: > > > > > >> >> > > > > > > > > > > > >> > > > - introduce a > SideInputAccessor > > > > > interface > > > > > >> >> > > > > > > > > > > > >> > > > - make .get() on a > > PCollectionView > > > > > get it > > > > > >> >> > from a > > > > > >> >> > > > > > > > > thread-local > > > > > >> >> > > > > > > > > > > > >> > > > SideInputAccessor > > > > > >> >> > > > > > > > > > > > >> > > > - make runners set the > > > thread-local > > > > > >> >> > > > > SideInputAccessor > > > > > >> >> > > > > > > any > > > > > >> >> > > > > > > > > time > > > > > >> >> > > > > > > > > > > the > > > > > >> >> > > > > > > > > > > > >> > runner > > > > > >> >> > > > > > > > > > > > >> > > > is evaluating something in a > > > context > > > > > >> where > > > > > >> >> > side > > > > > >> >> > > > > inputs > > > > > >> >> > > > > > > are > > > > > >> >> > > > > > > > > > > > >> available, > > > > > >> >> > > > > > > > > > > > >> > > e.g. > > > > > >> >> > > > > > > > > > > > >> > > > a ProcessElement method, or > > > > applying a > > > > > >> >> > > CombineFn - > > > > > >> >> > > > > the > > > > > >> >> > > > > > > set > > > > > >> >> > > > > > > > > of > > > > > >> >> > > > > > > > > > > such > > > > > >> >> > > > > > > > > > > > >> > places > > > > > >> >> > > > > > > > > > > > >> > > > will be quite small. I believe > > > only > > > > > >> runners > > > > > >> >> > (but > > > > > >> >> > > > not > > > > > >> >> > > > > > > > > > transforms) > > > > > >> >> > > > > > > > > > > > >> will > > > > > >> >> > > > > > > > > > > > >> > > ever > > > > > >> >> > > > > > > > > > > > >> > > > need to set this thread-local > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > Transform part: > > > > > >> >> > > > > > > > > > > > >> > > > - Transforms that take > user-code > > > > > lambdas > > > > > >> and > > > > > >> >> > > want > > > > > >> >> > > > to > > > > > >> >> > > > > > let > > > > > >> >> > > > > > > > > them > > > > > >> >> > > > > > > > > > > > access > > > > > >> >> > > > > > > > > > > > >> > side > > > > > >> >> > > > > > > > > > > > >> > > > inputs still will need to be > > > > > configurable > > > > > >> >> > with a > > > > > >> >> > > > > > method > > > > > >> >> > > > > > > > like > > > > > >> >> > > > > > > > > > > > >> > > > .withSideInputs(view1, > view2...) > > > and > > > > > will > > > > > >> >> need > > > > > >> >> > > to > > > > > >> >> > > > > > plumb > > > > > >> >> > > > > > > > > those > > > > > >> >> > > > > > > > > > > down > > > > > >> >> > > > > > > > > > > > >> to > > > > > >> >> > > > > > > > > > > > >> > the > > > > > >> >> > > > > > > > > > > > >> > > > primitive DoFn's or > CombineFn's > > - > > > > > >> >> information > > > > > >> >> > on > > > > > >> >> > > > > > *which* > > > > > >> >> > > > > > > > > side > > > > > >> >> > > > > > > > > > > > inputs > > > > > >> >> > > > > > > > > > > > >> > will > > > > > >> >> > > > > > > > > > > > >> > > > be accessed, of course, still > > > needs > > > > to > > > > > >> be in > > > > > >> >> > the > > > > > >> >> > > > > > graph. > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > I quickly prototyped this in > > > direct > > > > > >> runner > > > > > >> >> and > > > > > >> >> > > > > > > > MapElements, > > > > > >> >> > > > > > > > > > and > > > > > >> >> > > > > > > > > > > it > > > > > >> >> > > > > > > > > > > > >> > worked > > > > > >> >> > > > > > > > > > > > >> > > > with no issues - I'm wondering > > if > > > > > there's > > > > > >> >> > > > something > > > > > >> >> > > > > > > subtle > > > > > >> >> > > > > > > > > > that > > > > > >> >> > > > > > > > > > > > I'm > > > > > >> >> > > > > > > > > > > > >> > > > missing, or is it actually a > > good > > > > > idea? > > > > > >> >> > > > > > > > > > > > >> > > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > > >> >> > > > > > > > > > > > >> > > > > > > >> >> > > > > > > > > > > > >> > > > > > >> >> > > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> > > > > > > > > > > > > > > >