On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> Hi Robert,
>
> Given the anticipated usage of this proposal in Java, I'm not sure the Python approach you quoted is the right one.

Perhaps not, but does that mean it would be a Java-ism only, or would we implement it in Python despite it being worse there?

> The main reason: I see how it works with Map/FlatMap, but what about cases like FileIO.write(), parameterized by several lambdas (element -> destination, destination -> filename policy, destination -> sink), where different lambdas may want to access different side inputs? It feels excessive to make each of the lambdas take all of the side inputs in the same order; moreover, if the composite transform internally needs to pass some more side inputs to the DoFn's executing these lambdas, it will need to manipulate the argument lists in nontrivial ways to make sure it passes them only the side inputs the user asked for, and in the proper order.

In Python it would be trivial to "slice" the side input arguments across the lambdas in a natural way, but I can see that this would be more of a pain in Java, especially as lambdas are unnecessarily crippled during compilation.

> Another reason is, I think with Java's type system it's impossible to have a NewDoFn-style API for lambdas, because annotations on lambda arguments are dropped when the lambda is converted to the respective single-method interface - a lambda is subject to a lot more type erasure than an anonymous class.

Yeah, this is unfortunate. But, as mentioned, side inputs don't need to be annotated, just counted. For something like inspecting the window, the NewDoFn style has a lot of advantages over implicit access (and makes it so you can't "forget" to declare your dependency), but I do see advantages for the implicit way of doing things for delegating to other callables. On the other hand, there is a bit of precedent for this: metrics have the "implicit" API. If we do go this direction for side inputs, we should also consider it for state and side outputs.
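To make "counted, not annotated" concrete, here is a minimal plain-Java sketch with no Beam dependency. PositionalMap, mapWithSide, and the Supplier standing in for a side input view are hypothetical names invented for illustration; the only point is that a two-argument lambda can receive the side input value purely by position, without any annotation on its parameters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical sketch, not Beam API: the side input is matched to the
// lambda's second parameter by position alone.
final class PositionalMap {
  private PositionalMap() {}

  // Applies fn to every main-input element, passing the side input value
  // as the second positional argument.
  static <T, S, R> List<R> mapWithSide(
      List<T> main, BiFunction<T, S, R> fn, Supplier<S> side) {
    S sideValue = side.get(); // resolved once, like a materialized view
    List<R> out = new ArrayList<>();
    for (T elem : main) {
      out.add(fn.apply(elem, sideValue));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> result = mapWithSide(
        Arrays.asList(1, 2, 3),
        (Integer elem, String prefix) -> prefix + " " + elem,
        () -> "foo");
    System.out.println(result);
  }
}
```

With several lambdas that want different side inputs, as in the FileIO.write() case, each lambda would need its own argument list sliced out, which is the part that is awkward to express in Java.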
> On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
>
>> +1 to reducing the amount of boilerplate for dealing with side inputs.
>>
>> I prefer the "NewDoFn" style of side inputs for consistency. The primary drawback seems to be lambda's incompatibility with annotations. This is solved in Python by letting the first annotated argument of the process method be the main input, and subsequent ones be the side inputs. For example
>>
>>   main_pcoll | beam.Map(
>>       lambda main_input_elem, side_input_value: main_input_elem + side_input_value,
>>       side_input_pvalue)
>>
>> For multiple side inputs they are mapped positionally (though Python has the advantage that arguments can be passed by keyword as well to enhance readability when there are many of them, and we allow that for side inputs). Note that side_input_pvalue is not referenced anywhere else, so we don't even have to store it and pass it around (one typically writes pvalue.AsList(some_pcoll) inline here). When the concrete PCollectionView is used to access the value this means that it must be passed separately to both the ParDo and the callback (unless we can infer it, which I don't think we can do in all (many?) cases).
>>
>> There's no reason we couldn't do this, or something very similar, in Java as well.
>>
>> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax <re...@google.com.invalid> wrote:
>>
>> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >
>> >> Hi,
>> >>
>> >> I agree with these concerns to an extent, however I think the advantage of transparently letting any user code access side inputs, especially including lambdas, is so great that we should find a way to address these concerns within the constraints of the pattern I'm proposing. See more below.
>> >>
>> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers <bchamb...@google.com.invalid> wrote:
>> >>
>> >> > One possible issue with this is that updating a thread local is likely to be much more expensive than passing an additional argument.
>> >>
>> >> This is an implementation detail that can be fixed - Luke made a suggestion on the PR to set up the side input context once per bundle rather than once per element.
>> >
>> > However remember that bundles might be small. Dataflow streaming runner creates small bundles by design. The Flink runner creates single-element bundles.
>> >
>> >> > Also, not all code called from within the DoFn will necessarily be in the same thread (e.g., sometimes we create a pool of threads for doing work).
>> >>
>> >> I think we already require that c.output() can not be done from multiple threads; and I don't think we document c.sideInput() to be thread-safe - it may be reasonable to declare that it isn't and has to be accessed from the same thread as the ProcessElement call. If we want to relax this, then there might be ways to deal with that too, e.g. provide utilities for the user to capture the "user code context" and restore it inside a thread. This would likely be valuable for other purposes, such as making those extra threads visible to our profiling utilities.
>> >
>> > This seems fair, but we should be very careful about our documentation. And +1 to adding utilities to make multi-threaded work easier to manage.
>> >
>> >> > It may be *more* confusing for this to sometimes work magically and sometimes fail horribly. Also, requiring the PCollectionView to be passed to user code that accesses it is nice because it makes *very clear* that the side input needs to be provided from the DoFn to that particular utility. If it is accessed via "spooky action at a distance" we lose that piece of "free" documentation, which may lead to extensive misuse of these utility methods.
>> >>
>> >> I'd like to understand this concern better - from this description it's not clear to me. The pattern I'm proposing is that, when you're authoring a PTransform that is configured by any user callbacks, then:
>> >> - you should provide a builder method .withSideInputs(...)
>> >> - you should propagate those side inputs to all your internal DoFn's that invoke the user code
>> >> - in return the user callbacks will be allowed to access those particular side inputs
>> >> This seems like a simple enough model to me to understand, both from a user's perspective and from a transform author's perspective. Steps 1 and 2 may eventually be automated by annotation analysis or other means (e.g. SDK giving a way to provide given side inputs automatically to everything inside a composite transform rather than to individual DoFn's).
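The "capture the user code context and restore it inside a thread" utility mentioned above could look roughly like the following plain-Java sketch. Nothing here is Beam API: UserCodeContext, enter, exit, sideInput, and propagate are hypothetical names, and the context is simplified to a map of side input values. The demo shows that a task wrapped with propagate can read the context on a pool thread, while an unwrapped task on the same thread fails.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not Beam API.
final class UserCodeContext {
  // Side input values visible to user code on the current thread; unset outside processing.
  private static final ThreadLocal<Map<String, Object>> CURRENT = new ThreadLocal<>();

  private UserCodeContext() {}

  // What a runner might call around a bundle (once per bundle, not per element).
  static void enter(Map<String, Object> sideInputs) { CURRENT.set(sideInputs); }
  static void exit() { CURRENT.remove(); }

  static Object sideInput(String name) {
    Map<String, Object> ctx = CURRENT.get();
    if (ctx == null) {
      throw new IllegalStateException(
          "No side input context on thread " + Thread.currentThread().getName());
    }
    return ctx.get(name);
  }

  // Captures the caller's context now; the returned task reinstalls it on
  // whichever thread runs it and restores the previous state afterwards.
  static <T> Callable<T> propagate(Callable<T> task) {
    Map<String, Object> captured = CURRENT.get();
    return () -> {
      Map<String, Object> previous = CURRENT.get();
      CURRENT.set(captured);
      try {
        return task.call();
      } finally {
        if (previous == null) CURRENT.remove(); else CURRENT.set(previous);
      }
    };
  }

  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    enter(Collections.<String, Object>singletonMap("greeting", "hello"));
    try {
      Callable<Object> read = () -> sideInput("greeting");
      String propagated = String.valueOf(pool.submit(propagate(read)).get());
      String bare;
      try {
        bare = String.valueOf(pool.submit(read).get());
      } catch (ExecutionException e) {
        bare = "failed"; // the pool thread has no context of its own
      }
      return propagated + "/" + bare;
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    } finally {
      exit();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The sketch also illustrates Ben's concern: whether the implicit access works depends on which thread the code happens to run on, so the failure message needs to be explicit about that.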
>> >>
>> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid> wrote:
>> >> > >
>> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > >
>> >> > > > > The differences are:
>> >> > > > > - The proposal in the doc allows wiring different side inputs to the same Supplier, but I'm not convinced that this is important - you can just as easily call the constructor of your DoFn passing different PCollectionView's for it to capture.
>> >> > > >
>> >> > > > I disagree with this bit about it being "just as easy". Passing the needed PCollectionViews to your constructor (or even having a constructor) is a pain. Every time I have to do it, it adds a ton of boilerplate that feels like pure noise. To make a DoFn reusable it must be made into a named class with a constructor, versus inlined with no constructor.
>> >> > >
>> >> > > Hm, why? You can have the DoFn be an anonymous class capturing the PCollectionView into a @SideInput field as a closure.
>> >> > >
>> >> > > > A generous analogy is that it is "just" manual closure conversion/currying, changing f(side, main) to f(side)(main). But in practice in Beam the second one has much more boilerplate.
>> >> > > >
>> >> > > > Also, Beam is worse. We present the user with higher-order functions, which is where the actual annoyance comes in. When you want to pardo(f) you have to write pardo(f(side))(side, main). Your proposal is to support pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still propose that we support both (as they get implemented). If you buy in to my analogy, then there's decades of precedent and the burden of proof falls heavily on whoever doesn't want to support both.
>> >> > >
>> >> > > I see your point. I think the proposal is compatible with what you're suggesting too - in DoFn we could have @SideInput *parameters* of type PCollectionView, with the same semantics as a field.
>> >> > >
>> >> > > > > - My proposal allows getting rid of .withSideInputs() entirely, because the DoFn captures the PCollectionView so you don't need to specify it explicitly for wiring.
>> >> > > >
>> >> > > > I've decided to change to full +1 (whatever that means compared to 0.75 :-) to adding support for @SideInput fields, because the benefits outweigh this failure mode:
>> >> > > >
>> >> > > >   new DoFn {
>> >> > > >     // forgot the annotation
>> >> > > >     private final PCollectionView whatever;
>> >> > > >
>> >> > > >     @ProcessElement public void process(...) {
>> >> > > >       whatever.get(); // crash during execution
>> >> > > >     }
>> >> > > >   }
>> >> > > >
>> >> > > > But ideas to mitigate that would be cool.
>> >> > >
>> >> > > Hm, can't think of anything less hacky than "prohibit having fields of type PCollectionView that are not public, final, and annotated with @SideInput" - not sure we'd want to go down this road. I suppose a good error message in .get() would be sufficient, saying "Did you forget to specify a requirement for this side input via .withSideInputs() or by annotating the field as @SideInput" or something like that.
>> >> > >
>> >> > > > Kenn
>> >> > > >
>> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
>> >> > > > >
>> >> > > > > > My concern with the proposal is less about the specifics of how it will work and more about it being yet another way for our API to be used, even though we have a proposal [1] of an API style we were working towards in Java and Python. I would rather re-open that discussion now about what we want that API to look like for our major features and work towards consistency (or not if there is a strong argument as to why some feature should have a different style).
>> >> > > > > >
>> >> > > > > > 1: https://s.apache.org/a-new-dofn
>> >> > > > > >
>> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>> >> > > > > >
>> >> > > > > > > +0.75 because I'd like to bring up invalid pipelines.
>> >> > > > > > >
>> >> > > > > > > I had proposed side inputs as parameters to DoFn in https://s.apache.org/a-new-dofn (specifically at [1]) so the only place they are specified is in the graph construction, making the DoFn more reusable and errors impossible. I've actually been noodling my way towards this in a branch :-)
>> >> > > > > > >
>> >> > > > > > > Eugene's proposal is a sort of converse, where the side inputs are values captured in the closure and not parameters, yet the only place they are specified is in the DoFn.
>> >> > > > > > >
>> >> > > > > > > I see no conflict between these two. It is very natural to have both the capability to accept parameters and the ability to capture variables in the closure. Supporting both is totally standard in up-to-date programming languages.
>> >> > > > > > >
>> >> > > > > > > Today we have the worst of both worlds: PCollectionView behaves as something captured in the closure/constructor, but must still be explicitly wired up.
>> >> > > > > > >
>> >> > > > > > > But if I use PCollectionView.get() and have not wired it up in any way, what happens? Just like today, you can try to .sideInput(...) a thing that is not available. With side inputs as parameters, this is not possible. If you want to treat them as captured in a closure, while avoiding errors, it seems like you might need to do some low-level magic, like the serialization-based detection that Luke has suggested before (there are known downsides that we haven't explored, like overcapture).
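The f(side, main) versus f(side)(main) analogy from this exchange can be written out in plain Java. This is only an illustration of the two shapes, not Beam code; SideInputShapes and its members are hypothetical names, and a String stands in for the side input value.

```java
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of the two function shapes discussed in the thread.
final class SideInputShapes {
  private SideInputShapes() {}

  // f(side, main): the side input arrives as a parameter.
  static final BiFunction<String, Integer, String> UNCURRIED =
      (side, main) -> side + " " + main;

  // f(side)(main): the side input is captured in the closure.
  static Function<Integer, String> curried(String side) {
    return main -> side + " " + main;
  }

  // Manual closure conversion: derives the second shape from the first.
  static <S, M, R> Function<S, Function<M, R>> curry(BiFunction<S, M, R> f) {
    return side -> main -> f.apply(side, main);
  }

  public static void main(String[] args) {
    System.out.println(UNCURRIED.apply("foo", 1));
    System.out.println(curried("foo").apply(1));
    System.out.println(curry(UNCURRIED).apply("foo").apply(1));
  }
}
```

In these terms, today's API asks for the side input in both places at once, which is the pardo(f(side))(side, main) shape described above.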
>> >> > > > > > >
>> >> > > > > > > Kenn
>> >> > > > > > >
>> >> > > > > > > [1] https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
>> >> > > > > > >
>> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > > > > >
>> >> > > > > > > > Hm, I guess you're right - for outputs it could be indeed quite valuable to output to them without plumbing (e.g. outputting errors). Could be done perhaps via TupleTag.output()? (assuming the same TupleTag can not be reused to tag multiple PCollection's)
>> >> > > > > > > >
>> >> > > > > > > > For now I sent a PR for side input support https://github.com/apache/beam/pull/3814 .
>> >> > > > > > > >
>> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
>> >> > > > > > > >
>> >> > > > > > > > > I disagree; state may not care where it is used either, since a person may call a function which needs to store/retrieve state, instead of having the DoFn declare the StateSpec and then pass the state implementation down into the function everywhere. Similarly for outputs, the internal functions could take the TupleTag and request an output manager or take an "output" reference which gives functions the ability to produce output directly without needing to pass everything that is needed to be output back to the caller.
>> >> > > > > > > > >
>> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > >
>> >> > > > > > > > > > Hm, I think of these things (state, side outputs etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform - so only side inputs can be implicit like this.
>> >> > > > > > > > > >
>> >> > > > > > > > > > Ultimately we'll probably end up removing ProcessContext, and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (like per my previous email) to statically specify which side inputs will be needed - while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(): i.e., #get() is not a new method of access.
>> >> > > > > > > > > >
>> >> > > > > > > > > > Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.
>> >> > > > > > > > > >
>> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
>> >> > > > > > > > > >
>> >> > > > > > > > > > > For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation wise. It will be confusing for our users to have many variations in our style of how all these concepts are used (ProcessContext / Annotations / #get())
>> >> > > > > > > > > > >
>> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > >
>> >> > > > > > > > > > > > Also, I think my approach is compatible with annotations and future removal of .withSideInputs if we annotate a field:
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > >   final PCollectionView<Foo> foo = ...;
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > >   class MyDoFn {
>> >> > > > > > > > > > > >     @SideInput
>> >> > > > > > > > > > > >     PCollectionView<Foo> foo = foo;
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > >     ...foo.get()...
>> >> > > > > > > > > > > >   }
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > We can extract the accessed views from the DoFn instance using reflection. Still not compatible with lambdas, but compatible automatically with all anonymous classes.
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > > Hi Luke,
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help getting rid of the .withSideInputs() call, but this is where their advantage ends.
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > The advantages of the proposed approach are that it automatically works with all existing callback or lambda code. No need to further develop the reflection machinery to support side input annotations - and especially to support arbitrary user interfaces, no need to change existing transforms, no need for transform authors to even know that the machinery exists to make side inputs usable in their transforms (and no need for authors to think about whether or not they should support side inputs).
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotation arguments appears to be currently impossible, and even capturing the annotations on arguments of a lambda is I believe also impossible because the Java compiler drops them in the generated class or method handle.
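For the annotated-field variant quoted above ("extract the accessed views from the DoFn instance using reflection"), a rough plain-Java sketch follows. It does not use Beam: the SideInput annotation, the View class standing in for PCollectionView, and declaredViews are all hypothetical stand-ins. It also shows the failure mode raised elsewhere in the thread, where a view field without the annotation is silently not discovered.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Beam API.
final class SideInputScan {
  private SideInputScan() {}

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface SideInput {}

  // Stand-in for PCollectionView.
  static final class View<T> {
    final String name;
    View(String name) { this.name = name; }
  }

  // Collects the values of fields declared directly on fn's class that are
  // annotated with @SideInput and hold a View (superclasses are not scanned).
  static List<View<?>> declaredViews(Object fn) {
    List<View<?>> views = new ArrayList<>();
    for (Field field : fn.getClass().getDeclaredFields()) {
      if (!field.isAnnotationPresent(SideInput.class)) {
        continue;
      }
      field.setAccessible(true);
      try {
        Object value = field.get(fn);
        if (value instanceof View) {
          views.add((View<?>) value);
        }
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return views;
  }

  static List<View<?>> demo() {
    final View<String> foo = new View<>("foo");
    final View<String> bar = new View<>("bar");
    // Anonymous class capturing both views; only one field is annotated.
    Object fn = new Object() {
      @SideInput final View<String> declared = foo;
      final View<String> forgotten = bar; // not annotated, so not discovered
    };
    return declaredViews(fn);
  }

  public static void main(String[] args) {
    for (View<?> view : demo()) {
      System.out.println(view.name);
    }
  }
}
```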
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > >> I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly. This would allow us to know that the side input was definitely being accessed and perform validation during graph construction for any used but unspecified side inputs.
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> Code snippet:
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >>   final PCollectionView<String> foo = pipeline.apply("fooName",
>> >> > > > > > > > > > > > >>       Create.of("foo")).apply(View.<String>asSingleton());
>> >> > > > > > > > > > > > >>   PCollection<String> output = pipeline
>> >> > > > > > > > > > > > >>       .apply(Create.of(1, 2, 3))
>> >> > > > > > > > > > > > >>       .apply(MapElements.via(
>> >> > > > > > > > > > > > >>           new SimpleFunction<Integer, String>() {
>> >> > > > > > > > > > > > >>             @Override
>> >> > > > > > > > > > > > >>             public String apply(Integer input, @SideInput("fooName") String fooValue) {
>> >> > > > > > > > > > > > >>               return fooValue + " " + input;
>> >> > > > > > > > > > > > >>             }
>> >> > > > > > > > > > > > >>           }).withSideInputs(foo));
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> > Sure, here's how a modified (passing) MapElements unit test looks, with usage of side inputs:
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> >   @Test
>> >> > > > > > > > > > > > >> >   @Category(NeedsRunner.class)
>> >> > > > > > > > > > > > >> >   public void testMapBasicWithSideInput() throws Exception {
>> >> > > > > > > > > > > > >> >     // the side input view
>> >> > > > > > > > > > > > >> >     final PCollectionView<String> foo =
>> >> > > > > > > > > > > > >> >         pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
>> >> > > > > > > > > > > > >> >     PCollection<String> output = pipeline
>> >> > > > > > > > > > > > >> >         .apply(Create.of(1, 2, 3))
>> >> > > > > > > > > > > > >> >         .apply(MapElements.via(
>> >> > > > > > > > > > > > >> >             new SimpleFunction<Integer, String>() {
>> >> > > > > > > > > > > > >> >               @Override
>> >> > > > > > > > > > > > >> >               public String apply(Integer input) {
>> >> > > > > > > > > > > > >> >                 // implicit access to the side input
>> >> > > > > > > > > > > > >> >                 return foo.get() + " " + input;
>> >> > > > > > > > > > > > >> >               }
>> >> > > > > > > > > > > > >> >             })
>> >> > > > > > > > > > > > >> >             // the view is still declared on the transform
>> >> > > > > > > > > > > > >> >             .withSideInputs(foo));
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
>> >> > > > > > > > > > > > >> >     pipeline.run();
>> >> > > > > > > > > > > > >> >   }
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> > > Can you provide a code snippet showing how this would look?
>> >> > > > > > > > > > > > >> > >
>> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >> > >
>> >> > > > > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get thread-local ProcessContext and do c.sideInput(this). As a result, any user lambdas such as MapElements can use side inputs.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations it is plumbed explicitly; others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext or any context for that matter).
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > It also appears to be quite easy to implement:
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Runner part:
>> >> > > > > > > > > > > > >> > > > - introduce a SideInputAccessor interface
>> >> > > > > > > > > > > > >> > > > - make .get() on a PCollectionView get it from a thread-local SideInputAccessor
>> >> > > > > > > > > > > > >> > > > - make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn - the set of such places will be quite small. I believe only runners (but not transforms) will ever need to set this thread-local
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Transform part:
>> >> > > > > > > > > > > > >> > > > - Transforms that take user-code lambdas and want to let them access side inputs still will need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFn's or CombineFn's - information on *which* side inputs will be accessed, of course, still needs to be in the graph.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > I quickly prototyped this in direct runner and MapElements, and it worked with no issues - I'm wondering if there's something subtle that I'm missing, or is it actually a good idea?
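The runner part of the original proposal can be sketched in plain Java as follows. This is not the prototype from the PR and uses no Beam classes: SideInputAccessor is named in the proposal, but its method signature here, the View stand-in for PCollectionView, and runMap standing in for a runner evaluating MapElements are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not Beam API.
final class ImplicitSideInputs {
  private ImplicitSideInputs() {}

  // Assumed shape of the SideInputAccessor interface from the proposal.
  interface SideInputAccessor {
    <T> T get(View<T> view);
  }

  // Set by the "runner" only while user code is being evaluated.
  static final ThreadLocal<SideInputAccessor> ACCESSOR = new ThreadLocal<>();

  // Stand-in for PCollectionView with the proposed get() method.
  static final class View<T> {
    T get() {
      SideInputAccessor accessor = ACCESSOR.get();
      if (accessor == null) {
        throw new IllegalStateException(
            "Side input accessed outside of a processing context");
      }
      return accessor.get(this);
    }
  }

  // "Runner": installs the accessor, applies the user function to every
  // element, and always clears the thread-local afterwards.
  static <I, O> List<O> runMap(
      List<I> input, Function<I, O> fn, Map<View<?>, Object> sideInputs) {
    ACCESSOR.set(new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        if (!sideInputs.containsKey(view)) {
          throw new IllegalArgumentException(
              "Did you forget to declare this side input via .withSideInputs()?");
        }
        return (T) sideInputs.get(view);
      }
    });
    try {
      List<O> out = new ArrayList<>();
      for (I elem : input) {
        out.add(fn.apply(elem));
      }
      return out;
    } finally {
      ACCESSOR.remove();
    }
  }

  static List<String> demo() {
    View<String> foo = new View<>();
    return runMap(
        Arrays.asList(1, 2, 3),
        (Integer x) -> foo.get() + " " + x, // user lambda reads the view implicitly
        Collections.<View<?>, Object>singletonMap(foo, "foo"));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The map passed to runMap plays the role of the views declared with .withSideInputs(...): the lambda never names the accessor, but a view that was not declared is rejected with an explicit message rather than returning a wrong value.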