Hi Robert,

Given the anticipated usage of this proposal in Java, I'm not sure the Python approach you quoted is the right one.

The main reason: I see how it works with Map/FlatMap, but what about cases like FileIO.write(), which is parameterized by several lambdas (element -> destination, destination -> filename policy, destination -> sink), where different lambdas may want to access different side inputs? It feels excessive to make each of the lambdas take all of the side inputs in the same order. Moreover, if the composite transform internally needs to pass some more side inputs to the DoFns executing these lambdas, it will need to manipulate the argument lists in nontrivial ways to make sure it passes them only the side inputs the user asked for, and in the proper order.

Another reason: I think with Java's type system it's impossible to have a NewDoFn-style API for lambdas, because annotations on lambda arguments are dropped when the lambda is converted to the respective single-method interface. A lambda is subject to a lot more type erasure than an anonymous class.

On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw <rober...@google.com.invalid> wrote:

> +1 to reducing the amount of boilerplate for dealing with side inputs.

> I prefer the "NewDoFn" style of side inputs for consistency. The primary drawback seems to be lambda's incompatibility with annotations. This is solved in Python by letting all the first annotated argument of the process method be the main input, and subsequent ones be the side input. For example

>     main_pcoll | beam.Map(
>         lambda main_input_elem, side_input_value: main_input_elem + side_input_value,
>         side_input_pvalue)

> For multiple side inputs they are mapped positionally (though Python has the advantage that arguments can be passed by keyword as well to enhance readability when there are many of them, and we allow that for side inputs). Note that side_input_pvalue is not referenced anywhere else, so we don't even have to store it and pass it around (one typically writes pvalue.AsList(some_pcoll) inline here).
> When the concrete PCollectionView is used to access the value this means that it must be passed separately to both the ParDo and the callback (unless we can infer it, which I don't think we can do in all (many?) cases).

> There's no reason we couldn't do this, or something very similar, in Java as well.

> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax <re...@google.com.invalid> wrote:

> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> Hi,

> >> I agree with these concerns to an extent, however I think the advantage of transparently letting any user code access side inputs, especially including lambdas, is so great that we should find a way to address these concerns within the constraints of the pattern I'm proposing. See more below.

> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers <bchamb...@google.com.invalid> wrote:

> >> > One possible issue with this is that updating a thread local is likely to be much more expensive than passing an additional argument.

> >> This is an implementation detail that can be fixed - Luke made a suggestion on the PR to set up the side input context once per bundle rather than once per element.

> > However remember that bundles might be small. Dataflow streaming runner creates small bundles by design. The Flink runner creates single-element bundles.

> >> > Also, not all code called from within the DoFn will necessarily be in the same thread (e.g., sometimes we create a pool of threads for doing work).

> >> I think we already require that c.output() can not be done from multiple threads; and I don't think we document c.sideInput() to be thread-safe - it may be reasonable to declare that it isn't and has to be accessed from the same thread as the ProcessElement call. If we want to relax this, then there might be ways to deal with that too, e.g. provide utilities for the user to capture the "user code context" and restore it inside a thread. This would likely be valuable for other purposes, such as making those extra threads visible to our profiling utilities.

> > This seems fair, but we should be very careful about our documentation. And +1 to adding utilities to make multi-threaded work easier to manage.

> >> > It may be *more* confusing for this to sometimes work magically and sometimes fail horribly. Also, requiring the PCollectionView to be passed to user code that accesses it is nice because it makes *very clear* that the side input needs to be provided from the DoFn to that particular utility. If it is accessed via "spooky action at a distance" we lose that piece of "free" documentation, which may lead to extensive misuse of these utility methods.

> >> I'd like to understand this concern better - from this description it's not clear to me. The pattern I'm proposing is that, when you're authoring a PTransform that is configured by any user callbacks, then:
> >> - you should provide a builder method .withSideInputs(...)
> >> - you should propagate those side inputs to all your internal DoFn's that invoke the user code
> >> - in return the user callbacks will be allowed to access those particular side inputs
> >> This seems like a simple enough model to me to understand, both from a user's perspective and from a transform author's perspective. Steps 1 and 2 may eventually be automated by annotation analysis or other means (e.g. SDK giving a way to provide given side inputs automatically to everything inside a composite transform rather than to individual DoFn's).

> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > Hi,

> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid> wrote:

> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > The differences are:
> >> > > > > - The proposal in the doc allows wiring different side inputs to the same Supplier, but I'm not convinced that this is important - you can just as easily call the constructor of your DoFn passing different PCollectionView's for it to capture.

> >> > > > I disagree with this bit about it being "just as easy". Passing the needed PCollectionViews to your constructor (or even having a constructor) is a pain. Every time I have to do it, it adds a ton of boilerplate that feels like pure noise. To make a DoFn reusable it must be made into a named class with a constructor, versus inlined with no constructor.

> >> > > Hm, why? You can have the DoFn be an anonymous class capturing the PCollectionView into a @SideInput field as a closure.

> >> > > > A generous analogy is that it is "just" manual closure conversion/currying, changing f(side, main) to f(side)(main). But in practice in Beam the second one has much more boilerplate.

> >> > > > Also, Beam is worse. We present the user with higher-order functions, which is where the actual annoyance comes in. When you want to pardo(f) you have to write pardo(f(side))(side, main). Your proposal is to support pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still propose that we support both (as they get implemented). If you buy in to my analogy, then there's decades of precedent and the burden of proof falls heavily on whoever doesn't want to support both.

> >> > > I see your point. I think the proposal is compatible with what you're suggesting too - in DoFn we could have @SideInput *parameters* of type PCollectionView, with the same semantics as a field.

> >> > > > > - My proposal allows getting rid of .withSideInputs() entirely, because the DoFn captures the PCollectionView so you don't need to specify it explicitly for wiring.

> >> > > > I've decided to change to full +1 (whatever that means compared to 0.75 :-) to adding support for @SideInput fields, because the benefits outweigh this failure mode:

> >> > > >     new DoFn {
> >> > > >       // forgot the annotation
> >> > > >       private final PCollectionView whatever;
> >> > > >
> >> > > >       @ProcessElement public void process(...) {
> >> > > >         whatever.get(); // crash during execution
> >> > > >       }
> >> > > >     }

> >> > > > But ideas to mitigate that would be cool.

> >> > > Hm, can't think of anything less hacky than "prohibit having fields of type PCollectionView that are not public, final, and annotated with @SideInput" - not sure we'd want to go down this road. I suppose a good error message in .get() would be sufficient, saying "Did you forget to specify a requirement for this side input via .withSideInputs() or by annotating the field as @SideInput" or something like that.

> >> > > > Kenn

> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

> >> > > > > > My concern with the proposal is not the specifics of how it will work and more about it being yet another way on how our API is to be used even though we have a proposal [1] of an API style we were working towards in Java and Python. I would rather re-open that discussion now about what we want that API to look like for our major features and work towards consistency (or not if there is a strong argument as to why some feature should have a different style).

> >> > > > > > 1: https://s.apache.org/a-new-dofn

> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote:

> >> > > > > > > +0.75 because I'd like to bring up invalid pipelines.

> >> > > > > > > I had proposed side inputs as parameters to DoFn in https://s.apache.org/a-new-dofn (specifically at [1]) so the only place they are specified is in the graph construction, making the DoFn more reusable and errors impossible. I've actually been noodling my way towards this in a branch :-)

> >> > > > > > > Eugene's proposal is a sort of converse, where the side inputs are values captured in the closure and not parameters, yet the only place they are specified is in the DoFn.

> >> > > > > > > I see no conflict between these two. It is very natural to have both the capability to accept parameters and the ability to capture variables in the closure. Supporting both is totally standard in up-to-date programming languages.

> >> > > > > > > Today we have the worst of both worlds: PCollectionView behaves as something captured in the closure/constructor, but must still be explicitly wired up.

> >> > > > > > > But if I use PCollectionView.get() and have not wired it up in any way, what happens? Just like today, you can try to .sideInput(...) a thing that is not available. With side inputs as parameters, this is not possible. If you want to treat them as captured in a closure, while avoiding errors, it seems like you might need to do some low-level magic, like the serialization-based detection that Luke has suggested before (there are known downsides that we haven't explored, like overcapture).

> >> > > > > > > Kenn

> >> > > > > > > [1] https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko

> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > > > > Hm, I guess you're right - for outputs it could be indeed quite valuable to output to them without plumbing (e.g. outputting errors). Could be done perhaps via TupleTag.output()? (assuming the same TupleTag can not be reused to tag multiple PCollection's)

> >> > > > > > > > For now I sent a PR for side input support https://github.com/apache/beam/pull/3814 .

> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

> >> > > > > > > > > I disagree, state may not care where it is used as well since a person may call a function which needs to store/retrieve state and instead of having the DoFn declare the StateSpec and then pass in the state implementation down into the function everywhere. Similarly for outputs, the internal functions could take the TupleTag and request an output manager or take an "output" reference which give functions the ability to produce output directly without needing to pass everything that is needed to be output back to the caller.

> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > > > > > > Hm, I think of these things (state, side outputs etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform - so only side inputs can be implicit like this.

> >> > > > > > > > > > Ultimately we'll probably end up removing ProcessContext, and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (like per my previous email) to statically specify which side inputs will be needed - while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(): i.e., #get() is not a new method of access.

> >> > > > > > > > > > Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.

> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

> >> > > > > > > > > > > For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation wise. It will be confusing for our users to have many variations in our style of how all these concepts are used (ProcessContext / Annotations / #get())

> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > > > > > > > > Also, I think my approach is compatible with annotations and future removal of .withSideInputs if we annotate a field:

> >> > > > > > > > > > > >     final PCollectionView<Foo> foo = ...;
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >     class MyDoFn {
> >> > > > > > > > > > > >       @SideInput
> >> > > > > > > > > > > >       PCollectionView<Foo> foo = foo;
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >       ...foo.get()...
> >> > > > > > > > > > > >     }

> >> > > > > > > > > > > > We can extract the accessed views from the DoFn instance using reflection. Still not compatible with lambdas, but compatible automatically with all anonymous classes.

> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:

> >> > > > > > > > > > > > > Hi Luke,

> >> > > > > > > > > > > > > I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help getting rid of the .withSideInputs() call, but this is where their advantage ends.

> >> > > > > > > > > > > > > The advantages of the proposed approach are that it automatically works with all existing callback or lambda code. No need to further develop the reflection machinery to support side input annotations - and especially to support arbitrary user interfaces, no need to change existing transforms, no need for transform authors to even know that the machinery exists to make side inputs usable in their transforms (and no need for authors to think about whether or not they should support side inputs).

> >> > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotation arguments appears to be currently impossible, and even capturing the annotations on arguments of a lambda is I believe also impossible because the Java compiler drops them in the generated class or method handle.

> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

> >> > > > > > > > > > > > >> I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly. This would allow us to know that the side input was definitely being accessed and perform validation during graph construction for any used but unspecified side inputs.

> >> > > > > > > > > > > > >> Code snippet:

> >> > > > > > > > > > > > >>     final PCollectionView<String> foo =
> >> > > > > > > > > > > > >>         pipeline.apply("fooName", Create.of("foo")).apply(View.<String>asSingleton());
> >> > > > > > > > > > > > >>     PCollection<String> output = pipeline
> >> > > > > > > > > > > > >>         .apply(Create.of(1, 2, 3))
> >> > > > > > > > > > > > >>         .apply(MapElements.via(
> >> > > > > > > > > > > > >>             new SimpleFunction<Integer, String>() {
> >> > > > > > > > > > > > >>               @Override
> >> > > > > > > > > > > > >>               public String apply(Integer input, @SideInput("fooName") String fooValue) {
> >> > > > > > > > > > > > >>                 return fooValue + " " + input;
> >> > > > > > > > > > > > >>               }
> >> > > > > > > > > > > > >>             }).withSideInputs(foo));

> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > > > > > > > > >> > Sure, here's how a modified (passing) MapElements unit test looks like, with usage of side inputs:

> >> > > > > > > > > > > > >> >     @Test
> >> > > > > > > > > > > > >> >     @Category(NeedsRunner.class)
> >> > > > > > > > > > > > >> >     public void testMapBasicWithSideInput() throws Exception {
> >> > > > > > > > > > > > >> >       final PCollectionView<String> foo =
> >> > > > > > > > > > > > >> >           pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> >> > > > > > > > > > > > >> >       PCollection<String> output = pipeline
> >> > > > > > > > > > > > >> >           .apply(Create.of(1, 2, 3))
> >> > > > > > > > > > > > >> >           .apply(MapElements.via(
> >> > > > > > > > > > > > >> >               new SimpleFunction<Integer, String>() {
> >> > > > > > > > > > > > >> >                 @Override
> >> > > > > > > > > > > > >> >                 public String apply(Integer input) {
> >> > > > > > > > > > > > >> >                   return foo.get() + " " + input;
> >> > > > > > > > > > > > >> >                 }
> >> > > > > > > > > > > > >> >               })
> >> > > > > > > > > > > > >> >           .withSideInputs(foo));
> >> > > > > > > > > > > > >> >
> >> > > > > > > > > > > > >> >       PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> >> > > > > > > > > > > > >> >       pipeline.run();
> >> > > > > > > > > > > > >> >     }

> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:

> >> > > > > > > > > > > > >> > > Can you provide a code snippet showing how this would look?

> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

> >> > > > > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get thread-local ProcessContext and do c.sideInput(this). As a result, any user lambdas such as MapElements can use side inputs.

> >> > > > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.

> >> > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations it is plumbed explicitly; others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext or any context for that matter).

> >> > > > > > > > > > > > >> > > > I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.

> >> > > > > > > > > > > > >> > > > It also appears to be quite easy to implement:

> >> > > > > > > > > > > > >> > > > Runner part:
> >> > > > > > > > > > > > >> > > > - introduce a SideInputAccessor interface
> >> > > > > > > > > > > > >> > > > - make .get() on a PCollectionView get it from a thread-local SideInputAccessor
> >> > > > > > > > > > > > >> > > > - make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn - the set of such places will be quite small. I believe only runners (but not transforms) will ever need to set this thread-local

> >> > > > > > > > > > > > >> > > > Transform part:
> >> > > > > > > > > > > > >> > > > - Transforms that take user-code lambdas and want to let them access side inputs still will need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFn's or CombineFn's - information on *which* side inputs will be accessed, of course, still needs to be in the graph.

> >> > > > > > > > > > > > >> > > > I quickly prototyped this in direct runner and MapElements, and it worked with no issues - I'm wondering if there's something subtle that I'm missing, or is it actually a good idea?
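
For anyone reading the thread from the bottom up, the "Runner part" of the original proposal can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions, not Beam code and not the prototype from the PR: `View` is a hypothetical stand-in for PCollectionView, `SideInputContext` and its `call` helper are invented for the example, and only the name `SideInputAccessor` comes from the proposal itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the SideInputAccessor interface named in the proposal.
interface SideInputAccessor {
  <T> T get(View<T> view);
}

// Hypothetical stand-in for PCollectionView; only the proposed get() is modeled.
final class View<T> {
  final String name;

  View(String name) {
    this.name = name;
  }

  // Looks up the value through whatever accessor is installed on this thread.
  T get() {
    SideInputAccessor accessor = SideInputContext.CURRENT.get();
    if (accessor == null) {
      throw new IllegalStateException(
          "Side input '" + name + "' accessed outside a context that provides side inputs");
    }
    return accessor.get(this);
  }
}

// Hypothetical helper playing the role of the runner around a user callback.
final class SideInputContext {
  static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  // Installs an accessor limited to the given views, runs the user function, then clears it.
  static <I, O> O call(Map<View<?>, Object> values, Function<I, O> fn, I input) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        if (!values.containsKey(view)) {
          throw new IllegalArgumentException(
              "Side input '" + view.name + "' was not declared via withSideInputs");
        }
        return (T) values.get(view);
      }
    };
    CURRENT.set(accessor);
    try {
      return fn.apply(input);
    } finally {
      CURRENT.remove();
    }
  }
}

final class SideInputDemo {
  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    Map<View<?>, Object> values = new HashMap<>();
    values.put(foo, "foo");
    // The lambda takes only the main input and reads the side input via get().
    String out = SideInputContext.call(values, (Integer i) -> foo.get() + " " + i, 1);
    System.out.println(out); // prints "foo 1"
  }
}
```

In this sketch a call to foo.get() from any thread other than the one running the callback fails with IllegalStateException, which is the multi-threading failure mode Ben raised; the "capture and restore the user code context" utility mentioned above would have to copy the accessor into the worker thread.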