A bunch of people have commented on the doc, without it seems any major disagreement. The PR is out for review.
On Fri, Sep 29, 2017 at 1:53 PM Eugene Kirpichov <kirpic...@google.com> wrote: > Hi all, > > Please take a look at some notes from a discussion we had about this with > a few folks, and an updated proposal and a couple of demo PRs implementing > the proposal. > http://s.apache.org/context-fn > > I hope this proposal is more agreeable. > > On Wed, Sep 13, 2017 at 3:46 PM Kenneth Knowles <k...@google.com.invalid> > wrote: > >> ValueProvider is global, PCollectionView is per-window, state is >> per-step/key/window, etc. >> >> So my unhappiness increases as we move through that list, adding more and >> more constraints on correct use, none of which are reflected in the API. >> Your description of "its context is an execution of the pipeline" is >> accurate for ValueProvider. The question is not merely "which DoFn will >> need which side inputs" but in which methods the side input is accessed >> (forbidden in every DoFn method other than @ProcessElement and @OnTimer). >> >> As for lambdas being more universal - I agree! But the capabilities of >> ParDo are not. I don't think we should transparently make them available >> anywhere you have a lambda. For example, multiply triggered side inputs >> fundamentally alter the semantics of MapElements and Filter to vary over >> time. The only reason this isn't a showstopper is that multiply triggered >> side inputs have very loose consistency already, and you can write >> nondeterministic predicates and map functions anyhow. If either of those >> were better, we'd want to keep them that way. >> >> Since NewDoFn is somewhat tied to the alternative proposal, and there's >> the >> point that since lambdas are cross-language we might reconsider >> ProcessContext (aka "pile of mud") style. But this universality - being >> the >> lowest common denominator across languages - is not a goal. Python already >> is quite different from Java, using | and >> and kwarg side inputs to good >> effect. And those two languages are quite similar. Go will look entirely >> different. For Java, annotation-driven APIs are common and offer important >> advantages for readability, validation, and forward/backward >> compatibility. >> And incidentally NewDoFn subsumes ProcessContext. >> >> On Wed, Sep 13, 2017 at 2:32 PM, Eugene Kirpichov < >> kirpic...@google.com.invalid> wrote: >> >> > Thanks! >> > >> > I think most of the issues you point out [validation, scheduling, >> > prefetching] are in the area of wiring. I reiterate that they can be >> solved >> > - both of the methods below will give the runner an answer to the >> low-level >> > question "which DoFn will need which side inputs": >> > >> > 1) Providing withSideInputs() builder methods on transforms that are >> > parameterized by user code. If only some side inputs should be made >> > available to particular bits of user code, provide more detailed >> > withBlahSideInputs() methods - this is up to the transform. >> > >> > 2) Inferring this from something annotation-driven as indicated in the >> > thread, e.g. capturing the PCollectionView in @SideInput-annotated >> public >> > fields. This can't be done on a lambda, because lambdas don't have >> fields >> > [so I think method #1 will keep being necessary], but it can be done on >> an >> > anonymous class. >> > >> > As for direct access being misleading: I'm not sure I agree. I think the >> > intuition for PCollectionView.get() is no more wrong than the intuition >> for >> > ValueProvider.get(): the return value is, logically, context-free [more >> > like: its context is an execution of the pipeline], so I have no issue >> with >> > it being accessed implicitly. >> > >> > On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles <k...@google.com.invalid >> > >> > wrote: >> > >> > > I made some comments on >> https://issues.apache.org/jira/browse/BEAM-2950 >> > > which was filed to do a similar thing for State. Luke correctly >> pointed >> > out >> > > that many of the points apply here as well. I said most of the same >> > above, >> > > but I thought I'd pull them out again from that ticket and rephrase to >> > > apply to side inputs: >> > > >> > > - Direct access at first appears "more intuitive" because to a >> newcomer >> > it >> > > "looks like" normal [captured variable] access. But in fact it is >> nothing >> > > like normal [captured variable] access so this intuition is misleading >> > and >> > > should not be encouraged. So it is actually less readable because your >> > > intuitive reading is wrong. >> > > >> > > - This design would miss the validation aspect. One way it is >> different >> > > than normal [functional] programming is that there are many places it >> is >> > > illegal to reference [side inputs], such as StartBundle/FinishBundle, >> or >> > > passing to another object. This proposal would turn those into dynamic >> > > failures at best, or in the worst case data corruption (runner fails >> to >> > > catch illegal access, and permits some thread-global context to leak) >> > > >> > > - It is actually mandatory that we are always able to detect [side >> > inputs, >> > > or the user has to manually wire them], as it [must be scheduled >> > > differently] >> > > >> > > - A runner can't automatically prefetch, because it doesn't know >> which >> > > [side input] is used by which methods. >> > > >> > > - Magic by mutating stuff into place is just less readable / more >> error >> > > prone. >> > > >> > > State has even more compelling issues and none of the benefits so my >> > +0.75 >> > > for side inputs (now I am feeling more like +0.25) is a -1 for state. >> We >> > > should definitely not block one feature on all vaguely similar >> features. >> > > >> > > Kenn >> > > >> > > >> > > >> > > On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov < >> > > kirpic...@google.com.invalid> wrote: >> > > >> > > > On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw >> > > > <rober...@google.com.invalid> >> > > > wrote: >> > > > >> > > > > On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov >> > > > > <kirpic...@google.com.invalid> wrote: >> > > > > > Hi Robert, >> > > > > > >> > > > > > Given the anticipated usage of this proposal in Java, I'm not >> sure >> > > the >> > > > > > Python approach you quoted is the right one. >> > > > > >> > > > > Perhaps not, but does that mean it would be a Java-ism only or >> would >> > > > > we implement it in Python despite it being worse there? >> > > > > >> > > > I'm not sure, but I don't see why the proposed approach of >> view.get() >> > > > wouldn't work well, or be harder to implement in Python. >> > > > >> > > > >> > > > > >> > > > > > The main reason: I see how it works with Map/FlatMap, but what >> > about >> > > > > cases >> > > > > > like FileIO.write(), parameterized by several lambdas (element >> -> >> > > > > > destination, destination -> filename policy, destination -> >> sink), >> > > > where >> > > > > > different lambdas may want to access different side inputs? It >> > feels >> > > > > > excessive to make each of the lambdas take all of the side >> inputs >> > in >> > > > the >> > > > > > same order; moreover, if the composite transform internally >> needs >> > to >> > > > pass >> > > > > > some more side inputs to the DoFn's executing these lambdas, it >> > will >> > > > need >> > > > > > to manipulate the argument lists in nontrivial ways to make >> sure it >> > > > > passes >> > > > > > them only the side inputs the user asked for, and in the proper >> > > order. >> > > > > >> > > > > In Python it would be trivial to "slice" the side input arguments >> > > > > across the lambdas in a natural way, but I can see that this >> would be >> > > > > more of a pain in Java, especially as lambdas are unnecessarily >> > > > > crippled during compilation. >> > > > > >> > > > > > Another reason is, I think with Java's type system it's >> impossible >> > to >> > > > > have >> > > > > > a NewDoFn-style API for lambdas, because annotations on lambda >> > > > arguments >> > > > > > are dropped when the lambda is converted to the respective >> > > > single-method >> > > > > > interface - a lambda is subject to a lot more type erasure than >> > > > anonymous >> > > > > > class. >> > > > > >> > > > > Yeah, this is unfortunate. But, as mentioned, side inputs don't >> need >> > > > > to be annotated, just counted. For something like inspecting the >> > > > > window the NewDoFn has a lot of advantages over implicit access >> (and >> > > > > makes it so you can't "forget" to declare your dependency), but I >> do >> > > > > see advantages for the implicit way of doing things for >> delegating to >> > > > > other callables. >> > > > > >> > > > > On the other hand, there is a bit of precedence for this: metrics >> > have >> > > > > the "implicit" api. If we do go this direction for side inputs, we >> > > > > should also consider it for state and side outputs. >> > > > > >> > > > I think Kenn is very strongly against using it for state, whereas I >> > don't >> > > > have an opinion either way because I can't think of a use case for >> > > > accessing state from a lambda - we should probably discuss this >> > > separately, >> > > > with proposed code examples in front of us. >> > > > >> > > > For side outputs, yes, it might be nice to ".add()" to a >> PCollection, >> > but >> > > > it would require bigger changes - e.g. creating intermediate >> > > PCollection's >> > > > and inserting an implicit Flatten in front of all steps that >> contribute >> > > to >> > > > this PCollection, because a PCollection currently can be produced >> only >> > > by 1 >> > > > step. Maybe there's a different way to express implicit side >> outputs. >> > > > Either way I support the idea of looking for such a way because it >> > would >> > > > simplify use cases such as error handling dead-letter collections. >> > > > >> > > > I guess the bigger point is: do we want to block the discussion of >> > > implicit >> > > > side inputs on making a decision about the implicitness of other >> things >> > > > (side outputs, state, PipelineOptions, window etc). I can see the >> > > argument >> > > > for a "yes, block", but can also see the argument for a "no, don't >> > > block" - >> > > > because this proposal is (as indicated earlier in the thread) >> > > > forward-compatible with annotation-based wiring, because we already >> > have >> > > a >> > > > precedent for implicit access of something via ValueProvider, and >> > because >> > > > of the advantages it offers. >> > > > >> > > > Want to mention another advantage: lambdas are likely to be much >> easier >> > > > than NewDoFn approach to use from non-Java but JVM languages/SDKs >> (e.g. >> > > > Scio), which might have even more type erasure, or might have less >> > > > sophisticated annotation machinery, or NewDoFn-style anonymous >> classes >> > > > might be highly non-idiomatic in them. Lambdas are idiomatic in >> every >> > > > language that supports lambdas, which these days is basically every >> > > > language. [I might be opening a can of worms here, but I guess you >> can >> > > > consider this an argument against NewDoFn in general - though that's >> > > > certainly outside the scope of this thread]. >> > > > >> > > > >> > > > > >> > > > > > On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw >> > > > > <rober...@google.com.invalid> >> > > > > > wrote: >> > > > > > >> > > > > >> +1 to reducing the amount of boilerplate for dealing with side >> > > inputs. >> > > > > >> >> > > > > >> I prefer the "NewDoFn" style of side inputs for consistency. >> The >> > > > > >> primary drawback seems to be lambda's incompatibility with >> > > > > >> annotations. This is solved in Python by letting all the first >> > > > > >> annotated argument of the process method be the main input, and >> > > > > >> subsequent ones be the side input. For example >> > > > > >> >> > > > > >> main_pcoll | beam.Map( >> > > > > >> lambda main_input_elem, side_input_value: main_input_elem + >> > > > > >> side_input_value, >> > > > > >> side_input_pvalue) >> > > > > >> >> > > > > >> For multiple side inputs they are mapped positionally (though >> > Python >> > > > > >> has the advantage that arguments can be passed by keyword as >> well >> > to >> > > > > >> enhance readability when there are many of them, and we allow >> that >> > > for >> > > > > >> side inputs). Note that side_input_pvalue is not referenced >> > anywhere >> > > > > >> else, so we don't even have to store it and pass it around (one >> > > > > >> typically writes pvalue.AsList(some_pcoll) inline here). When >> the >> > > > > >> concrete PCollectionView is used to access the value this means >> > that >> > > > > >> it must be passed separately to both the ParDo and the callback >> > > > > >> (unless we can infer it, which I don't think we can do in all >> > > (many?) >> > > > > >> cases). >> > > > > >> >> > > > > >> There's no reason we couldn't do this, or something very >> similar, >> > in >> > > > > >> Java as well. >> > > > > >> >> > > > > >> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax >> > > <re...@google.com.invalid >> > > > > >> > > > > >> wrote: >> > > > > >> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov < >> > > > > >> > kirpic...@google.com.invalid> wrote: >> > > > > >> > >> > > > > >> >> Hi, >> > > > > >> >> >> > > > > >> >> I agree with these concerns to an extent, however I think >> the >> > > > > advantage >> > > > > >> of >> > > > > >> >> transparently letting any user code access side inputs, >> > > especially >> > > > > >> >> including lambdas, is so great that we should find a way to >> > > address >> > > > > >> these >> > > > > >> >> concerns within the constraints of the pattern I'm >> proposing. >> > See >> > > > > more >> > > > > >> >> below. >> > > > > >> >> >> > > > > >> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers >> > > > > >> <bchamb...@google.com.invalid >> > > > > >> >> > >> > > > > >> >> wrote: >> > > > > >> >> >> > > > > >> >> > One possible issue with this is that updating a thread >> local >> > is >> > > > > >> likely to >> > > > > >> >> > be much more expensive than passing an additional >> argument. >> > > > > >> >> >> > > > > >> >> This is an implementation detail that can be fixed - Luke >> made >> > a >> > > > > >> suggestion >> > > > > >> >> on the PR to set up the side input context once per bundle >> > rather >> > > > > than >> > > > > >> once >> > > > > >> >> per element. >> > > > > >> >> >> > > > > >> > >> > > > > >> > However remember that bundles might be small. Dataflow >> streaming >> > > > > runner >> > > > > >> > creates small bundles by design. The Flink runner creates >> > > > > single-element >> > > > > >> > bundles. >> > > > > >> > >> > > > > >> > >> > > > > >> >> >> > > > > >> >> >> > > > > >> >> > Also, not all >> > > > > >> >> > code called from within the DoFn will necessarily be in >> the >> > > same >> > > > > >> thread >> > > > > >> >> > (eg., sometimes we create a pool of threads for doing >> work). >> > > > > >> >> >> > > > > >> >> I think we already require that c.output() can not be done >> from >> > > > > multiple >> > > > > >> >> threads; and I don't think we document c.sideInput() to be >> > > > > thread-safe >> > > > > >> - it >> > > > > >> >> may be reasonable to declare that it isn't and has to be >> > accessed >> > > > > from >> > > > > >> the >> > > > > >> >> same thread as the ProcessElement call. If we want to relax >> > this, >> > > > > then >> > > > > >> >> there might be ways to deal with that too, e.g. provide >> > utilities >> > > > for >> > > > > >> the >> > > > > >> >> user to capture the "user code context" and restoring it >> > inside a >> > > > > >> thread. >> > > > > >> >> This would likely be valuable for other purposes, such as >> > making >> > > > > those >> > > > > >> >> extra threads visible to our profiling utilities. >> > > > > >> >> >> > > > > >> > >> > > > > >> > This seems fair, but we should be be very careful about our >> > > > > >> documentation. >> > > > > >> > And +1 to adding utilities to make multi-threaded work >> easier to >> > > > > manage. >> > > > > >> > >> > > > > >> >> >> > > > > >> >> >> > > > > >> >> > It may be >> > > > > >> >> > *more* confusing for this to sometimes work magically and >> > > > sometimes >> > > > > >> fail >> > > > > >> >> > horribly. Also, requiring the PCollectionView to be >> passed to >> > > > user >> > > > > >> code >> > > > > >> >> > that accesses it is nice because it makes *very clear* >> that >> > the >> > > > > side >> > > > > >> >> input >> > > > > >> >> > needs to be provided from the DoFn to that particular >> > utility. >> > > If >> > > > > it >> > > > > >> is >> > > > > >> >> > accessed via "spooky action at a distance" we lose that >> piece >> > > of >> > > > > >> "free" >> > > > > >> >> > documentation, which may lead to extensive misuse of these >> > > > utility >> > > > > >> >> methods. >> > > > > >> >> > >> > > > > >> >> I'd like to understand this concern better - from this >> > > description >> > > > > it's >> > > > > >> not >> > > > > >> >> clear to me. The pattern I'm proposing is that, when you're >> > > > > authoring a >> > > > > >> >> PTransform that is configured by any user callbacks, then: >> > > > > >> >> - you should provide a builder method .withSideInputs(...) >> > > > > >> >> - you should propagate those side inputs to all your >> internal >> > > > DoFn's >> > > > > >> that >> > > > > >> >> invoke the user code >> > > > > >> >> - in return the user callbacks will be allowed to access >> those >> > > > > >> particular >> > > > > >> >> side inputs >> > > > > >> >> This seems like a simple enough model to me to understand, >> both >> > > > from >> > > > > a >> > > > > >> >> user's perspective and from a transform author's >> perspective. >> > > > Steps 1 >> > > > > >> and 2 >> > > > > >> >> may eventually be automated by annotation analysis or other >> > means >> > > > > (e.g. >> > > > > >> SDK >> > > > > >> >> giving a way to provide given side inputs automatically to >> > > > everything >> > > > > >> >> inside a composite transform rather than to individual >> DoFn's). >> > > > > >> >> >> > > > > >> >> >> > > > > >> >> > >> > > > > >> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov >> > > > > >> >> > <kirpic...@google.com.invalid> wrote: >> > > > > >> >> > >> > > > > >> >> > > Hi, >> > > > > >> >> > > >> > > > > >> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles >> > > > > >> <k...@google.com.invalid >> > > > > >> >> > >> > > > > >> >> > > wrote: >> > > > > >> >> > > >> > > > > >> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov < >> > > > > >> >> > > > kirpic...@google.com.invalid> wrote: >> > > > > >> >> > > > >> > > > > >> >> > > > > >> > > > > >> >> > > > > The differences are: >> > > > > >> >> > > > > - The proposal in the doc allows wiring different >> side >> > > > > inputs to >> > > > > >> >> the >> > > > > >> >> > > same >> > > > > >> >> > > > > Supplier, but I'm not convinced that this is >> important >> > - >> > > > you >> > > > > can >> > > > > >> >> just >> > > > > >> >> > > as >> > > > > >> >> > > > > easily call the constructor of your DoFn passing >> > > different >> > > > > >> >> > > > > PCollectionView's for it to capture. >> > > > > >> >> > > > > >> > > > > >> >> > > > >> > > > > >> >> > > > I disagree with this bit about it being "just as >> easy". >> > > > Passing >> > > > > >> the >> > > > > >> >> > > needed >> > > > > >> >> > > > PCollectionViews to your constructor (or even having a >> > > > > >> constructor) >> > > > > >> >> is >> > > > > >> >> > a >> > > > > >> >> > > > pain. Every time I have to do it, it adds a ton of >> > > > boilerplate >> > > > > >> that >> > > > > >> >> > feels >> > > > > >> >> > > > like pure noise. To make a DoFn reusable it must be >> made >> > > > into a >> > > > > >> named >> > > > > >> >> > > class >> > > > > >> >> > > > with a constructor, versus inlined with no >> constructor. >> > > > > >> >> > > >> > > > > >> >> > > Hm, why? You can have the DoFn be an anonymous class >> > > capturing >> > > > > the >> > > > > >> >> > > PCollectionView into a @SideInput field as a closure. >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > > A generous analogy >> > > > > >> >> > > > is is that it is "just" manual closure >> > conversion/currying, >> > > > > >> changing >> > > > > >> >> > > > f(side, main) to f(side)(main). But in practice in >> Beam >> > the >> > > > > second >> > > > > >> >> one >> > > > > >> >> > > has >> > > > > >> >> > > > much more boilerplate. >> > > > > >> >> > > > >> > > > > >> >> > > > Also, Beam is worse. We present the user with >> > higher-order >> > > > > >> functions, >> > > > > >> >> > > which >> > > > > >> >> > > > is where the actual annoyance comes in. When you want >> to >> > > > > pardo(f) >> > > > > >> you >> > > > > >> >> > > have >> > > > > >> >> > > > to write pardo(f(side))(side, main). Your proposal is >> to >> > > > > support >> > > > > >> >> > > > pardo(f(side))(main) and mine is to support >> > pardo(f)(side, >> > > > > main). >> > > > > >> I >> > > > > >> >> > still >> > > > > >> >> > > > propose that we support both (as they get >> implemented). >> > If >> > > > you >> > > > > >> buy in >> > > > > >> >> > to >> > > > > >> >> > > my >> > > > > >> >> > > > analogy, then there's decades of precedent and the >> burden >> > > of >> > > > > proof >> > > > > >> >> > falls >> > > > > >> >> > > > heavily on whoever doesn't want to support both. >> > > > > >> >> > > > >> > > > > >> >> > > I see your point. I think the proposal is compatible >> with >> > > what >> > > > > >> you're >> > > > > >> >> > > suggesting too - in DoFn we could have @SideInput >> > > *parameters* >> > > > of >> > > > > >> type >> > > > > >> >> > > PCollectionView, with the same semantics as a field. >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > > >> > > > > >> >> > > > - My proposal allows getting rid of .withSideInputs() >> > > > entirely, >> > > > > >> >> because >> > > > > >> >> > > the >> > > > > >> >> > > > > DoFn captures the PCollectionView so you don't need >> to >> > > > > specify >> > > > > >> it >> > > > > >> >> > > > > explicitly for wiring. >> > > > > >> >> > > > > >> > > > > >> >> > > > >> > > > > >> >> > > > I've decided to change to full +1 (whatever that means >> > > > > compared to >> > > > > >> >> 0.75 >> > > > > >> >> > > :-) >> > > > > >> >> > > > to adding support for @SideInput fields, because the >> > > benefits >> > > > > >> >> outweigh >> > > > > >> >> > > this >> > > > > >> >> > > > failure mode: >> > > > > >> >> > > > >> > > > > >> >> > > > new DoFn { >> > > > > >> >> > > > // forgot the annotation >> > > > > >> >> > > > private final PCollectionView whatever; >> > > > > >> >> > > > >> > > > > >> >> > > > @ProcessElement public void process(...) { >> > > > > >> >> > > > whatever.get(); // crash during execution >> > > > > >> >> > > > } >> > > > > >> >> > > > } >> > > > > >> >> > > > >> > > > > >> >> > > > But ideas to mitigate that would be cool. >> > > > > >> >> > > >> > > > > >> >> > > Hm, can't think of anything less hacky than "prohibit >> > having >> > > > > fields >> > > > > >> of >> > > > > >> >> > type >> > > > > >> >> > > PCollectionView that are not public, final, and >> annotated >> > > with >> > > > > >> >> > @SideInput" >> > > > > >> >> > > - not sure we'd want to go down this road. I suppose a >> good >> > > > error >> > > > > >> >> message >> > > > > >> >> > > in .get() would be sufficient, saying "Did you forget to >> > > > specify >> > > > > a >> > > > > >> >> > > requirement for this side input via .withSideInputs() >> or by >> > > > > >> annotating >> > > > > >> >> > the >> > > > > >> >> > > field as @SideInput" or something like that. >> > > > > >> >> > > >> > > > > >> >> > > > >> > > > > >> >> > > >> > > > > >> >> > > >> > > > > >> >> > > > Kenn >> > > > > >> >> > > > >> > > > > >> >> > > > >> > > > > >> >> > > > > >> > > > > >> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik >> > > > > >> >> <lc...@google.com.invalid >> > > > > >> >> > > >> > > > > >> >> > > > > wrote: >> > > > > >> >> > > > > >> > > > > >> >> > > > > > My concern with the proposal is not the specifics >> of >> > > how >> > > > it >> > > > > >> will >> > > > > >> >> > work >> > > > > >> >> > > > and >> > > > > >> >> > > > > > more about it being yet another way on how our >> API is >> > > to >> > > > be >> > > > > >> used >> > > > > >> >> > even >> > > > > >> >> > > > > > though we have a proposal [1] of an API style we >> were >> > > > > working >> > > > > >> >> > towards >> > > > > >> >> > > > in >> > > > > >> >> > > > > > Java and Python. I would rather re-open that >> > discussion >> > > > now >> > > > > >> about >> > > > > >> >> > > what >> > > > > >> >> > > > we >> > > > > >> >> > > > > > want that API to look like for our major features >> and >> > > > work >> > > > > >> >> towards >> > > > > >> >> > > > > > consistency (or not if there is a strong argument >> as >> > to >> > > > why >> > > > > >> some >> > > > > >> >> > > > feature >> > > > > >> >> > > > > > should have a different style). >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > 1: https://s.apache.org/a-new-dofn >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles >> > > > > >> >> > > > <k...@google.com.invalid >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > wrote: >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > > +0.75 because I'd like to bring up invalid >> > pipelines. >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > I had proposed side inputs as parameters to >> DoFn in >> > > > > >> >> > > > > > > https://s.apache.org/a-new-dofn (specifically >> at >> > > [1]) >> > > > so >> > > > > >> the >> > > > > >> >> > only >> > > > > >> >> > > > > place >> > > > > >> >> > > > > > > they are specified is in the graph construction, >> > > making >> > > > > the >> > > > > >> >> DoFn >> > > > > >> >> > > more >> > > > > >> >> > > > > > > reusable and errors impossible. I've actually >> been >> > > > > noodling >> > > > > >> my >> > > > > >> >> > way >> > > > > >> >> > > > > > towards >> > > > > >> >> > > > > > > this in a branch :-) >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > Eugene's proposal is a sort of converse, where >> the >> > > side >> > > > > >> inputs >> > > > > >> >> > are >> > > > > >> >> > > > > values >> > > > > >> >> > > > > > > captured in the closure and not parameters, yet >> the >> > > > only >> > > > > >> place >> > > > > >> >> > they >> > > > > >> >> > > > are >> > > > > >> >> > > > > > > specified is in the DoFn. >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > I see no conflict between these two. It is very >> > > natural >> > > > > to >> > > > > >> have >> > > > > >> >> > > both >> > > > > >> >> > > > > the >> > > > > >> >> > > > > > > capability to accept parameters and the ability >> to >> > > > > capture >> > > > > >> >> > > variables >> > > > > >> >> > > > in >> > > > > >> >> > > > > > the >> > > > > >> >> > > > > > > closure. Supporting both is totally standard in >> > > > > up-to-date >> > > > > >> >> > > > programming >> > > > > >> >> > > > > > > languages. >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > Today we have the worse of both worlds: >> > > PCollectionView >> > > > > >> behaves >> > > > > >> >> > as >> > > > > >> >> > > > > > > something captured in the closure/constructor, >> but >> > > must >> > > > > >> still >> > > > > >> >> be >> > > > > >> >> > > > > > explicitly >> > > > > >> >> > > > > > > wired up. >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > But if I use PCollectionView.get() and have not >> > wired >> > > > it >> > > > > up >> > > > > >> in >> > > > > >> >> > any >> > > > > >> >> > > > way, >> > > > > >> >> > > > > > > what happens? Just like today, you can try to >> > > > > >> .sideInput(...) a >> > > > > >> >> > > thing >> > > > > >> >> > > > > > that >> > > > > >> >> > > > > > > is not available. With side inputs as >> parameters, >> > > this >> > > > is >> > > > > >> not >> > > > > >> >> > > > possible. >> > > > > >> >> > > > > > If >> > > > > >> >> > > > > > > you want to treat them as captured in a closure, >> > > while >> > > > > >> avoiding >> > > > > >> >> > > > errors, >> > > > > >> >> > > > > > it >> > > > > >> >> > > > > > > seems like you might need to do some low-level >> > magic, >> > > > > like >> > > > > >> the >> > > > > >> >> > > > > > > serialization-based detection that Luke has >> > suggested >> > > > > before >> > > > > >> >> > (there >> > > > > >> >> > > > are >> > > > > >> >> > > > > > > known downsides that we haven't explored, like >> > > > > overcapture). >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > Kenn >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > [1] >> > > > > >> >> > > > > > > >> > > > > >> >> > https://docs.google.com/document/d/ >> > > > 1ClmQ6LqdnfseRzeSw3SL68DAO1f8j >> > > > > >> >> > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene >> Kirpichov < >> > > > > >> >> > > > > > > kirpic...@google.com.invalid> wrote: >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > > > Hm, I guess you're right - for outputs it >> could >> > be >> > > > > indeed >> > > > > >> >> quite >> > > > > >> >> > > > > > valuable >> > > > > >> >> > > > > > > to >> > > > > >> >> > > > > > > > output to them without plumbing (e.g. >> outputting >> > > > > errors). >> > > > > >> >> Could >> > > > > >> >> > > be >> > > > > >> >> > > > > done >> > > > > >> >> > > > > > > > perhaps via TupleTag.output()? (assuming the >> same >> > > > > TupleTag >> > > > > >> >> can >> > > > > >> >> > > not >> > > > > >> >> > > > be >> > > > > >> >> > > > > > > > reused to tag multiple PCollection's) >> > > > > >> >> > > > > > > > >> > > > > >> >> > > > > > > > For now I sent a PR for side input support >> > > > > >> >> > > > > > > > https://github.com/apache/beam/pull/3814 . >> > > > > >> >> > > > > > > > >> > > > > >> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik >> > > > > >> >> > > > <lc...@google.com.invalid >> > > > > >> >> > > > > > >> > > > > >> >> > > > > > > > wrote: >> > > > > >> >> > > > > > > > >> > > > > >> >> > > > > > > > > I disagree, state may not care where it is >> used >> > > as >> > > > > well >> > > > > >> >> > since a >> > > > > >> >> > > > > > person >> > > > > >> >> > > > > > > > may >> > > > > >> >> > > > > > > > > call a function which needs to >> store/retrieve >> > > state >> > > > > and >> > > > > >> >> > instead >> > > > > >> >> > > > of >> > > > > >> >> > > > > > > having >> > > > > >> >> > > > > > > > > the DoFn declare the StateSpec and then >> pass in >> > > the >> > > > > >> state >> > > > > >> >> > > > > > > implementation >> > > > > >> >> > > > > > > > > down into the function everywhere. Similarly >> > for >> > > > > >> outputs, >> > > > > >> >> the >> > > > > >> >> > > > > > internal >> > > > > >> >> > > > > > > > > functions could take the TupleTag and >> request >> > an >> > > > > output >> > > > > >> >> > manager >> > > > > >> >> > > > or >> > > > > >> >> > > > > > take >> > > > > >> >> > > > > > > > an >> > > > > >> >> > > > > > > > > "output" reference which give functions the >> > > ability >> > > > > to >> > > > > >> >> > produce >> > > > > >> >> > > > > output >> > > > > >> >> > > > > > > > > directly without needing to pass everything >> > that >> > > is >> > > > > >> needed >> > > > > >> >> to >> > > > > >> >> > > be >> > > > > >> >> > > > > > output >> > > > > >> >> > > > > > > > > back to the caller. >> > > > > >> >> > > > > > > > > >> > > > > >> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene >> > Kirpichov >> > > < >> > > > > >> >> > > > > > > > > kirpic...@google.com.invalid> wrote: >> > > > > >> >> > > > > > > > > >> > > > > >> >> > > > > > > > > > Hm, I think of these things (state, side >> > > outputs >> > > > > >> etc.), >> > > > > >> >> > only >> > > > > >> >> > > > side >> > > > > >> >> > > > > > > > inputs >> > > > > >> >> > > > > > > > > > make sense to access in arbitrary user >> > > callbacks >> > > > > >> without >> > > > > >> >> > > > explicit >> > > > > >> >> > > > > > > > > knowledge >> > > > > >> >> > > > > > > > > > of the surrounding transform - so only >> side >> > > > inputs >> > > > > >> can be >> > > > > >> >> > > > > implicit >> > > > > >> >> > > > > > > like >> > > > > >> >> > > > > > > > > > this. >> > > > > >> >> > > > > > > > > > >> > > > > >> >> > > > > > > > > > Ultimately we'll probably end up removing >> > > > > >> ProcessContext, >> > > > > >> >> > and >> > > > > >> >> > > > > > keeping >> > > > > >> >> > > > > > > > > only >> > > > > >> >> > > > > > > > > > annotations (on fields / methods / >> > parameters). >> > > > In >> > > > > >> that >> > > > > >> >> > > world, >> > > > > >> >> > > > a >> > > > > >> >> > > > > > > field >> > > > > >> >> > > > > > > > > > annotation could be used (like per my >> > previous >> > > > > email) >> > > > > >> to >> > > > > >> >> > > > > statically >> > > > > >> >> > > > > > > > > specify >> > > > > >> >> > > > > > > > > > which side inputs will be needed - while >> the >> > > > value >> > > > > >> could >> > > > > >> >> > > still >> > > > > >> >> > > > be >> > > > > >> >> > > > > > > > > accessed >> > > > > >> >> > > > > > > > > > via .get(), just like state cells are >> > accessed >> > > > via >> > > > > >> >> .read() >> > > > > >> >> > > and >> > > > > >> >> > > > > > > > .write(): >> > > > > >> >> > > > > > > > > > i.e., #get() is not a new method of >> access. >> > > > > >> >> > > > > > > > > > >> > > > > >> >> > > > > > > > > > Overall, it seems like I should proceed >> with >> > > the >> > > > > >> idea. I >> > > > > >> >> > > filed >> > > > > >> >> > > > > > > > > > >> > > https://issues.apache.org/jira/browse/BEAM-2844. >> > > > > >> >> > > > > > > > > > >> > > > > >> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik >> > > > > >> >> > > > > > <lc...@google.com.invalid >> > > > > >> >> > > > > > > > >> > > > > >> >> > > > > > > > > > wrote: >> > > > > >> >> > > > > > > > > > >> > > > > >> >> > > > > > > > > > > For API consistency reasons, it would be >> > good >> > > > if >> > > > > we >> > > > > >> did >> > > > > >> >> > > this >> > > > > >> >> > > > > > > > > holistically >> > > > > >> >> > > > > > > > > > > and expanded this approach to state, >> side >> > > > > outputs, >> > > > > >> ... >> > > > > >> >> so >> > > > > >> >> > > > that >> > > > > >> >> > > > > a >> > > > > >> >> > > > > > > > person >> > > > > >> >> > > > > > > > > > can >> > > > > >> >> > > > > > > > > > > always call Something.get() to return >> > > something >> > > > > that >> > > > > >> >> they >> > > > > >> >> > > can >> > > > > >> >> > > > > > > access >> > > > > >> >> > > > > > > > > > > implementation wise. It will be >> confusing >> > for >> > > > our >> > > > > >> users >> > > > > >> >> > to >> > > > > >> >> > > > have >> > > > > >> >> > > > > > > many >> > > > > >> >> > > > > > > > > > > variations in our style of how all these >> > > > concepts >> > > > > >> are >> > > > > >> >> > used >> > > > > >> >> > > > > > > > > > (ProcessContext >> > > > > >> >> > > > > > > > > > > / Annotations / #get()) >> > > > > >> >> > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene >> > > > Kirpichov >> > > > > < >> > > > > >> >> > > > > > > > > > > kirpic...@google.com.invalid> wrote: >> > > > > >> >> > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > Also, I think my approach is >> compatible >> > > with >> > > > > >> >> > annotations >> > > > > >> >> > > > and >> > > > > >> >> > > > > > > future >> > > > > >> >> > > > > > > > > > > removal >> > > > > >> >> > > > > > > > > > > > of .withSideInputs if we annotate a >> > field: >> > > > > >> >> > > > > > > > > > > > final PCollectionView<Foo> foo = ...; >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > class MyDoFn { >> > > > > >> >> > > > > > > > > > > > @SideInput >> > > > > >> >> > > > > > > > > > > > PCollectionView<Foo> foo = foo; >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > ...foo.get()... >> > > > > >> >> > > > > > > > > > > > } >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > We can extract the accessed views from >> > the >> > > > DoFn >> > > > > >> >> > instance >> > > > > >> >> > > > > using >> > > > > >> >> > > > > > > > > > > reflection. >> > > > > >> >> > > > > > > > > > > > Still not compatible with lambdas, but >> > > > > compatible >> > > > > >> >> > > > > automatically >> > > > > >> >> > > > > > > > with >> > > > > >> >> > > > > > > > > > all >> > > > > >> >> > > > > > > > > > > > anonymous classes. >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene >> > > > Kirpichov < >> > > > > >> >> > > > > > > > kirpic...@google.com> >> > > > > >> >> > > > > > > > > > > > wrote: >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > Hi Luke, >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > I know this (annotations) is the >> > pattern >> > > we >> > > > > were >> > > > > >> >> > > > > considering >> > > > > >> >> > > > > > > for >> > > > > >> >> > > > > > > > > side >> > > > > >> >> > > > > > > > > > > > > inputs, but I no longer think it is >> the >> > > > best >> > > > > >> way to >> > > > > >> >> > > > access >> > > > > >> >> > > > > > > them. >> > > > > >> >> > > > > > > > > > > > > Annotations help getting rid of the >> > > > > >> >> .withSideInputs() >> > > > > >> >> > > > call, >> > > > > >> >> > > > > > but >> > > > > >> >> > > > > > > > > this >> > > > > >> >> > > > > > > > > > is >> > > > > >> >> > > > > > > > > > > > > where their advantage ends. >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > The advantages of the proposed >> approach >> > > are >> > > > > >> that it >> > > > > >> >> > > > > > > automatically >> > > > > >> >> > > > > > > > > > works >> > > > > >> >> > > > > > > > > > > > > with all existing callback or lambda >> > > code. >> > > > No >> > > > > >> need >> > > > > >> >> to >> > > > > >> >> > > > > further >> > > > > >> >> > > > > > > > > develop >> > > > > >> >> > > > > > > > > > > the >> > > > > >> >> > > > > > > > > > > > > reflection machinery to support side >> > > input >> > > > > >> >> > annotations >> > > > > >> >> > > - >> > > > > >> >> > > > > and >> > > > > >> >> > > > > > > > > > especially >> > > > > >> >> > > > > > > > > > > > to >> > > > > >> >> > > > > > > > > > > > > support arbitrary user interfaces, >> no >> > > need >> > > > to >> > > > > >> >> change >> > > > > >> >> > > > > existing >> > > > > >> >> > > > > > > > > > > transforms, >> > > > > >> >> > > > > > > > > > > > > no need for transform authors to >> even >> > > know >> > > > > that >> > > > > >> the >> > > > > >> >> > > > > machinery >> > > > > >> >> > > > > > > > > exists >> > > > > >> >> > > > > > > > > > to >> > > > > >> >> > > > > > > > > > > > > make side inputs usable in their >> > > transforms >> > > > > >> (and no >> > > > > >> >> > > need >> > > > > >> >> > > > > for >> > > > > >> >> > > > > > > > > authors >> > > > > >> >> > > > > > > > > > to >> > > > > >> >> > > > > > > > > > > > > think about whether or not they >> should >> > > > > support >> > > > > >> side >> > > > > >> >> > > > > inputs). >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > Moreover, like Reuven says, >> annotations >> > > > don't >> > > > > >> work >> > > > > >> >> > with >> > > > > >> >> > > > > > lambdas >> > > > > >> >> > > > > > > > at >> > > > > >> >> > > > > > > > > > all: >> > > > > >> >> > > > > > > > > > > > > creating a lambda with a flexible >> set >> > of >> > > > > >> annotation >> > > > > >> >> > > > > arguments >> > > > > >> >> > > > > > > > > appears >> > > > > >> >> > > > > > > > > > > to >> > > > > >> >> > > > > > > > > > > > be >> > > > > >> >> > > > > > > > > > > > > currently impossible, and even >> > capturing >> > > > the >> > > > > >> >> > > annotations >> > > > > >> >> > > > on >> > > > > >> >> > > > > > > > > arguments >> > > > > >> >> > > > > > > > > > > of >> > > > > >> >> > > > > > > > > > > > a >> > > > > >> >> > > > > > > > > > > > > lambda is I believe also impossible >> > > because >> > > > > the >> > > > > >> >> Java >> > > > > >> >> > > > > compiler >> > > > > >> >> > > > > > > > drops >> > > > > >> >> > > > > > > > > > > them >> > > > > >> >> > > > > > > > > > > > in >> > > > > >> >> > > > > > > > > > > > > the generated class or method >> handle. >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM >> Lukasz >> > > Cwik >> > > > > >> >> > > > > > > > > <lc...@google.com.invalid >> > > > > >> >> > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > > wrote: >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> I believe we should follow the >> pattern >> > > > that >> > > > > >> state >> > > > > >> >> > uses >> > > > > >> >> > > > and >> > > > > >> >> > > > > > > add a >> > > > > >> >> > > > > > > > > > type >> > > > > >> >> > > > > > > > > > > > >> annotation to link the side input >> > > > > definition to >> > > > > >> >> its >> > > > > >> >> > > > usage >> > > > > >> >> > > > > > > > > directly. >> > > > > >> >> > > > > > > > > > > This >> > > > > >> >> > > > > > > > > > > > >> would allow us to know that the >> side >> > > input >> > > > > was >> > > > > >> >> > > > definitely >> > > > > >> >> > > > > > > being >> > > > > >> >> > > > > > > > > > > accessed >> > > > > >> >> > > > > > > > > > > > >> and perform validation during graph >> > > > > >> construction >> > > > > >> >> for >> > > > > >> >> > > any >> > > > > >> >> > > > > > used >> > > > > >> >> > > > > > > > but >> > > > > >> >> > > > > > > > > > > > >> unspecified side inputs. >> > > > > >> >> > > > > > > > > > > > >> >> > > > > >> >> > > > > > > > > > > > >> Code snippet: >> > > > > >> >> > > > > > > > > > > > >> final PCollectionView<String> foo = >> > > > > >> >> > > > > > pipeline.apply("fooName", >> > > > > >> >> > > > > > > > > > > > >> Create.of("foo")).apply(View.< >> > > > > >> >> String>asSingleton()); >> > > > > >> >> > > > > > > > > > > > >> PCollection<String> output = >> pipeline >> > > > > >> >> > > > > > > > > > > > >> .apply(Create.of(1, 2, 3)) >> > > > > >> >> > > > > > > > > > > > >> .apply(MapElements.via( >> > > > > >> >> > > > > > > > > > > > >> new SimpleFunction<Integer, >> > > > > String>() { >> > > > > >> >> > > > > > > > > > > > >> @Override >> > > > > >> >> > > > > > > > > > > > >> public String >> apply(Integer >> > > > input, >> > > > > >> >> > > > > > > > @SideInput("fooName") >> > > > > >> >> > > > > > > > > > > > String >> > > > > >> >> > > > > > > > > > > > >> fooValue) { >> > > > > >> >> > > > > > > > > > > > >> return fooValue + " " + >> > > input; >> > > > > >> >> > > > > > > > > > > > >> } >> > > > > >> >> > > > > > > > > > > > >> }).withSideInputs(foo));* >> > > > > >> >> > > > > > > > > > > > >> >> > > > > >> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, >> Eugene >> > > > > >> Kirpichov < >> > > > > >> >> > > > > > > > > > > > >> kirpic...@google.com.invalid> >> wrote: >> > > > > >> >> > > > > > > > > > > > >> >> > > > > >> >> > > > > > > > > > > > >> > Sure, here's how a modified >> > (passing) >> > > > > >> >> MapElements >> > > > > >> >> > > unit >> > > > > >> >> > > > > > test >> > > > > >> >> > > > > > > > > looks >> > > > > >> >> > > > > > > > > > > > like, >> > > > > >> >> > > > > > > > > > > > >> > with usage of side inputs: >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> > @Test >> > > > > >> >> > > > > > > > > > > > >> > @Category(NeedsRunner.class) >> > > > > >> >> > > > > > > > > > > > >> > public void >> > > > testMapBasicWithSideInput() >> > > > > >> throws >> > > > > >> >> > > > > > Exception { >> > > > > >> >> > > > > > > > > > > > >> > * final >> PCollectionView<String> >> > foo >> > > > =* >> > > > > >> >> > > > > > > > > > > > >> > * pipeline.apply("foo", >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > Create.of("foo")).apply(View.<String>asSingleton());* >> > > > > >> >> > > > > > > > > > > > >> > PCollection<String> output = >> > > > pipeline >> > > > > >> >> > > > > > > > > > > > >> > .apply(Create.of(1, 2, >> 3)) >> > > > > >> >> > > > > > > > > > > > >> > .apply(MapElements.via( >> > > > > >> >> > > > > > > > > > > > >> > new >> > > SimpleFunction<Integer, >> > > > > >> >> String>() >> > > > > >> >> > { >> > > > > >> >> > > > > > > > > > > > >> > @Override >> > > > > >> >> > > > > > > > > > > > >> > public String >> > > > apply(Integer >> > > > > >> >> input) { >> > > > > >> >> > > > > > > > > > > > >> > return* foo.get() >> > *+ " >> > > > " + >> > > > > >> >> input; >> > > > > >> >> > > > > > > > > > > > >> > } >> > > > > >> >> > > > > > > > > > > > >> > }) >> > > > > >> >> > > > > > > > > > > > >> > *.withSideInputs(foo));* >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> > PAssert.that(output). >> > > > > >> >> containsInAnyOrder("foo >> > > > > >> >> > 1", >> > > > > >> >> > > > > "foo >> > > > > >> >> > > > > > > 2", >> > > > > >> >> > > > > > > > > > "foo >> > > > > >> >> > > > > > > > > > > > 3"); >> > > > > >> >> > > > > > > > > > > > >> > pipeline.run(); >> > > > > >> >> > > > > > > > > > > > >> > } >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM >> > Reuven >> > > > Lax >> > > > > >> >> > > > > > > > > > <re...@google.com.invalid >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > wrote: >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> > > Can you provide a code snippet >> > > showing >> > > > > how >> > > > > >> >> this >> > > > > >> >> > > > would >> > > > > >> >> > > > > > > look? >> > > > > >> >> > > > > > > > > > > > >> > > >> > > > > >> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, >> > > Eugene >> > > > > >> >> > Kirpichov < >> > > > > >> >> > > > > > > > > > > > >> > > kirpic...@google.com.invalid> >> > > wrote: >> > > > > >> >> > > > > > > > > > > > >> > > >> > > > > >> >> > > > > > > > > > > > >> > > > TL;DR Introduce method >> > > > > >> >> PCollectionView.get(), >> > > > > >> >> > > > > > > implemented >> > > > > >> >> > > > > > > > > as: >> > > > > >> >> > > > > > > > > > > get >> > > > > >> >> > > > > > > > > > > > >> > > > thread-local ProcessContext >> and >> > do >> > > > > >> >> > > > > c.sideInput(this). >> > > > > >> >> > > > > > > As a >> > > > > >> >> > > > > > > > > > > result, >> > > > > >> >> > > > > > > > > > > > >> any >> > > > > >> >> > > > > > > > > > > > >> > > user >> > > > > >> >> > > > > > > > > > > > >> > > > lambdas such as MapElements >> can >> > > use >> > > > > side >> > > > > >> >> > inputs. >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Quite a few transforms have >> > > > user-code >> > > > > >> >> > callbacks >> > > > > >> >> > > or >> > > > > >> >> > > > > > > > lambdas: >> > > > > >> >> > > > > > > > > > > ParDo >> > > > > >> >> > > > > > > > > > > > >> > (DoFn), >> > > > > >> >> > > > > > > > > > > > >> > > > Map/FlatMapElements, the >> > > > > >> DynamicDestinations >> > > > > >> >> > > > classes >> > > > > >> >> > > > > > in >> > > > > >> >> > > > > > > > > > various >> > > > > >> >> > > > > > > > > > > > IOs, >> > > > > >> >> > > > > > > > > > > > >> > > > combine fns, the PollFn >> callback >> > > in >> > > > > >> Watch, >> > > > > >> >> > etc. >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Of these, only DoFn and >> > CombineFn >> > > > have >> > > > > >> >> > built-in >> > > > > >> >> > > > > > support >> > > > > >> >> > > > > > > > for >> > > > > >> >> > > > > > > > > > side >> > > > > >> >> > > > > > > > > > > > >> > inputs; >> > > > > >> >> > > > > > > > > > > > >> > > > for DynamicDestinations it is >> > > > plumbed >> > > > > >> >> > > explicitly; >> > > > > >> >> > > > > > others >> > > > > >> >> > > > > > > > > don't >> > > > > >> >> > > > > > > > > > > > have >> > > > > >> >> > > > > > > > > > > > >> > > access >> > > > > >> >> > > > > > > > > > > > >> > > > (e.g. you can't access side >> > inputs >> > > > > from >> > > > > >> >> > > > > > > > Map/FlatMapElements >> > > > > >> >> > > > > > > > > > > > because >> > > > > >> >> > > > > > > > > > > > >> > they >> > > > > >> >> > > > > > > > > > > > >> > > > don't have a ProcessContext >> or >> > any >> > > > > >> context >> > > > > >> >> for >> > > > > >> >> > > > that >> > > > > >> >> > > > > > > > matter). >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > I think it's important to >> solve >> > > > this, >> > > > > >> >> > especially >> > > > > >> >> > > > as >> > > > > >> >> > > > > > > Java 8 >> > > > > >> >> > > > > > > > > > > becomes >> > > > > >> >> > > > > > > > > > > > >> > > people's >> > > > > >> >> > > > > > > > > > > > >> > > > default choice: users will >> want >> > to >> > > > use >> > > > > >> side >> > > > > >> >> > > inputs >> > > > > >> >> > > > > in >> > > > > >> >> > > > > > > > > > > > >> > > Map/FlatMapElements. >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > It also appears to be quite >> easy >> > > to >> > > > > >> >> implement: >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Runner part: >> > > > > >> >> > > > > > > > > > > > >> > > > - introduce a >> SideInputAccessor >> > > > > interface >> > > > > >> >> > > > > > > > > > > > >> > > > - make .get() on a >> > PCollectionView >> > > > > get it >> > > > > >> >> > from a >> > > > > >> >> > > > > > > > > thread-local >> > > > > >> >> > > > > > > > > > > > >> > > > SideInputAccessor >> > > > > >> >> > > > > > > > > > > > >> > > > - make runners set the >> > > thread-local >> > > > > >> >> > > > > SideInputAccessor >> > > > > >> >> > > > > > > any >> > > > > >> >> > > > > > > > > time >> > > > > >> >> > > > > > > > > > > the >> > > > > >> >> > > > > > > > > > > > >> > runner >> > > > > >> >> > > > > > > > > > > > >> > > > is evaluating something in a >> > > context >> > > > > >> where >> > > > > >> >> > side >> > > > > >> >> > > > > inputs >> > > > > >> >> > > > > > > are >> > > > > >> >> > > > > > > > > > > > >> available, >> > > > > >> >> > > > > > > > > > > > >> > > e.g. >> > > > > >> >> > > > > > > > > > > > >> > > > a ProcessElement method, or >> > > > applying a >> > > > > >> >> > > CombineFn - >> > > > > >> >> > > > > the >> > > > > >> >> > > > > > > set >> > > > > >> >> > > > > > > > > of >> > > > > >> >> > > > > > > > > > > such >> > > > > >> >> > > > > > > > > > > > >> > places >> > > > > >> >> > > > > > > > > > > > >> > > > will be quite small. I >> believe >> > > only >> > > > > >> runners >> > > > > >> >> > (but >> > > > > >> >> > > > not >> > > > > >> >> > > > > > > > > > transforms) >> > > > > >> >> > > > > > > > > > > > >> will >> > > > > >> >> > > > > > > > > > > > >> > > ever >> > > > > >> >> > > > > > > > > > > > >> > > > need to set this thread-local >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Transform part: >> > > > > >> >> > > > > > > > > > > > >> > > > - Transforms that take >> user-code >> > > > > lambdas >> > > > > >> and >> > > > > >> >> > > want >> > > > > >> >> > > > to >> > > > > >> >> > > > > > let >> > > > > >> >> > > > > > > > > them >> > > > > >> >> > > > > > > > > > > > access >> > > > > >> >> > > > > > > > > > > > >> > side >> > > > > >> >> > > > > > > > > > > > >> > > > inputs still will need to be >> > > > > configurable >> > > > > >> >> > with a >> > > > > >> >> > > > > > method >> > > > > >> >> > > > > > > > like >> > > > > >> >> > > > > > > > > > > > >> > > > .withSideInputs(view1, >> view2...) >> > > and >> > > > > will >> > > > > >> >> need >> > > > > >> >> > > to >> > > > > >> >> > > > > > plumb >> > > > > >> >> > > > > > > > > those >> > > > > >> >> > > > > > > > > > > down >> > > > > >> >> > > > > > > > > > > > >> to >> > > > > >> >> > > > > > > > > > > > >> > the >> > > > > >> >> > > > > > > > > > > > >> > > > primitive DoFn's or >> CombineFn's >> > - >> > > > > >> >> information >> > > > > >> >> > on >> > > > > >> >> > > > > > *which* >> > > > > >> >> > > > > > > > > side >> > > > > >> >> > > > > > > > > > > > inputs >> > > > > >> >> > > > > > > > > > > > >> > will >> > > > > >> >> > > > > > > > > > > > >> > > > be accessed, of course, still >> > > needs >> > > > to >> > > > > >> be in >> > > > > >> >> > the >> > > > > >> >> > > > > > graph. >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > > I quickly prototyped this in >> > > direct >> > > > > >> runner >> > > > > >> >> and >> > > > > >> >> > > > > > > > MapElements, >> > > > > >> >> > > > > > > > > > and >> > > > > >> >> > > > > > > > > > > it >> > > > > >> >> > > > > > > > > > > > >> > worked >> > > > > >> >> > > > > > > > > > > > >> > > > with no issues - I'm >> wondering >> > if >> > > > > there's >> > > > > >> >> > > > something >> > > > > >> >> > > > > > > subtle >> > > > > >> >> > > > > > > > > > that >> > > > > >> >> > > > > > > > > > > > I'm >> > > > > >> >> > > > > > > > > > > > >> > > > missing, or is it actually a >> > good >> > > > > idea? >> > > > > >> >> > > > > > > > > > > > >> > > > >> > > > > >> >> > > > > > > > > > > > >> > > >> > > > > >> >> > > > > > > > > > > > >> > >> > > > > >> >> > > > > > > > > > > > >> >> > > > > >> >> > > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > >> > > > > >> >> > > > > > > > > > >> > > > > >> >> > > > > > > > > >> > > > > >> >> > > > > > > > >> > > > > >> >> > > > > > > >> > > > > >> >> > > > > > >> > > > > >> >> > > > > >> > > > > >> >> > > > >> > > > > >> >> > > >> > > > > >> >> > >> > > > > >> >> >> > > > > >> >> > > > > >> > > > >> > > >> > >> >