On Wed, Sep 13, 2017 at 1:44 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
> On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov > <kirpic...@google.com.invalid> wrote: > > Hi Robert, > > > > Given the anticipated usage of this proposal in Java, I'm not sure the > > Python approach you quoted is the right one. > > Perhaps not, but does that mean it would be a Java-ism only or would > we implement it in Python despite it being worse there? > I'm not sure, but I don't see why the proposed approach of view.get() wouldn't work well, or be harder to implement in Python. > > > The main reason: I see how it works with Map/FlatMap, but what about > cases > > like FileIO.write(), parameterized by several lambdas (element -> > > destination, destination -> filename policy, destination -> sink), where > > different lambdas may want to access different side inputs? It feels > > excessive to make each of the lambdas take all of the side inputs in the > > same order; moreover, if the composite transform internally needs to pass > > some more side inputs to the DoFn's executing these lambdas, it will need > > to manipulate the argument lists in nontrivial ways to make sure it > passes > > them only the side inputs the user asked for, and in the proper order. > > In Python it would be trivial to "slice" the side input arguments > across the lambdas in a natural way, but I can see that this would be > more of a pain in Java, especially as lambdas are unnecessarily > crippled during compilation. > > > Another reason is, I think with Java's type system it's impossible to > have > > a NewDoFn-style API for lambdas, because annotations on lambda arguments > > are dropped when the lambda is converted to the respective single-method > > interface - a lambda is subject to a lot more type erasure than anonymous > > class. > > Yeah, this is unfortunate. But, as mentioned, side inputs don't need > to be annotated, just counted. For something like inspecting the > window the NewDoFn has a lot of advantages over implicit access (and > makes it so you can't "forget" to declare your dependency), but I do > see advantages for the implicit way of doing things for delegating to > other callables. > > On the other hand, there is a bit of precedence for this: metrics have > the "implicit" api. If we do go this direction for side inputs, we > should also consider it for state and side outputs. > I think Kenn is very strongly against using it for state, whereas I don't have an opinion either way because I can't think of a use case for accessing state from a lambda - we should probably discuss this separately, with proposed code examples in front of us. For side outputs, yes, it might be nice to ".add()" to a PCollection, but it would require bigger changes - e.g. creating intermediate PCollection's and inserting an implicit Flatten in front of all steps that contribute to this PCollection, because a PCollection currently can be produced only by 1 step. Maybe there's a different way to express implicit side outputs. Either way I support the idea of looking for such a way because it would simplify use cases such as error handling dead-letter collections. I guess the bigger point is: do we want to block the discussion of implicit side inputs on making a decision about the implicitness of other things (side outputs, state, PipelineOptions, window etc). I can see the argument for a "yes, block", but can also see the argument for a "no, don't block" - because this proposal is (as indicated earlier in the thread) forward-compatible with annotation-based wiring, because we already have a precedent for implicit access of something via ValueProvider, and because of the advantages it offers. Want to mention another advantage: lambdas are likely to be much easier than NewDoFn approach to use from non-Java but JVM languages/SDKs (e.g. Scio), which might have even more type erasure, or might have less sophisticated annotation machinery, or NewDoFn-style anonymous classes might be highly non-idiomatic in them. Lambdas are idiomatic in every language that supports lambdas, which these days is basically every language. [I might be opening a can of worms here, but I guess you can consider this an argument against NewDoFn in general - though that's certainly outside the scope of this thread]. > > > On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw > <rober...@google.com.invalid> > > wrote: > > > >> +1 to reducing the amount of boilerplate for dealing with side inputs. > >> > >> I prefer the "NewDoFn" style of side inputs for consistency. The > >> primary drawback seems to be lambda's incompatibility with > >> annotations. This is solved in Python by letting all the first > >> annotated argument of the process method be the main input, and > >> subsequent ones be the side input. For example > >> > >> main_pcoll | beam.Map( > >> lambda main_input_elem, side_input_value: main_input_elem + > >> side_input_value, > >> side_input_pvalue) > >> > >> For multiple side inputs they are mapped positionally (though Python > >> has the advantage that arguments can be passed by keyword as well to > >> enhance readability when there are many of them, and we allow that for > >> side inputs). Note that side_input_pvalue is not referenced anywhere > >> else, so we don't even have to store it and pass it around (one > >> typically writes pvalue.AsList(some_pcoll) inline here). When the > >> concrete PCollectionView is used to access the value this means that > >> it must be passed separately to both the ParDo and the callback > >> (unless we can infer it, which I don't think we can do in all (many?) > >> cases). > >> > >> There's no reason we couldn't do this, or something very similar, in > >> Java as well. > >> > >> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax <re...@google.com.invalid> > >> wrote: > >> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov < > >> > kirpic...@google.com.invalid> wrote: > >> > > >> >> Hi, > >> >> > >> >> I agree with these concerns to an extent, however I think the > advantage > >> of > >> >> transparently letting any user code access side inputs, especially > >> >> including lambdas, is so great that we should find a way to address > >> these > >> >> concerns within the constraints of the pattern I'm proposing. See > more > >> >> below. > >> >> > >> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers > >> <bchamb...@google.com.invalid > >> >> > > >> >> wrote: > >> >> > >> >> > One possible issue with this is that updating a thread local is > >> likely to > >> >> > be much more expensive than passing an additional argument. > >> >> > >> >> This is an implementation detail that can be fixed - Luke made a > >> suggestion > >> >> on the PR to set up the side input context once per bundle rather > than > >> once > >> >> per element. > >> >> > >> > > >> > However remember that bundles might be small. Dataflow streaming > runner > >> > creates small bundles by design. The Flink runner creates > single-element > >> > bundles. > >> > > >> > > >> >> > >> >> > >> >> > Also, not all > >> >> > code called from within the DoFn will necessarily be in the same > >> thread > >> >> > (eg., sometimes we create a pool of threads for doing work). > >> >> > >> >> I think we already require that c.output() can not be done from > multiple > >> >> threads; and I don't think we document c.sideInput() to be > thread-safe > >> - it > >> >> may be reasonable to declare that it isn't and has to be accessed > from > >> the > >> >> same thread as the ProcessElement call. If we want to relax this, > then > >> >> there might be ways to deal with that too, e.g. provide utilities for > >> the > >> >> user to capture the "user code context" and restoring it inside a > >> thread. > >> >> This would likely be valuable for other purposes, such as making > those > >> >> extra threads visible to our profiling utilities. > >> >> > >> > > >> > This seems fair, but we should be be very careful about our > >> documentation. > >> > And +1 to adding utilities to make multi-threaded work easier to > manage. > >> > > >> >> > >> >> > >> >> > It may be > >> >> > *more* confusing for this to sometimes work magically and sometimes > >> fail > >> >> > horribly. Also, requiring the PCollectionView to be passed to user > >> code > >> >> > that accesses it is nice because it makes *very clear* that the > side > >> >> input > >> >> > needs to be provided from the DoFn to that particular utility. If > it > >> is > >> >> > accessed via "spooky action at a distance" we lose that piece of > >> "free" > >> >> > documentation, which may lead to extensive misuse of these utility > >> >> methods. > >> >> > > >> >> I'd like to understand this concern better - from this description > it's > >> not > >> >> clear to me. The pattern I'm proposing is that, when you're > authoring a > >> >> PTransform that is configured by any user callbacks, then: > >> >> - you should provide a builder method .withSideInputs(...) > >> >> - you should propagate those side inputs to all your internal DoFn's > >> that > >> >> invoke the user code > >> >> - in return the user callbacks will be allowed to access those > >> particular > >> >> side inputs > >> >> This seems like a simple enough model to me to understand, both from > a > >> >> user's perspective and from a transform author's perspective. Steps 1 > >> and 2 > >> >> may eventually be automated by annotation analysis or other means > (e.g. > >> SDK > >> >> giving a way to provide given side inputs automatically to everything > >> >> inside a composite transform rather than to individual DoFn's). > >> >> > >> >> > >> >> > > >> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov > >> >> > <kirpic...@google.com.invalid> wrote: > >> >> > > >> >> > > Hi, > >> >> > > > >> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles > >> <k...@google.com.invalid > >> >> > > >> >> > > wrote: > >> >> > > > >> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov < > >> >> > > > kirpic...@google.com.invalid> wrote: > >> >> > > > > >> >> > > > > > >> >> > > > > The differences are: > >> >> > > > > - The proposal in the doc allows wiring different side > inputs to > >> >> the > >> >> > > same > >> >> > > > > Supplier, but I'm not convinced that this is important - you > can > >> >> just > >> >> > > as > >> >> > > > > easily call the constructor of your DoFn passing different > >> >> > > > > PCollectionView's for it to capture. > >> >> > > > > > >> >> > > > > >> >> > > > I disagree with this bit about it being "just as easy". Passing > >> the > >> >> > > needed > >> >> > > > PCollectionViews to your constructor (or even having a > >> constructor) > >> >> is > >> >> > a > >> >> > > > pain. Every time I have to do it, it adds a ton of boilerplate > >> that > >> >> > feels > >> >> > > > like pure noise. To make a DoFn reusable it must be made into a > >> named > >> >> > > class > >> >> > > > with a constructor, versus inlined with no constructor. > >> >> > > > >> >> > > Hm, why? You can have the DoFn be an anonymous class capturing > the > >> >> > > PCollectionView into a @SideInput field as a closure. > >> >> > > > >> >> > > > >> >> > > > A generous analogy > >> >> > > > is is that it is "just" manual closure conversion/currying, > >> changing > >> >> > > > f(side, main) to f(side)(main). But in practice in Beam the > second > >> >> one > >> >> > > has > >> >> > > > much more boilerplate. > >> >> > > > > >> >> > > > Also, Beam is worse. We present the user with higher-order > >> functions, > >> >> > > which > >> >> > > > is where the actual annoyance comes in. When you want to > pardo(f) > >> you > >> >> > > have > >> >> > > > to write pardo(f(side))(side, main). Your proposal is to > support > >> >> > > > pardo(f(side))(main) and mine is to support pardo(f)(side, > main). > >> I > >> >> > still > >> >> > > > propose that we support both (as they get implemented). If you > >> buy in > >> >> > to > >> >> > > my > >> >> > > > analogy, then there's decades of precedent and the burden of > proof > >> >> > falls > >> >> > > > heavily on whoever doesn't want to support both. > >> >> > > > > >> >> > > I see your point. I think the proposal is compatible with what > >> you're > >> >> > > suggesting too - in DoFn we could have @SideInput *parameters* of > >> type > >> >> > > PCollectionView, with the same semantics as a field. > >> >> > > > >> >> > > > >> >> > > > > >> >> > > > - My proposal allows getting rid of .withSideInputs() entirely, > >> >> because > >> >> > > the > >> >> > > > > DoFn captures the PCollectionView so you don't need to > specify > >> it > >> >> > > > > explicitly for wiring. > >> >> > > > > > >> >> > > > > >> >> > > > I've decided to change to full +1 (whatever that means > compared to > >> >> 0.75 > >> >> > > :-) > >> >> > > > to adding support for @SideInput fields, because the benefits > >> >> outweigh > >> >> > > this > >> >> > > > failure mode: > >> >> > > > > >> >> > > > new DoFn { > >> >> > > > // forgot the annotation > >> >> > > > private final PCollectionView whatever; > >> >> > > > > >> >> > > > @ProcessElement public void process(...) { > >> >> > > > whatever.get(); // crash during execution > >> >> > > > } > >> >> > > > } > >> >> > > > > >> >> > > > But ideas to mitigate that would be cool. > >> >> > > > >> >> > > Hm, can't think of anything less hacky than "prohibit having > fields > >> of > >> >> > type > >> >> > > PCollectionView that are not public, final, and annotated with > >> >> > @SideInput" > >> >> > > - not sure we'd want to go down this road. I suppose a good error > >> >> message > >> >> > > in .get() would be sufficient, saying "Did you forget to specify > a > >> >> > > requirement for this side input via .withSideInputs() or by > >> annotating > >> >> > the > >> >> > > field as @SideInput" or something like that. > >> >> > > > >> >> > > > > >> >> > > > >> >> > > > >> >> > > > Kenn > >> >> > > > > >> >> > > > > >> >> > > > > > >> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik > >> >> <lc...@google.com.invalid > >> >> > > > >> >> > > > > wrote: > >> >> > > > > > >> >> > > > > > My concern with the proposal is not the specifics of how it > >> will > >> >> > work > >> >> > > > and > >> >> > > > > > more about it being yet another way on how our API is to be > >> used > >> >> > even > >> >> > > > > > though we have a proposal [1] of an API style we were > working > >> >> > towards > >> >> > > > in > >> >> > > > > > Java and Python. I would rather re-open that discussion now > >> about > >> >> > > what > >> >> > > > we > >> >> > > > > > want that API to look like for our major features and work > >> >> towards > >> >> > > > > > consistency (or not if there is a strong argument as to why > >> some > >> >> > > > feature > >> >> > > > > > should have a different style). > >> >> > > > > > > >> >> > > > > > 1: https://s.apache.org/a-new-dofn > >> >> > > > > > > >> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles > >> >> > > > <k...@google.com.invalid > >> >> > > > > > > >> >> > > > > > wrote: > >> >> > > > > > > >> >> > > > > > > +0.75 because I'd like to bring up invalid pipelines. > >> >> > > > > > > > >> >> > > > > > > I had proposed side inputs as parameters to DoFn in > >> >> > > > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so > >> the > >> >> > only > >> >> > > > > place > >> >> > > > > > > they are specified is in the graph construction, making > the > >> >> DoFn > >> >> > > more > >> >> > > > > > > reusable and errors impossible. I've actually been > noodling > >> my > >> >> > way > >> >> > > > > > towards > >> >> > > > > > > this in a branch :-) > >> >> > > > > > > > >> >> > > > > > > Eugene's proposal is a sort of converse, where the side > >> inputs > >> >> > are > >> >> > > > > values > >> >> > > > > > > captured in the closure and not parameters, yet the only > >> place > >> >> > they > >> >> > > > are > >> >> > > > > > > specified is in the DoFn. > >> >> > > > > > > > >> >> > > > > > > I see no conflict between these two. It is very natural > to > >> have > >> >> > > both > >> >> > > > > the > >> >> > > > > > > capability to accept parameters and the ability to > capture > >> >> > > variables > >> >> > > > in > >> >> > > > > > the > >> >> > > > > > > closure. Supporting both is totally standard in > up-to-date > >> >> > > > programming > >> >> > > > > > > languages. > >> >> > > > > > > > >> >> > > > > > > Today we have the worse of both worlds: PCollectionView > >> behaves > >> >> > as > >> >> > > > > > > something captured in the closure/constructor, but must > >> still > >> >> be > >> >> > > > > > explicitly > >> >> > > > > > > wired up. > >> >> > > > > > > > >> >> > > > > > > But if I use PCollectionView.get() and have not wired it > up > >> in > >> >> > any > >> >> > > > way, > >> >> > > > > > > what happens? Just like today, you can try to > >> .sideInput(...) a > >> >> > > thing > >> >> > > > > > that > >> >> > > > > > > is not available. With side inputs as parameters, this is > >> not > >> >> > > > possible. > >> >> > > > > > If > >> >> > > > > > > you want to treat them as captured in a closure, while > >> avoiding > >> >> > > > errors, > >> >> > > > > > it > >> >> > > > > > > seems like you might need to do some low-level magic, > like > >> the > >> >> > > > > > > serialization-based detection that Luke has suggested > before > >> >> > (there > >> >> > > > are > >> >> > > > > > > known downsides that we haven't explored, like > overcapture). > >> >> > > > > > > > >> >> > > > > > > Kenn > >> >> > > > > > > > >> >> > > > > > > [1] > >> >> > > > > > > > >> >> > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j > >> >> > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko > >> >> > > > > > > > >> >> > > > > > > > >> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov < > >> >> > > > > > > kirpic...@google.com.invalid> wrote: > >> >> > > > > > > > >> >> > > > > > > > Hm, I guess you're right - for outputs it could be > indeed > >> >> quite > >> >> > > > > > valuable > >> >> > > > > > > to > >> >> > > > > > > > output to them without plumbing (e.g. outputting > errors). > >> >> Could > >> >> > > be > >> >> > > > > done > >> >> > > > > > > > perhaps via TupleTag.output()? (assuming the same > TupleTag > >> >> can > >> >> > > not > >> >> > > > be > >> >> > > > > > > > reused to tag multiple PCollection's) > >> >> > > > > > > > > >> >> > > > > > > > For now I sent a PR for side input support > >> >> > > > > > > > https://github.com/apache/beam/pull/3814 . > >> >> > > > > > > > > >> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik > >> >> > > > <lc...@google.com.invalid > >> >> > > > > > > >> >> > > > > > > > wrote: > >> >> > > > > > > > > >> >> > > > > > > > > I disagree, state may not care where it is used as > well > >> >> > since a > >> >> > > > > > person > >> >> > > > > > > > may > >> >> > > > > > > > > call a function which needs to store/retrieve state > and > >> >> > instead > >> >> > > > of > >> >> > > > > > > having > >> >> > > > > > > > > the DoFn declare the StateSpec and then pass in the > >> state > >> >> > > > > > > implementation > >> >> > > > > > > > > down into the function everywhere. Similarly for > >> outputs, > >> >> the > >> >> > > > > > internal > >> >> > > > > > > > > functions could take the TupleTag and request an > output > >> >> > manager > >> >> > > > or > >> >> > > > > > take > >> >> > > > > > > > an > >> >> > > > > > > > > "output" reference which give functions the ability > to > >> >> > produce > >> >> > > > > output > >> >> > > > > > > > > directly without needing to pass everything that is > >> needed > >> >> to > >> >> > > be > >> >> > > > > > output > >> >> > > > > > > > > back to the caller. > >> >> > > > > > > > > > >> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov < > >> >> > > > > > > > > kirpic...@google.com.invalid> wrote: > >> >> > > > > > > > > > >> >> > > > > > > > > > Hm, I think of these things (state, side outputs > >> etc.), > >> >> > only > >> >> > > > side > >> >> > > > > > > > inputs > >> >> > > > > > > > > > make sense to access in arbitrary user callbacks > >> without > >> >> > > > explicit > >> >> > > > > > > > > knowledge > >> >> > > > > > > > > > of the surrounding transform - so only side inputs > >> can be > >> >> > > > > implicit > >> >> > > > > > > like > >> >> > > > > > > > > > this. > >> >> > > > > > > > > > > >> >> > > > > > > > > > Ultimately we'll probably end up removing > >> ProcessContext, > >> >> > and > >> >> > > > > > keeping > >> >> > > > > > > > > only > >> >> > > > > > > > > > annotations (on fields / methods / parameters). In > >> that > >> >> > > world, > >> >> > > > a > >> >> > > > > > > field > >> >> > > > > > > > > > annotation could be used (like per my previous > email) > >> to > >> >> > > > > statically > >> >> > > > > > > > > specify > >> >> > > > > > > > > > which side inputs will be needed - while the value > >> could > >> >> > > still > >> >> > > > be > >> >> > > > > > > > > accessed > >> >> > > > > > > > > > via .get(), just like state cells are accessed via > >> >> .read() > >> >> > > and > >> >> > > > > > > > .write(): > >> >> > > > > > > > > > i.e., #get() is not a new method of access. > >> >> > > > > > > > > > > >> >> > > > > > > > > > Overall, it seems like I should proceed with the > >> idea. I > >> >> > > filed > >> >> > > > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844. > >> >> > > > > > > > > > > >> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik > >> >> > > > > > <lc...@google.com.invalid > >> >> > > > > > > > > >> >> > > > > > > > > > wrote: > >> >> > > > > > > > > > > >> >> > > > > > > > > > > For API consistency reasons, it would be good if > we > >> did > >> >> > > this > >> >> > > > > > > > > holistically > >> >> > > > > > > > > > > and expanded this approach to state, side > outputs, > >> ... > >> >> so > >> >> > > > that > >> >> > > > > a > >> >> > > > > > > > person > >> >> > > > > > > > > > can > >> >> > > > > > > > > > > always call Something.get() to return something > that > >> >> they > >> >> > > can > >> >> > > > > > > access > >> >> > > > > > > > > > > implementation wise. It will be confusing for our > >> users > >> >> > to > >> >> > > > have > >> >> > > > > > > many > >> >> > > > > > > > > > > variations in our style of how all these concepts > >> are > >> >> > used > >> >> > > > > > > > > > (ProcessContext > >> >> > > > > > > > > > > / Annotations / #get()) > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov > < > >> >> > > > > > > > > > > kirpic...@google.com.invalid> wrote: > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > > Also, I think my approach is compatible with > >> >> > annotations > >> >> > > > and > >> >> > > > > > > future > >> >> > > > > > > > > > > removal > >> >> > > > > > > > > > > > of .withSideInputs if we annotate a field: > >> >> > > > > > > > > > > > final PCollectionView<Foo> foo = ...; > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > class MyDoFn { > >> >> > > > > > > > > > > > @SideInput > >> >> > > > > > > > > > > > PCollectionView<Foo> foo = foo; > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > ...foo.get()... > >> >> > > > > > > > > > > > } > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > We can extract the accessed views from the DoFn > >> >> > instance > >> >> > > > > using > >> >> > > > > > > > > > > reflection. > >> >> > > > > > > > > > > > Still not compatible with lambdas, but > compatible > >> >> > > > > automatically > >> >> > > > > > > > with > >> >> > > > > > > > > > all > >> >> > > > > > > > > > > > anonymous classes. > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov < > >> >> > > > > > > > kirpic...@google.com> > >> >> > > > > > > > > > > > wrote: > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > > Hi Luke, > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > I know this (annotations) is the pattern we > were > >> >> > > > > considering > >> >> > > > > > > for > >> >> > > > > > > > > side > >> >> > > > > > > > > > > > > inputs, but I no longer think it is the best > >> way to > >> >> > > > access > >> >> > > > > > > them. > >> >> > > > > > > > > > > > > Annotations help getting rid of the > >> >> .withSideInputs() > >> >> > > > call, > >> >> > > > > > but > >> >> > > > > > > > > this > >> >> > > > > > > > > > is > >> >> > > > > > > > > > > > > where their advantage ends. > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > The advantages of the proposed approach are > >> that it > >> >> > > > > > > automatically > >> >> > > > > > > > > > works > >> >> > > > > > > > > > > > > with all existing callback or lambda code. No > >> need > >> >> to > >> >> > > > > further > >> >> > > > > > > > > develop > >> >> > > > > > > > > > > the > >> >> > > > > > > > > > > > > reflection machinery to support side input > >> >> > annotations > >> >> > > - > >> >> > > > > and > >> >> > > > > > > > > > especially > >> >> > > > > > > > > > > > to > >> >> > > > > > > > > > > > > support arbitrary user interfaces, no need to > >> >> change > >> >> > > > > existing > >> >> > > > > > > > > > > transforms, > >> >> > > > > > > > > > > > > no need for transform authors to even know > that > >> the > >> >> > > > > machinery > >> >> > > > > > > > > exists > >> >> > > > > > > > > > to > >> >> > > > > > > > > > > > > make side inputs usable in their transforms > >> (and no > >> >> > > need > >> >> > > > > for > >> >> > > > > > > > > authors > >> >> > > > > > > > > > to > >> >> > > > > > > > > > > > > think about whether or not they should > support > >> side > >> >> > > > > inputs). > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > Moreover, like Reuven says, annotations don't > >> work > >> >> > with > >> >> > > > > > lambdas > >> >> > > > > > > > at > >> >> > > > > > > > > > all: > >> >> > > > > > > > > > > > > creating a lambda with a flexible set of > >> annotation > >> >> > > > > arguments > >> >> > > > > > > > > appears > >> >> > > > > > > > > > > to > >> >> > > > > > > > > > > > be > >> >> > > > > > > > > > > > > currently impossible, and even capturing the > >> >> > > annotations > >> >> > > > on > >> >> > > > > > > > > arguments > >> >> > > > > > > > > > > of > >> >> > > > > > > > > > > > a > >> >> > > > > > > > > > > > > lambda is I believe also impossible because > the > >> >> Java > >> >> > > > > compiler > >> >> > > > > > > > drops > >> >> > > > > > > > > > > them > >> >> > > > > > > > > > > > in > >> >> > > > > > > > > > > > > the generated class or method handle. > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik > >> >> > > > > > > > > <lc...@google.com.invalid > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > > > wrote: > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > >> I believe we should follow the pattern that > >> state > >> >> > uses > >> >> > > > and > >> >> > > > > > > add a > >> >> > > > > > > > > > type > >> >> > > > > > > > > > > > >> annotation to link the side input > definition to > >> >> its > >> >> > > > usage > >> >> > > > > > > > > directly. > >> >> > > > > > > > > > > This > >> >> > > > > > > > > > > > >> would allow us to know that the side input > was > >> >> > > > definitely > >> >> > > > > > > being > >> >> > > > > > > > > > > accessed > >> >> > > > > > > > > > > > >> and perform validation during graph > >> construction > >> >> for > >> >> > > any > >> >> > > > > > used > >> >> > > > > > > > but > >> >> > > > > > > > > > > > >> unspecified side inputs. > >> >> > > > > > > > > > > > >> > >> >> > > > > > > > > > > > >> Code snippet: > >> >> > > > > > > > > > > > >> final PCollectionView<String> foo = > >> >> > > > > > pipeline.apply("fooName", > >> >> > > > > > > > > > > > >> Create.of("foo")).apply(View.< > >> >> String>asSingleton()); > >> >> > > > > > > > > > > > >> PCollection<String> output = pipeline > >> >> > > > > > > > > > > > >> .apply(Create.of(1, 2, 3)) > >> >> > > > > > > > > > > > >> .apply(MapElements.via( > >> >> > > > > > > > > > > > >> new SimpleFunction<Integer, > String>() { > >> >> > > > > > > > > > > > >> @Override > >> >> > > > > > > > > > > > >> public String apply(Integer input, > >> >> > > > > > > > @SideInput("fooName") > >> >> > > > > > > > > > > > String > >> >> > > > > > > > > > > > >> fooValue) { > >> >> > > > > > > > > > > > >> return fooValue + " " + input; > >> >> > > > > > > > > > > > >> } > >> >> > > > > > > > > > > > >> }).withSideInputs(foo));* > >> >> > > > > > > > > > > > >> > >> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene > >> Kirpichov < > >> >> > > > > > > > > > > > >> kirpic...@google.com.invalid> wrote: > >> >> > > > > > > > > > > > >> > >> >> > > > > > > > > > > > >> > Sure, here's how a modified (passing) > >> >> MapElements > >> >> > > unit > >> >> > > > > > test > >> >> > > > > > > > > looks > >> >> > > > > > > > > > > > like, > >> >> > > > > > > > > > > > >> > with usage of side inputs: > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > @Test > >> >> > > > > > > > > > > > >> > @Category(NeedsRunner.class) > >> >> > > > > > > > > > > > >> > public void testMapBasicWithSideInput() > >> throws > >> >> > > > > > Exception { > >> >> > > > > > > > > > > > >> > * final PCollectionView<String> foo =* > >> >> > > > > > > > > > > > >> > * pipeline.apply("foo", > >> >> > > > > > > > > > > > >> > > >> >> > > Create.of("foo")).apply(View.<String>asSingleton());* > >> >> > > > > > > > > > > > >> > PCollection<String> output = pipeline > >> >> > > > > > > > > > > > >> > .apply(Create.of(1, 2, 3)) > >> >> > > > > > > > > > > > >> > .apply(MapElements.via( > >> >> > > > > > > > > > > > >> > new SimpleFunction<Integer, > >> >> String>() > >> >> > { > >> >> > > > > > > > > > > > >> > @Override > >> >> > > > > > > > > > > > >> > public String apply(Integer > >> >> input) { > >> >> > > > > > > > > > > > >> > return* foo.get() *+ " " + > >> >> input; > >> >> > > > > > > > > > > > >> > } > >> >> > > > > > > > > > > > >> > }) > >> >> > > > > > > > > > > > >> > *.withSideInputs(foo));* > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > PAssert.that(output). > >> >> containsInAnyOrder("foo > >> >> > 1", > >> >> > > > > "foo > >> >> > > > > > > 2", > >> >> > > > > > > > > > "foo > >> >> > > > > > > > > > > > 3"); > >> >> > > > > > > > > > > > >> > pipeline.run(); > >> >> > > > > > > > > > > > >> > } > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax > >> >> > > > > > > > > > <re...@google.com.invalid > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > >> > wrote: > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > > Can you provide a code snippet showing > how > >> >> this > >> >> > > > would > >> >> > > > > > > look? > >> >> > > > > > > > > > > > >> > > > >> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene > >> >> > Kirpichov < > >> >> > > > > > > > > > > > >> > > kirpic...@google.com.invalid> wrote: > >> >> > > > > > > > > > > > >> > > > >> >> > > > > > > > > > > > >> > > > TL;DR Introduce method > >> >> PCollectionView.get(), > >> >> > > > > > > implemented > >> >> > > > > > > > > as: > >> >> > > > > > > > > > > get > >> >> > > > > > > > > > > > >> > > > thread-local ProcessContext and do > >> >> > > > > c.sideInput(this). > >> >> > > > > > > As a > >> >> > > > > > > > > > > result, > >> >> > > > > > > > > > > > >> any > >> >> > > > > > > > > > > > >> > > user > >> >> > > > > > > > > > > > >> > > > lambdas such as MapElements can use > side > >> >> > inputs. > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Quite a few transforms have user-code > >> >> > callbacks > >> >> > > or > >> >> > > > > > > > lambdas: > >> >> > > > > > > > > > > ParDo > >> >> > > > > > > > > > > > >> > (DoFn), > >> >> > > > > > > > > > > > >> > > > Map/FlatMapElements, the > >> DynamicDestinations > >> >> > > > classes > >> >> > > > > > in > >> >> > > > > > > > > > various > >> >> > > > > > > > > > > > IOs, > >> >> > > > > > > > > > > > >> > > > combine fns, the PollFn callback in > >> Watch, > >> >> > etc. > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have > >> >> > built-in > >> >> > > > > > support > >> >> > > > > > > > for > >> >> > > > > > > > > > side > >> >> > > > > > > > > > > > >> > inputs; > >> >> > > > > > > > > > > > >> > > > for DynamicDestinations it is plumbed > >> >> > > explicitly; > >> >> > > > > > others > >> >> > > > > > > > > don't > >> >> > > > > > > > > > > > have > >> >> > > > > > > > > > > > >> > > access > >> >> > > > > > > > > > > > >> > > > (e.g. you can't access side inputs > from > >> >> > > > > > > > Map/FlatMapElements > >> >> > > > > > > > > > > > because > >> >> > > > > > > > > > > > >> > they > >> >> > > > > > > > > > > > >> > > > don't have a ProcessContext or any > >> context > >> >> for > >> >> > > > that > >> >> > > > > > > > matter). > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > I think it's important to solve this, > >> >> > especially > >> >> > > > as > >> >> > > > > > > Java 8 > >> >> > > > > > > > > > > becomes > >> >> > > > > > > > > > > > >> > > people's > >> >> > > > > > > > > > > > >> > > > default choice: users will want to use > >> side > >> >> > > inputs > >> >> > > > > in > >> >> > > > > > > > > > > > >> > > Map/FlatMapElements. > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > It also appears to be quite easy to > >> >> implement: > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Runner part: > >> >> > > > > > > > > > > > >> > > > - introduce a SideInputAccessor > interface > >> >> > > > > > > > > > > > >> > > > - make .get() on a PCollectionView > get it > >> >> > from a > >> >> > > > > > > > > thread-local > >> >> > > > > > > > > > > > >> > > > SideInputAccessor > >> >> > > > > > > > > > > > >> > > > - make runners set the thread-local > >> >> > > > > SideInputAccessor > >> >> > > > > > > any > >> >> > > > > > > > > time > >> >> > > > > > > > > > > the > >> >> > > > > > > > > > > > >> > runner > >> >> > > > > > > > > > > > >> > > > is evaluating something in a context > >> where > >> >> > side > >> >> > > > > inputs > >> >> > > > > > > are > >> >> > > > > > > > > > > > >> available, > >> >> > > > > > > > > > > > >> > > e.g. > >> >> > > > > > > > > > > > >> > > > a ProcessElement method, or applying a > >> >> > > CombineFn - > >> >> > > > > the > >> >> > > > > > > set > >> >> > > > > > > > > of > >> >> > > > > > > > > > > such > >> >> > > > > > > > > > > > >> > places > >> >> > > > > > > > > > > > >> > > > will be quite small. I believe only > >> runners > >> >> > (but > >> >> > > > not > >> >> > > > > > > > > > transforms) > >> >> > > > > > > > > > > > >> will > >> >> > > > > > > > > > > > >> > > ever > >> >> > > > > > > > > > > > >> > > > need to set this thread-local > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > Transform part: > >> >> > > > > > > > > > > > >> > > > - Transforms that take user-code > lambdas > >> and > >> >> > > want > >> >> > > > to > >> >> > > > > > let > >> >> > > > > > > > > them > >> >> > > > > > > > > > > > access > >> >> > > > > > > > > > > > >> > side > >> >> > > > > > > > > > > > >> > > > inputs still will need to be > configurable > >> >> > with a > >> >> > > > > > method > >> >> > > > > > > > like > >> >> > > > > > > > > > > > >> > > > .withSideInputs(view1, view2...) and > will > >> >> need > >> >> > > to > >> >> > > > > > plumb > >> >> > > > > > > > > those > >> >> > > > > > > > > > > down > >> >> > > > > > > > > > > > >> to > >> >> > > > > > > > > > > > >> > the > >> >> > > > > > > > > > > > >> > > > primitive DoFn's or CombineFn's - > >> >> information > >> >> > on > >> >> > > > > > *which* > >> >> > > > > > > > > side > >> >> > > > > > > > > > > > inputs > >> >> > > > > > > > > > > > >> > will > >> >> > > > > > > > > > > > >> > > > be accessed, of course, still needs to > >> be in > >> >> > the > >> >> > > > > > graph. > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > I quickly prototyped this in direct > >> runner > >> >> and > >> >> > > > > > > > MapElements, > >> >> > > > > > > > > > and > >> >> > > > > > > > > > > it > >> >> > > > > > > > > > > > >> > worked > >> >> > > > > > > > > > > > >> > > > with no issues - I'm wondering if > there's > >> >> > > > something > >> >> > > > > > > subtle > >> >> > > > > > > > > > that > >> >> > > > > > > > > > > > I'm > >> >> > > > > > > > > > > > >> > > > missing, or is it actually a good > idea? > >> >> > > > > > > > > > > > >> > > > > >> >> > > > > > > > > > > > >> > > > >> >> > > > > > > > > > > > >> > > >> >> > > > > > > > > > > > >> > >> >> > > > > > > > > > > > > > >> >> > > > > > > > > > > > > >> >> > > > > > > > > > > > >> >> > > > > > > > > > > >> >> > > > > > > > > > >> >> > > > > > > > > >> >> > > > > > > > >> >> > > > > > > >> >> > > > > > >> >> > > > > >> >> > > > >> >> > > >> >> > >> >