One possible issue with this is that updating a thread local is likely to be much more expensive than passing an additional argument. Also, not all code called from within the DoFn will necessarily run on the same thread (e.g., sometimes we create a pool of threads to do the work). It may be *more* confusing for this to sometimes work magically and sometimes fail horribly.

Also, requiring the PCollectionView to be passed to the user code that accesses it is nice because it makes it *very clear* that the side input needs to be provided from the DoFn to that particular utility. If it is accessed via "spooky action at a distance", we lose that piece of "free" documentation, which may lead to extensive misuse of these utility methods.
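To make the thread-pool concern concrete, here is a minimal sketch in plain Java with no Beam dependency. Everything in it is hypothetical and only for illustration: `CURRENT_SIDE_INPUT` stands in for whatever per-thread context a runner would install, and `implicitGet()` / `explicitGet()` stand in for the two access styles under discussion. It is not how any runner actually implements this.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalSideInputDemo {
    // Hypothetical stand-in for the per-thread context a runner would install.
    static final ThreadLocal<String> CURRENT_SIDE_INPUT = new ThreadLocal<>();

    // Implicit style: reads the value from the calling thread's context.
    static String implicitGet() {
        String value = CURRENT_SIDE_INPUT.get();
        if (value == null) {
            throw new IllegalStateException("no side input context on this thread");
        }
        return value;
    }

    // Explicit style: the value arrives as an ordinary argument.
    static String explicitGet(String sideInput) {
        return sideInput;
    }

    // Runs the task on a fresh worker thread and reports a failure as a string.
    static String onPoolThread(Callable<String> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "failed: interrupted";
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_SIDE_INPUT.set("foo");
        try {
            final String captured = implicitGet(); // works: same thread as the set()
            System.out.println("same thread:        " + captured);
            System.out.println("pool, implicit get: " + onPoolThread(() -> implicitGet()));
            System.out.println("pool, explicit arg: " + onPoolThread(() -> explicitGet(captured)));
        } finally {
            CURRENT_SIDE_INPUT.remove();
        }
    }
}
```

The implicit lookup succeeds on the thread where the context was set and fails on the worker thread, while the explicitly passed value works in both places. That asymmetry is the "sometimes works, sometimes fails" behavior I am worried about.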
On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> Hi,
>
> On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid> wrote:
>
> > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> >
> > > The differences are:
> > > - The proposal in the doc allows wiring different side inputs to the same Supplier, but I'm not convinced that this is important - you can just as easily call the constructor of your DoFn passing different PCollectionView's for it to capture.
> >
> > I disagree with this bit about it being "just as easy". Passing the needed PCollectionViews to your constructor (or even having a constructor) is a pain. Every time I have to do it, it adds a ton of boilerplate that feels like pure noise. To make a DoFn reusable it must be made into a named class with a constructor, versus inlined with no constructor.
>
> Hm, why? You can have the DoFn be an anonymous class capturing the PCollectionView into a @SideInput field as a closure.
>
> > A generous analogy is is that it is "just" manual closure conversion/currying, changing f(side, main) to f(side)(main). But in practice in Beam the second one has much more boilerplate.
> >
> > Also, Beam is worse. We present the user with higher-order functions, which is where the actual annoyance comes in. When you want to pardo(f) you have to write pardo(f(side))(side, main). Your proposal is to support pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still propose that we support both (as they get implemented). If you buy in to my analogy, then there's decades of precedent and the burden of proof falls heavily on whoever doesn't want to support both.
>
> I see your point.
> I think the proposal is compatible with what you're suggesting too - in DoFn we could have @SideInput *parameters* of type PCollectionView, with the same semantics as a field.
>
> > > - My proposal allows getting rid of .withSideInputs() entirely, because the DoFn captures the PCollectionView so you don't need to specify it explicitly for wiring.
> >
> > I've decided to change to full +1 (whatever that means compared to 0.75 :-) to adding support for @SideInput fields, because the benefits outweigh this failure mode:
> >
> > new DoFn {
> >   // forgot the annotation
> >   private final PCollectionView whatever;
> >
> >   @ProcessElement public void process(...) {
> >     whatever.get(); // crash during execution
> >   }
> > }
> >
> > But ideas to mitigate that would be cool.
>
> Hm, can't think of anything less hacky than "prohibit having fields of type PCollectionView that are not public, final, and annotated with @SideInput" - not sure we'd want to go down this road. I suppose a good error message in .get() would be sufficient, saying "Did you forget to specify a requirement for this side input via .withSideInputs() or by annotating the field as @SideInput" or something like that.
>
> > Kenn
> >
> > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > >
> > > > My concern with the proposal is not the specifics of how it will work and more about it being yet another way on how our API is to be used even though we have a proposal [1] of an API style we were working towards in Java and Python. I would rather re-open that discussion now about what we want that API to look like for our major features and work towards consistency (or not if there is a strong argument as to why some feature should have a different style).
> > > >
> > > > 1: https://s.apache.org/a-new-dofn
> > > >
> > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
> > > >
> > > > > +0.75 because I'd like to bring up invalid pipelines.
> > > > >
> > > > > I had proposed side inputs as parameters to DoFn in https://s.apache.org/a-new-dofn (specifically at [1]) so the only place they are specified is in the graph construction, making the DoFn more reusable and errors impossible. I've actually been noodling my way towards this in a branch :-)
> > > > >
> > > > > Eugene's proposal is a sort of converse, where the side inputs are values captured in the closure and not parameters, yet the only place they are specified is in the DoFn.
> > > > >
> > > > > I see no conflict between these two. It is very natural to have both the capability to accept parameters and the ability to capture variables in the closure. Supporting both is totally standard in up-to-date programming languages.
> > > > >
> > > > > Today we have the worse of both worlds: PCollectionView behaves as something captured in the closure/constructor, but must still be explicitly wired up.
> > > > >
> > > > > But if I use PCollectionView.get() and have not wired it up in any way, what happens? Just like today, you can try to .sideInput(...) a thing that is not available. With side inputs as parameters, this is not possible. If you want to treat them as captured in a closure, while avoiding errors, it seems like you might need to do some low-level magic, like the serialization-based detection that Luke has suggested before (there are known downsides that we haven't explored, like overcapture).
> > > > >
> > > > > Kenn
> > > > >
> > > > > [1] https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
> > > > >
> > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > >
> > > > > > Hm, I guess you're right - for outputs it could be indeed quite valuable to output to them without plumbing (e.g. outputting errors). Could be done perhaps via TupleTag.output()? (assuming the same TupleTag can not be reused to tag multiple PCollection's)
> > > > > >
> > > > > > For now I sent a PR for side input support https://github.com/apache/beam/pull/3814 .
> > > > > >
> > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > > > > >
> > > > > > > I disagree, state may not care where it is used as well since a person may call a function which needs to store/retrieve state and instead of having the DoFn declare the StateSpec and then pass in the state implementation down into the function everywhere. Similarly for outputs, the internal functions could take the TupleTag and request an output manager or take an "output" reference which give functions the ability to produce output directly without needing to pass everything that is needed to be output back to the caller.
> > > > > > >
> > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > > >
> > > > > > > > Hm, I think of these things (state, side outputs etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform - so only side inputs can be implicit like this.
> > > > > > > >
> > > > > > > > Ultimately we'll probably end up removing ProcessContext, and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (like per my previous email) to statically specify which side inputs will be needed - while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(): i.e., #get() is not a new method of access.
> > > > > > > >
> > > > > > > > Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.
> > > > > > > >
> > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation wise.
> > > > > > > > > It will be confusing for our users to have many variations in our style of how all these concepts are used (ProcessContext / Annotations / #get())
> > > > > > > > >
> > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > > > > >
> > > > > > > > > > Also, I think my approach is compatible with annotations and future removal of .withSideInputs if we annotate a field:
> > > > > > > > > >
> > > > > > > > > > final PCollectionView<Foo> foo = ...;
> > > > > > > > > >
> > > > > > > > > > class MyDoFn {
> > > > > > > > > >   @SideInput
> > > > > > > > > >   PCollectionView<Foo> foo = foo;
> > > > > > > > > >
> > > > > > > > > >   ...foo.get()...
> > > > > > > > > > }
> > > > > > > > > >
> > > > > > > > > > We can extract the accessed views from the DoFn instance using reflection. Still not compatible with lambdas, but compatible automatically with all anonymous classes.
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Luke,
> > > > > > > > > > >
> > > > > > > > > > > I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help getting rid of the .withSideInputs() call, but this is where their advantage ends.
> > > > > > > > > > >
> > > > > > > > > > > The advantages of the proposed approach are that it automatically works with all existing callback or lambda code.
> > > > > > > > > > > No need to further develop the reflection machinery to support side input annotations - and especially to support arbitrary user interfaces, no need to change existing transforms, no need for transform authors to even know that the machinery exists to make side inputs usable in their transforms (and no need for authors to think about whether or not they should support side inputs).
> > > > > > > > > > >
> > > > > > > > > > > Moreover, like Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotation arguments appears to be currently impossible, and even capturing the annotations on arguments of a lambda is I believe also impossible because the Java compiler drops them in the generated class or method handle.
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > >> I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly.
> > > > > > > > > > >> This would allow us to know that the side input was definitely being accessed and perform validation during graph construction for any used but unspecified side inputs.
> > > > > > > > > > >>
> > > > > > > > > > >> Code snippet:
> > > > > > > > > > >> final PCollectionView<String> foo =
> > > > > > > > > > >>     pipeline.apply("fooName", Create.of("foo")).apply(View.<String>asSingleton());
> > > > > > > > > > >> PCollection<String> output = pipeline
> > > > > > > > > > >>     .apply(Create.of(1, 2, 3))
> > > > > > > > > > >>     .apply(MapElements.via(
> > > > > > > > > > >>         new SimpleFunction<Integer, String>() {
> > > > > > > > > > >>           @Override
> > > > > > > > > > >>           public String apply(Integer input, @SideInput("fooName") String fooValue) {
> > > > > > > > > > >>             return fooValue + " " + input;
> > > > > > > > > > >>           }
> > > > > > > > > > >>         }).withSideInputs(foo));
> > > > > > > > > > >>
> > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> > Sure, here's how a modified (passing) MapElements unit test looks like, with usage of side inputs:
> > > > > > > > > > >> >
> > > > > > > > > > >> > @Test
> > > > > > > > > > >> > @Category(NeedsRunner.class)
> > > > > > > > > > >> > public void testMapBasicWithSideInput() throws Exception {
> > > > > > > > > > >> >   final PCollectionView<String> foo =
> > > > > > > > > > >> >       pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> > > > > > > > > > >> >   PCollection<String> output = pipeline
> > > > > > > > > > >> >       .apply(Create.of(1, 2, 3))
> > > > > > > > > > >> >       .apply(MapElements.via(
> > > > > > > > > > >> >           new SimpleFunction<Integer, String>() {
> > > > > > > > > > >> >             @Override
> > > > > > > > > > >> >             public String apply(Integer input) {
> > > > > > > > > > >> >               return foo.get() + " " + input;
> > > > > > > > > > >> >             }
> > > > > > > > > > >> >           })
> > > > > > > > > > >> >           .withSideInputs(foo));
> > > > > > > > > > >> >
> > > > > > > > > > >> >   PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > > > > > > > > > >> >   pipeline.run();
> > > > > > > > > > >> > }
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > > Can you provide a code snippet showing how this would look?
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > > > > > > > > > >> > >
> > > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get thread-local ProcessContext and do c.sideInput(this). As a result, any user lambdas such as MapElements can use side inputs.
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations it is plumbed explicitly; others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext or any context for that matter).
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > It also appears to be quite easy to implement:
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Runner part:
> > > > > > > > > > >> > > > - introduce a SideInputAccessor interface
> > > > > > > > > > >> > > > - make .get() on a PCollectionView get it from a thread-local SideInputAccessor
> > > > > > > > > > >> > > > - make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn - the set of such places will be quite small.
> > > > > > > > > > >> > > > I believe only runners (but not transforms) will ever need to set this thread-local
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > Transform part:
> > > > > > > > > > >> > > > - Transforms that take user-code lambdas and want to let them access side inputs still will need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFn's or CombineFn's - information on *which* side inputs will be accessed, of course, still needs to be in the graph.
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > I quickly prototyped this in direct runner and MapElements, and it worked with no issues - I'm wondering if there's something subtle that I'm missing, or is it actually a good idea?
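For reference, the "Runner part" of the original proposal quoted above can be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions: `View` is a hypothetical stand-in for PCollectionView (not Beam's real class), the shape of `SideInputAccessor` is guessed from its name in the proposal, and `invokeWithSideInputs` is an invented helper showing where a runner might set and clear the thread-local.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SideInputAccessorSketch {
    /** Hypothetical runner-side interface that resolves a view to its current value. */
    interface SideInputAccessor {
        <T> T read(View<T> view);
    }

    /** Set by the runner around user-code calls; empty everywhere else. */
    static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

    /** Hypothetical stand-in for PCollectionView with the proposed get() method. */
    static final class View<T> {
        final String name;

        View(String name) {
            this.name = name;
        }

        T get() {
            SideInputAccessor accessor = CURRENT.get();
            if (accessor == null) {
                throw new IllegalStateException(
                    "No side input context for view '" + name + "' on this thread");
            }
            return accessor.read(this);
        }
    }

    /** What a runner might do around each call into user code (assumed, not Beam API). */
    static <I, O> O invokeWithSideInputs(
            Map<View<?>, Object> values, Function<I, O> userFn, I element) {
        SideInputAccessor accessor = new SideInputAccessor() {
            @Override
            @SuppressWarnings("unchecked")
            public <T> T read(View<T> view) {
                if (!values.containsKey(view)) {
                    throw new IllegalArgumentException("View '" + view.name + "' was not wired up");
                }
                return (T) values.get(view);
            }
        };
        CURRENT.set(accessor);
        try {
            return userFn.apply(element);
        } finally {
            CURRENT.remove(); // never leak the context past the user-code call
        }
    }

    public static void main(String[] args) {
        View<String> foo = new View<>("foo");
        Map<View<?>, Object> values = new HashMap<>();
        values.put(foo, "foo");

        // The user lambda captures the view and calls get(), as in the MapElements test.
        Function<Integer, String> fn = input -> foo.get() + " " + input;
        System.out.println(invokeWithSideInputs(values, fn, 1)); // prints "foo 1"
    }
}
```

Note that the lookup in `get()` is resolved per thread, so the sketch inherits both concerns raised at the top of this mail: the set/remove cost on every call, and the failure when user code calls `get()` from a different thread.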