Sorry for the delayed reply. This may be a non-issue, but my overarching comment was to address how (if at all) this relates to the portable model of a pipeline.
One easy way to avoid violating this is to wait until https://github.com/apache/beam/pull/3938 is completed. That PR includes a portability round-trip before running pipelines on the Java DirectRunner, which ensures that we do not develop features without a portability story.

On Wed, Oct 4, 2017 at 7:45 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> A bunch of people have commented on the doc without, it seems, any major disagreement. The PR is out for review.
>
> On Fri, Sep 29, 2017 at 1:53 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>> Hi all,
>>
>> Please take a look at some notes from a discussion we had about this with a few folks, and an updated proposal and a couple of demo PRs implementing the proposal.
>> http://s.apache.org/context-fn
>>
>> I hope this proposal is more agreeable.
>>
>> On Wed, Sep 13, 2017 at 3:46 PM Kenneth Knowles <k...@google.com.invalid> wrote:
>>> ValueProvider is global, PCollectionView is per-window, state is per-step/key/window, etc.
>>>
>>> So my unhappiness increases as we move through that list, adding more and more constraints on correct use, none of which are reflected in the API. Your description of "its context is an execution of the pipeline" is accurate for ValueProvider. The question is not merely "which DoFn will need which side inputs" but in which methods the side input is accessed (forbidden in every DoFn method other than @ProcessElement and @OnTimer).
>>>
>>> As for lambdas being more universal - I agree! But the capabilities of ParDo are not. I don't think we should transparently make them available anywhere you have a lambda. For example, multiply triggered side inputs fundamentally alter the semantics of MapElements and Filter to vary over time.
>>> The only reason this isn't a showstopper is that multiply triggered side inputs have very loose consistency already, and you can write nondeterministic predicates and map functions anyhow. If either of those were better, we'd want to keep them that way.
>>>
>>> NewDoFn is somewhat tied to the alternative proposal, and there's the point that, since lambdas are cross-language, we might reconsider the ProcessContext (aka "pile of mud") style. But this universality - being the lowest common denominator across languages - is not a goal. Python is already quite different from Java, using | and >> and kwarg side inputs to good effect, and those two languages are quite similar. Go will look entirely different. For Java, annotation-driven APIs are common and offer important advantages for readability, validation, and forward/backward compatibility. And incidentally, NewDoFn subsumes ProcessContext.
>>>
>>> On Wed, Sep 13, 2017 at 2:32 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>> Thanks!
>>>>
>>>> I think most of the issues you point out [validation, scheduling, prefetching] are in the area of wiring. I reiterate that they can be solved - both of the methods below will give the runner an answer to the low-level question "which DoFn will need which side inputs":
>>>>
>>>> 1) Providing withSideInputs() builder methods on transforms that are parameterized by user code. If only some side inputs should be made available to particular bits of user code, provide more detailed withBlahSideInputs() methods - this is up to the transform.
>>>>
>>>> 2) Inferring this from something annotation-driven as indicated in the thread, e.g. capturing the PCollectionView in @SideInput-annotated public fields.
>>>> This can't be done on a lambda, because lambdas don't have fields [so I think method #1 will keep being necessary], but it can be done on an anonymous class.
>>>>
>>>> As for direct access being misleading: I'm not sure I agree. I think the intuition for PCollectionView.get() is no more wrong than the intuition for ValueProvider.get(): the return value is, logically, context-free [more like: its context is an execution of the pipeline], so I have no issue with it being accessed implicitly.
>>>>
>>>> On Wed, Sep 13, 2017 at 2:05 PM Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>> I made some comments on https://issues.apache.org/jira/browse/BEAM-2950, which was filed to do a similar thing for State. Luke correctly pointed out that many of the points apply here as well. I said most of the same above, but I thought I'd pull them out again from that ticket and rephrase them to apply to side inputs:
>>>>>
>>>>> - Direct access at first appears "more intuitive" because to a newcomer it "looks like" normal [captured variable] access. But in fact it is nothing like normal [captured variable] access, so this intuition is misleading and should not be encouraged. So it is actually less readable, because your intuitive reading is wrong.
>>>>>
>>>>> - This design would miss the validation aspect. One way it differs from normal [functional] programming is that there are many places where it is illegal to reference [side inputs], such as StartBundle/FinishBundle, or passing to another object.
>>>>> This proposal would turn those into dynamic failures at best, or in the worst case into data corruption (the runner fails to catch an illegal access and permits some thread-global context to leak).
>>>>>
>>>>> - It is actually mandatory that we are always able to detect [side inputs, or the user has to manually wire them], as it [must be scheduled differently].
>>>>>
>>>>> - A runner can't automatically prefetch, because it doesn't know which [side input] is used by which methods.
>>>>>
>>>>> - Magic by mutating stuff into place is just less readable / more error prone.
>>>>>
>>>>> State has even more compelling issues and none of the benefits, so my +0.75 for side inputs (now I am feeling more like +0.25) is a -1 for state. We should definitely not block one feature on all vaguely similar features.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Sep 13, 2017 at 1:56 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>> On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
>>>>>>> On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>> Hi Robert,
>>>>>>>>
>>>>>>>> Given the anticipated usage of this proposal in Java, I'm not sure the Python approach you quoted is the right one.
>>>>>>>
>>>>>>> Perhaps not, but does that mean it would be a Java-ism only, or would we implement it in Python despite it being worse there?
>>>>>>
>>>>>> I'm not sure, but I don't see why the proposed approach of view.get() wouldn't work well, or would be harder to implement in Python.
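Kenn's bullets about validation and prefetching, and Eugene's reply that declared wiring answers "which DoFn will need which side inputs", can be illustrated with a small model. The sketch below is a language-neutral Python toy, not Beam code: `View`, `Step`, `run_step`, and `UndeclaredSideInputError` are hypothetical names. It assumes a step declares its views up front (the role `.withSideInputs()` plays in the proposal), so the "runner" can check availability and prefetch every declared view before the first element, and an access to an undeclared view fails immediately with a descriptive message of the kind suggested later in the thread.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class View:
    """Hypothetical stand-in for a side input view (not Beam's PCollectionView)."""
    name: str


class UndeclaredSideInputError(RuntimeError):
    """Raised when user code touches a side input the step never declared."""


@dataclass
class Step:
    """A processing step whose side inputs are declared at construction time,
    playing the role of .withSideInputs(...) in the proposal (assumption)."""
    fn: Callable[[Any, Callable[[View], Any]], Any]
    side_inputs: List[View] = field(default_factory=list)


def run_step(step: Step, elements: List[Any], available: Dict[View, Any]) -> List[Any]:
    # Validation: because wiring is declared, missing views are detected
    # before any element is processed, not in the middle of a bundle.
    missing = [v.name for v in step.side_inputs if v not in available]
    if missing:
        raise UndeclaredSideInputError(f"declared side inputs not available: {missing}")

    # Prefetch: the runner knows exactly which views this step needs.
    prefetched = {v: available[v] for v in step.side_inputs}

    def get(view: View) -> Any:
        # Access is still allowed from user code, but only for declared views.
        if view not in prefetched:
            raise UndeclaredSideInputError(
                f"side input {view.name!r} was not declared for this step; "
                "did you forget to add it via withSideInputs()?")
        return prefetched[view]

    return [step.fn(element, get) for element in elements]


if __name__ == "__main__":
    threshold, other = View("threshold"), View("other")
    above = Step(fn=lambda x, get: x > get(threshold), side_inputs=[threshold])
    print(run_step(above, [1, 5, 10], {threshold: 4, other: 0}))
    sneaky = Step(fn=lambda x, get: get(other), side_inputs=[threshold])
    try:
        run_step(sneaky, [1], {threshold: 4, other: 0})
    except UndeclaredSideInputError as err:
        print(err)
```

Passing `get` explicitly keeps the toy simple; an implicit `view.get()` would need ambient per-thread state instead, which is exactly where the thread's concerns about leaking context come in.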
>>>>>>
>>>>>>>> The main reason: I see how it works with Map/FlatMap, but what about cases like FileIO.write(), parameterized by several lambdas (element -> destination, destination -> filename policy, destination -> sink), where different lambdas may want to access different side inputs? It feels excessive to make each of the lambdas take all of the side inputs in the same order; moreover, if the composite transform internally needs to pass some more side inputs to the DoFns executing these lambdas, it will need to manipulate the argument lists in nontrivial ways to make sure it passes them only the side inputs the user asked for, and in the proper order.
>>>>>>>
>>>>>>> In Python it would be trivial to "slice" the side input arguments across the lambdas in a natural way, but I can see that this would be more of a pain in Java, especially as lambdas are unnecessarily crippled during compilation.
>>>>>>>
>>>>>>>> Another reason is that, with Java's type system, I think it's impossible to have a NewDoFn-style API for lambdas, because annotations on lambda arguments are dropped when the lambda is converted to the respective single-method interface - a lambda is subject to a lot more type erasure than an anonymous class.
>>>>>>>
>>>>>>> Yeah, this is unfortunate. But, as mentioned, side inputs don't need to be annotated, just counted.
>>>>>>> For something like inspecting the window, the NewDoFn has a lot of advantages over implicit access (and makes it so you can't "forget" to declare your dependency), but I do see advantages for the implicit way of doing things for delegating to other callables.
>>>>>>>
>>>>>>> On the other hand, there is a bit of precedent for this: metrics have the "implicit" API. If we do go this direction for side inputs, we should also consider it for state and side outputs.
>>>>>>
>>>>>> I think Kenn is very strongly against using it for state, whereas I don't have an opinion either way because I can't think of a use case for accessing state from a lambda - we should probably discuss this separately, with proposed code examples in front of us.
>>>>>>
>>>>>> For side outputs, yes, it might be nice to ".add()" to a PCollection, but it would require bigger changes - e.g. creating intermediate PCollections and inserting an implicit Flatten in front of all steps that contribute to this PCollection, because a PCollection currently can be produced by only one step. Maybe there's a different way to express implicit side outputs. Either way, I support the idea of looking for such a way, because it would simplify use cases such as error-handling dead-letter collections.
>>>>>>
>>>>>> I guess the bigger point is: do we want to block the discussion of implicit side inputs on making a decision about the implicitness of other things (side outputs, state, PipelineOptions, window, etc.)?
>>>>>> I can see the argument for a "yes, block", but I can also see the argument for a "no, don't block" - because this proposal is (as indicated earlier in the thread) forward-compatible with annotation-based wiring, because we already have a precedent for implicit access of something via ValueProvider, and because of the advantages it offers.
>>>>>>
>>>>>> I want to mention another advantage: lambdas are likely to be much easier to use than the NewDoFn approach from non-Java JVM languages/SDKs (e.g. Scio), which might have even more type erasure, or less sophisticated annotation machinery, or in which NewDoFn-style anonymous classes might be highly non-idiomatic. Lambdas are idiomatic in every language that supports lambdas, which these days is basically every language. [I might be opening a can of worms here, but I guess you can consider this an argument against NewDoFn in general - though that's certainly outside the scope of this thread.]
>>>>>>
>>>>>>>> On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
>>>>>>>>> +1 to reducing the amount of boilerplate for dealing with side inputs.
>>>>>>>>>
>>>>>>>>> I prefer the "NewDoFn" style of side inputs for consistency. The primary drawback seems to be lambdas' incompatibility with annotations. This is solved in Python by letting the first argument of the process method be the main input, and subsequent ones be the side inputs.
>>>>>>>>> For example:
>>>>>>>>>
>>>>>>>>>     main_pcoll | beam.Map(
>>>>>>>>>         lambda main_input_elem, side_input_value: main_input_elem + side_input_value,
>>>>>>>>>         side_input_pvalue)
>>>>>>>>>
>>>>>>>>> For multiple side inputs, they are mapped positionally (though Python has the advantage that arguments can also be passed by keyword to enhance readability when there are many of them, and we allow that for side inputs). Note that side_input_pvalue is not referenced anywhere else, so we don't even have to store it and pass it around (one typically writes pvalue.AsList(some_pcoll) inline here). When the concrete PCollectionView is used to access the value, it must be passed separately to both the ParDo and the callback (unless we can infer it, which I don't think we can do in all (many?) cases).
>>>>>>>>>
>>>>>>>>> There's no reason we couldn't do this, or something very similar, in Java as well.
>>>>>>>>>
>>>>>>>>> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax <re...@google.com.invalid> wrote:
>>>>>>>>>> On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I agree with these concerns to an extent; however, I think the advantage of transparently letting any user code access side inputs, especially including lambdas, is so great that we should find a way to address these concerns within the constraints of the pattern I'm proposing.
>>>>>>>>>>> See more below.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers <bchamb...@google.com.invalid> wrote:
>>>>>>>>>>>> One possible issue with this is that updating a thread local is likely to be much more expensive than passing an additional argument.
>>>>>>>>>>>
>>>>>>>>>>> This is an implementation detail that can be fixed - Luke made a suggestion on the PR to set up the side input context once per bundle rather than once per element.
>>>>>>>>>>
>>>>>>>>>> However, remember that bundles might be small. The Dataflow streaming runner creates small bundles by design. The Flink runner creates single-element bundles.
>>>>>>>>>>
>>>>>>>>>>>> Also, not all code called from within the DoFn will necessarily be in the same thread (e.g., sometimes we create a pool of threads for doing work).
>>>>>>>>>>>
>>>>>>>>>>> I think we already require that c.output() cannot be called from multiple threads; and I don't think we document c.sideInput() to be thread-safe - it may be reasonable to declare that it isn't and has to be accessed from the same thread as the ProcessElement call. If we want to relax this, then there might be ways to deal with that too, e.g. provide utilities for the user to capture the "user code context" and restore it inside a thread. This would likely be valuable for other purposes, such as making those extra threads visible to our profiling utilities.
>>>>>>>>>>
>>>>>>>>>> This seems fair, but we should be very careful about our documentation. And +1 to adding utilities to make multi-threaded work easier to manage.
>>>>>>>>>>
>>>>>>>>>>>> It may be *more* confusing for this to sometimes work magically and sometimes fail horribly. Also, requiring the PCollectionView to be passed to user code that accesses it is nice because it makes *very clear* that the side input needs to be provided from the DoFn to that particular utility. If it is accessed via "spooky action at a distance", we lose that piece of "free" documentation, which may lead to extensive misuse of these utility methods.
>>>>>>>>>>>
>>>>>>>>>>> I'd like to understand this concern better - from this description it's not clear to me. The pattern I'm proposing is that, when you're authoring a PTransform that is configured by any user callbacks, then:
>>>>>>>>>>> - you should provide a builder method .withSideInputs(...)
>>>>>>>>>>> - you should propagate those side inputs to all your internal DoFns that invoke the user code
>>>>>>>>>>> - in return, the user callbacks will be allowed to access those particular side inputs
>>>>>>>>>>>
>>>>>>>>>>> This seems like a simple enough model to understand, both from a user's perspective and from a transform author's perspective. Steps 1 and 2 may eventually be automated by annotation analysis or other means (e.g. the SDK giving a way to provide given side inputs automatically to everything inside a composite transform rather than to individual DoFns).
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>>>>>>>>>>> On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>>>>>> The differences are:
>>>>>>>>>>>>>>> - The proposal in the doc allows wiring different side inputs to the same Supplier, but I'm not convinced that this is important - you can just as easily call the constructor of your DoFn passing different PCollectionViews for it to capture.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I disagree with this bit about it being "just as easy". Passing the needed PCollectionViews to your constructor (or even having a constructor) is a pain. Every time I have to do it, it adds a ton of boilerplate that feels like pure noise. To make a DoFn reusable, it must be made into a named class with a constructor, versus inlined with no constructor.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hm, why? You can have the DoFn be an anonymous class capturing the PCollectionView into a @SideInput field as a closure.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> A generous analogy is that it is "just" manual closure conversion/currying, changing f(side, main) to f(side)(main). But in practice in Beam the second one has much more boilerplate.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, Beam is worse. We present the user with higher-order functions, which is where the actual annoyance comes in. When you want to pardo(f), you have to write pardo(f(side))(side, main). Your proposal is to support pardo(f(side))(main) and mine is to support pardo(f)(side, main).
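The currying analogy in the message above can be made concrete with plain functions. A minimal Python sketch, assuming side inputs can be modeled as ordinary values (real side inputs are per-window views) and using hypothetical helpers `par_do` and `par_do_with_side` in place of ParDo; none of these are Beam APIs.

```python
from functools import partial
from typing import Callable, Iterable, List, TypeVar

S = TypeVar("S")  # side input value
T = TypeVar("T")  # main input element
R = TypeVar("R")  # output element


def par_do_with_side(fn: Callable[[S, T], R], side: S, mains: Iterable[T]) -> List[R]:
    """Parameter style, pardo(f)(side, main): the framework supplies the side
    value as an argument, so the wiring is visible where the step is built."""
    return [fn(side, main) for main in mains]


def par_do(fn: Callable[[T], R], mains: Iterable[T]) -> List[R]:
    """Closure style, pardo(f(side))(main): fn has already captured its side value."""
    return [fn(main) for main in mains]


def add(side: int, main: int) -> int:
    """Plain two-argument function: f(side, main)."""
    return side + main


def add_curried(side: int) -> Callable[[int], int]:
    """Manual closure conversion: f(side)(main)."""
    def inner(main: int) -> int:
        return side + main
    return inner


if __name__ == "__main__":
    print(par_do_with_side(add, 10, [1, 2, 3]))  # parameter style
    print(par_do(add_curried(10), [1, 2, 3]))    # hand-written closure
    print(par_do(partial(add, 10), [1, 2, 3]))   # currying via functools.partial
```

All three calls produce the same list; they differ only in where the side value is bound. In Python `functools.partial` makes the closure form nearly free, which suggests the boilerplate described above comes from how closures have to be expressed (for example, a named DoFn class with a constructor in Java) rather than from the idea itself.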
>>>>>>>>>>>>>> I still propose that we support both (as they get implemented). If you buy in to my analogy, then there's decades of precedent, and the burden of proof falls heavily on whoever doesn't want to support both.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I see your point. I think the proposal is compatible with what you're suggesting too - in DoFn we could have @SideInput *parameters* of type PCollectionView, with the same semantics as a field.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - My proposal allows getting rid of .withSideInputs() entirely, because the DoFn captures the PCollectionView, so you don't need to specify it explicitly for wiring.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've decided to change to full +1 (whatever that means compared to 0.75 :-) to adding support for @SideInput fields, because the benefits outweigh this failure mode:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     new DoFn {
>>>>>>>>>>>>>>       // forgot the annotation
>>>>>>>>>>>>>>       private final PCollectionView whatever;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       @ProcessElement public void process(...) {
>>>>>>>>>>>>>>         whatever.get(); // crash during execution
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But ideas to mitigate that would be cool.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hm, can't think of anything less hacky than "prohibit having fields of type PCollectionView that are not public, final, and annotated with @SideInput" - not sure we'd want to go down this road. I suppose a good error message in .get() would be sufficient, saying "Did you forget to specify a requirement for this side input via .withSideInputs() or by annotating the field as @SideInput" or something like that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
>>>>>>>>>>>>>>>> My concern with the proposal is not the specifics of how it will work, and more about it being yet another way in which our API is to be used, even though we have a proposal [1] for an API style we were working towards in Java and Python.
>>>>>>>>>>>>>>>> I would rather re-open that discussion now about what we want that API to look like for our major features, and work towards consistency (or not, if there is a strong argument as to why some feature should have a different style).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1: https://s.apache.org/a-new-dofn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>>>>>>>>>>>>>>>> +0.75 because I'd like to bring up invalid pipelines.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I had proposed side inputs as parameters to DoFn in https://s.apache.org/a-new-dofn (specifically at [1]), so the only place they are specified is in the graph construction, making the DoFn more reusable and errors impossible.
>>>>>>>>>>>>>>>>> I've actually been noodling my way towards this in a branch :-)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Eugene's proposal is a sort of converse, where the side inputs are values captured in the closure and not parameters, yet the only place they are specified is in the DoFn.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I see no conflict between these two. It is very natural to have both the capability to accept parameters and the ability to capture variables in the closure. Supporting both is totally standard in up-to-date programming languages.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Today we have the worst of both worlds: PCollectionView behaves as something captured in the closure/constructor, but must still be explicitly wired up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> But if I use PCollectionView.get() and have not wired it up in any way, what happens? Just like today, you can try to .sideInput(...) a thing that is not available. With side inputs as parameters, this is not possible. If you want to treat them as captured in a closure while avoiding errors, it seems like you might need to do some low-level magic, like the serialization-based detection that Luke has suggested before (there are known downsides that we haven't explored, like overcapture).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>>>>>>>>> Hm, I guess you're right - for outputs it could indeed be quite valuable to output to them without plumbing (e.g. outputting errors). Could be done perhaps via TupleTag.output()?
(assuming the same TupleTag cannot be reused to tag multiple PCollections)

For now I sent a PR for side input support: https://github.com/apache/beam/pull/3814

On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

I disagree: state may not care where it is used either, since a person may call a function that needs to store/retrieve state, instead of having the DoFn declare the StateSpec and then pass the state implementation down into the function everywhere. Similarly for outputs, the internal functions could take the TupleTag and request an output manager, or take an "output" reference, which gives functions the ability to produce output directly without needing to pass everything that needs to be output back to the caller.

On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Hm, of these things (state, side outputs, etc.), I think only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform, so only side inputs can be implicit like this.

Ultimately we'll probably end up removing ProcessContext and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (as per my previous email) to statically specify which side inputs will be needed, while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(): i.e., #get() is not a new method of access.

Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.
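
Kenn's reply above mentions the serialization-based detection Luke had suggested, with overcapture as a known downside. Below is a minimal plain-Java sketch of that idea, assuming a hypothetical `ViewHandle` class in place of `PCollectionView` (this is not Beam code). The function is serialized through an `ObjectOutputStream` subclass whose `replaceObject` hook records every handle reachable from the closure. Because it sees everything reachable, it also reports a view that is captured but never read, which is exactly the overcapture problem.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

class CaptureDetectionDemo {
  public static void main(String[] args) {
    ViewHandle prefix = new ViewHandle("prefix");
    ViewHandle suffix = new ViewHandle("suffix");

    // Captures exactly the one view it reads.
    Function<Integer, String> precise =
        (Function<Integer, String> & Serializable) x -> prefix.name + x;

    // Overcapture: captures a list holding both views but only ever reads the first.
    List<ViewHandle> all = Arrays.asList(prefix, suffix);
    Function<Integer, String> sloppy =
        (Function<Integer, String> & Serializable) x -> all.get(0).name + x;

    System.out.println(CaptureDetection.capturedViews(precise)); // [prefix]
    System.out.println(CaptureDetection.capturedViews(sloppy));  // [prefix, suffix]
  }
}

/** Hypothetical stand-in for PCollectionView: a named, serializable handle. */
final class ViewHandle implements Serializable {
  final String name;

  ViewHandle(String name) {
    this.name = name;
  }
}

/** Object stream that records every ViewHandle it encounters while serializing. */
final class CapturingOutputStream extends ObjectOutputStream {
  final Set<String> captured = new LinkedHashSet<>();

  CapturingOutputStream(OutputStream out) throws IOException {
    super(out);
    enableReplaceObject(true); // makes the stream call replaceObject for each new object
  }

  @Override
  protected Object replaceObject(Object obj) {
    if (obj instanceof ViewHandle) {
      captured.add(((ViewHandle) obj).name);
    }
    return obj; // observe only; nothing is actually replaced
  }
}

final class CaptureDetection {
  /** Names of all views reachable from fn's serialized form, in encounter order. */
  static Set<String> capturedViews(Object fn) {
    try (CapturingOutputStream out = new CapturingOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(fn);
      return out.captured;
    } catch (IOException e) {
      // e.g. NotSerializableException when fn is not serializable at all
      throw new UncheckedIOException(e);
    }
  }
}
```

Serializable lambdas are written via `writeReplace` as a `SerializedLambda` whose captured arguments are then serialized normally, which is why the hook sees values captured by lambdas as well as fields of anonymous classes.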

On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation-wise. It will be confusing for our users to have many variations in our style of how all these concepts are used (ProcessContext / Annotations / #get()).

On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Also, I think my approach is compatible with annotations and future removal of .withSideInputs if we annotate a field:

    final PCollectionView<Foo> fooView = ...;

    class MyDoFn {
      @SideInput
      PCollectionView<Foo> foo = fooView;

      ...foo.get()...
    }

We can extract the accessed views from the DoFn instance using reflection.
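
The reflection step Eugene describes could look roughly like this in plain Java. `SideInputField` and `FieldView` are hypothetical stand-ins, not Beam's actual annotation or `PCollectionView`. The annotation needs RUNTIME retention to be visible to reflection, and the scan walks superclasses because `getDeclaredFields` returns only the fields declared directly on each class.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class FieldScanDemo {
  public static void main(String[] args) {
    FieldView<String> greeting = new FieldView<>("greeting");
    FieldView<Integer> limit = new FieldView<>("limit");

    // Anonymous class standing in for a DoFn with annotated view fields.
    Object fn = new Object() {
      @SideInputField FieldView<String> g = greeting;
      @SideInputField FieldView<Integer> l = limit;
      FieldView<String> unannotated = greeting; // not reported: no annotation
    };

    // getDeclaredFields order is unspecified, so the printed order may vary.
    System.out.println(SideInputScanner.declaredViews(fn));
  }
}

/** Hypothetical marker annotation; RUNTIME retention is needed for reflection to see it. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SideInputField {}

/** Hypothetical stand-in for PCollectionView<T>. */
final class FieldView<T> {
  final String name;

  FieldView(String name) {
    this.name = name;
  }
}

final class SideInputScanner {
  /** Names of views held in @SideInputField fields of fn, including inherited fields. */
  static List<String> declaredViews(Object fn) {
    List<String> names = new ArrayList<>();
    for (Class<?> c = fn.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
      for (Field f : c.getDeclaredFields()) {
        if (!f.isAnnotationPresent(SideInputField.class)) {
          continue; // skips unannotated fields, e.g. synthetic fields for captured locals
        }
        if (!FieldView.class.isAssignableFrom(f.getType())) {
          throw new IllegalArgumentException(
              "@SideInputField on a non-view field: " + c.getName() + "." + f.getName());
        }
        f.setAccessible(true);
        try {
          FieldView<?> view = (FieldView<?>) f.get(fn);
          if (view != null) {
            names.add(view.name);
          }
        } catch (IllegalAccessException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    return names;
  }
}
```

A view that is captured and read without the annotation is simply not reported, so this approach relies on users annotating every view they use.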

Still not compatible with lambdas, but automatically compatible with all anonymous classes.

On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:

Hi Luke,

I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help get rid of the .withSideInputs() call, but this is where their advantage ends.

The advantage of the proposed approach is that it automatically works with all existing callback or lambda code. There is no need to further develop the reflection machinery to support side input annotations (and especially to support arbitrary user interfaces), no need to change existing transforms, and no need for transform authors even to know that the machinery exists to make side inputs usable in their transforms (and no need for authors to think about whether or not they should support side inputs).

Moreover, as Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotation arguments appears to be currently impossible, and even capturing the annotations on the arguments of a lambda is, I believe, also impossible, because the Java compiler drops them in the generated class or method handle.
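
Eugene's point about lambdas can be checked with plain reflection. In this sketch `SideInputName` is a hypothetical RUNTIME-retained parameter annotation. On an anonymous class it is visible on `apply(String)`, while the class generated at run time for an equivalent lambda exposes only an erased `apply(Object)` whose parameter carries no annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.function.Function;

class LambdaAnnotationDemo {
  public static void main(String[] args) {
    Function<String, String> anonymous = new Function<String, String>() {
      @Override
      public String apply(@SideInputName("fooName") String fooValue) {
        return fooValue;
      }
    };
    // The same annotation on an explicitly typed lambda parameter compiles fine...
    Function<String, String> lambda = (@SideInputName("fooName") String fooValue) -> fooValue;

    System.out.println(annotationsOnFirstParam(anonymous, String.class)); // 1
    // ...but is not visible on the lambda class's apply method.
    System.out.println(annotationsOnFirstParam(lambda, Object.class));    // 0
  }

  /** Counts annotations visible on the first parameter of fn's public apply(paramType). */
  static int annotationsOnFirstParam(Object fn, Class<?> paramType) {
    try {
      Method apply = fn.getClass().getMethod("apply", paramType);
      return apply.getParameterAnnotations()[0].length;
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }
}

/** Hypothetical parameter annotation naming a side input. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface SideInputName {
  String value();
}
```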

On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly. This would allow us to know that the side input was definitely being accessed and perform validation during graph construction for any used but unspecified side inputs.
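
The construction-time check Luke describes could be sketched like this. `SideInputRef` and `SideInputValidator` are hypothetical names. The function is a plain class rather than a `SimpleFunction` subclass, since today's `SimpleFunction.apply` takes a single argument and supplying the extra annotated parameter from Luke's snippet below would need reflective invocation, which is left out here. The check just compares the names found on annotated parameters with the names declared for the transform.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

class ValidationDemo {
  /** Plain class shaped like the function in Luke's snippet (not a SimpleFunction). */
  static class Formatter {
    public String apply(Integer input, @SideInputRef("fooName") String fooValue) {
      return fooValue + " " + input;
    }
  }

  public static void main(String[] args) {
    Formatter fn = new Formatter();
    // "fooName" declared (the role .withSideInputs(foo) would play): passes.
    SideInputValidator.validate(fn, "apply", Collections.singleton("fooName"));
    try {
      // Nothing declared: rejected while building the graph, not at run time.
      SideInputValidator.validate(fn, "apply", Collections.emptySet());
    } catch (IllegalArgumentException e) {
      // prints: side inputs used but not declared via withSideInputs: [fooName]
      System.out.println(e.getMessage());
    }
  }
}

/** Hypothetical annotation linking a parameter to a named side input. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface SideInputRef {
  String value();
}

final class SideInputValidator {
  /** Side input names referenced by @SideInputRef parameters of methods named methodName. */
  static Set<String> referencedNames(Class<?> fnClass, String methodName) {
    Set<String> names = new TreeSet<>();
    for (Method m : fnClass.getDeclaredMethods()) {
      if (!m.getName().equals(methodName)) {
        continue;
      }
      for (Annotation[] paramAnnotations : m.getParameterAnnotations()) {
        for (Annotation a : paramAnnotations) {
          if (a instanceof SideInputRef) {
            names.add(((SideInputRef) a).value());
          }
        }
      }
    }
    return names;
  }

  /** Throws if fn uses a side input name that was not declared. */
  static void validate(Object fn, String methodName, Set<String> declared) {
    Set<String> missing = referencedNames(fn.getClass(), methodName);
    missing.removeAll(declared);
    if (!missing.isEmpty()) {
      throw new IllegalArgumentException(
          "side inputs used but not declared via withSideInputs: " + missing);
    }
  }
}
```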

Code snippet:

    final PCollectionView<String> foo = pipeline.apply("fooName",
        Create.of("foo")).apply(View.<String>asSingleton());
    PCollection<String> output = pipeline
        .apply(Create.of(1, 2, 3))
        .apply(MapElements.via(
            new SimpleFunction<Integer, String>() {
              @Override
              public String apply(Integer input, @SideInput("fooName") String fooValue) {
                return fooValue + " " + input;
              }
            }).withSideInputs(foo));

On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Sure, here's how a modified (passing) MapElements unit test looks, with usage of side inputs:

    @Test
    @Category(NeedsRunner.class)
    public void testMapBasicWithSideInput() throws Exception {
      final PCollectionView<String> foo =
          pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
      PCollection<String> output = pipeline
          .apply(Create.of(1, 2, 3))
          .apply(MapElements.via(
              new SimpleFunction<Integer, String>() {
                @Override
                public String apply(Integer input) {
                  return foo.get() + " " + input;
                }
              })
              .withSideInputs(foo));

      PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
      pipeline.run();
    }

On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:

Can you provide a code snippet showing how this would look?

On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

TL;DR Introduce method PCollectionView.get(), implemented as: get thread-local ProcessContext and do c.sideInput(this). As a result, any user lambdas such as MapElements can use side inputs.

Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.

Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations it is plumbed explicitly; others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext, or any context for that matter).

I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.

It also appears to be quite easy to implement:

Runner part:
- introduce a SideInputAccessor interface
- make .get() on a PCollectionView get it from a thread-local SideInputAccessor
- make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn; the set of such places will be quite small. I believe only runners (but not transforms) will ever need to set this thread-local.

Transform part:
- Transforms that take user-code lambdas and want to let them access side inputs will still need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFns or CombineFns; information on *which* side inputs will be accessed, of course, still needs to be in the graph.

I quickly prototyped this in the direct runner and MapElements, and it worked with no issues. I'm wondering whether there's something subtle that I'm missing, or whether it is actually a good idea.
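
The runner part of the proposal can be sketched with a `ThreadLocal`. All names here are hypothetical and simplified: `SideInputView` stands in for `PCollectionView`, `SideInputAccessor` for the proposed interface, a `Map` of declared views for `.withSideInputs(...)`, and `ToyRunner` is not any real runner. The sketch also shows the failure modes Kenn raises: calling `get()` outside a runner-managed call fails, and so does reading a view that was not declared.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

class ThreadLocalSideInputDemo {
  public static void main(String[] args) {
    SideInputView<String> foo = new SideInputView<>("foo");
    Map<SideInputView<?>, Object> declared = new HashMap<>();
    declared.put(foo, "foo"); // plays the role of .withSideInputs(foo)

    // A plain java.util.function.Function: no ProcessContext parameter anywhere.
    Function<Integer, String> fn = x -> foo.get() + " " + x;

    System.out.println(ToyRunner.map(Arrays.asList(1, 2, 3), fn, declared));
    // prints [foo 1, foo 2, foo 3]
  }
}

/** Hypothetical runner-provided accessor, as in the proposal's SideInputAccessor. */
interface SideInputAccessor {
  <T> T sideInput(SideInputView<T> view);
}

/** Holds the accessor for whatever user code is currently running on this thread. */
final class SideInputContext {
  private static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  static SideInputAccessor current() {
    return CURRENT.get();
  }

  /** Runner side: installs accessor only while body runs, then restores the previous one. */
  static <R> R call(SideInputAccessor accessor, Supplier<R> body) {
    SideInputAccessor previous = CURRENT.get();
    CURRENT.set(accessor);
    try {
      return body.get();
    } finally {
      if (previous == null) {
        CURRENT.remove();
      } else {
        CURRENT.set(previous);
      }
    }
  }
}

/** Hypothetical stand-in for PCollectionView<T> with the proposed get(). */
final class SideInputView<T> {
  final String name;

  SideInputView(String name) {
    this.name = name;
  }

  T get() {
    SideInputAccessor accessor = SideInputContext.current();
    if (accessor == null) {
      throw new IllegalStateException(
          "side input '" + name + "' read outside a context where side inputs are available");
    }
    return accessor.sideInput(this);
  }
}

/** Toy evaluator (not a real runner) that serves only the declared views. */
final class ToyRunner {
  static <A, B> List<B> map(
      List<A> inputs, Function<A, B> fn, Map<SideInputView<?>, Object> declared) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T sideInput(SideInputView<T> view) {
        if (!declared.containsKey(view)) {
          throw new IllegalArgumentException(
              "side input '" + view.name + "' was not declared via withSideInputs");
        }
        return (T) declared.get(view);
      }
    };
    List<B> out = new ArrayList<>();
    for (A input : inputs) {
      out.add(SideInputContext.call(accessor, () -> fn.apply(input)));
    }
    return out;
  }
}
```

Restoring the previous accessor in `finally`, rather than only setting it, keeps a stale accessor from leaking onto a reused worker thread and also handles nested evaluations.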