Hi, On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles <k...@google.com.invalid> wrote:
> On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov < > kirpic...@google.com.invalid> wrote: > > > > > The differences are: > > - The proposal in the doc allows wiring different side inputs to the same > > Supplier, but I'm not convinced that this is important - you can just as > > easily call the constructor of your DoFn passing different > > PCollectionView's for it to capture. > > > > I disagree with this bit about it being "just as easy". Passing the needed > PCollectionViews to your constructor (or even having a constructor) is a > pain. Every time I have to do it, it adds a ton of boilerplate that feels > like pure noise. To make a DoFn reusable it must be made into a named class > with a constructor, versus inlined with no constructor. Hm, why? You can have the DoFn be an anonymous class capturing the PCollectionView into a @SideInput field as a closure. > A generous analogy > is is that it is "just" manual closure conversion/currying, changing > f(side, main) to f(side)(main). But in practice in Beam the second one has > much more boilerplate. > > Also, Beam is worse. We present the user with higher-order functions, which > is where the actual annoyance comes in. When you want to pardo(f) you have > to write pardo(f(side))(side, main). Your proposal is to support > pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still > propose that we support both (as they get implemented). If you buy in to my > analogy, then there's decades of precedent and the burden of proof falls > heavily on whoever doesn't want to support both. > I see your point. I think the proposal is compatible with what you're suggesting too - in DoFn we could have @SideInput *parameters* of type PCollectionView, with the same semantics as a field. > > - My proposal allows getting rid of .withSideInputs() entirely, because the > > DoFn captures the PCollectionView so you don't need to specify it > > explicitly for wiring. > > > > I've decided to change to full +1 (whatever that means compared to 0.75 :-) > to adding support for @SideInput fields, because the benefits outweigh this > failure mode: > > new DoFn { > // forgot the annotation > private final PCollectionView whatever; > > @ProcessElement public void process(...) { > whatever.get(); // crash during execution > } > } > > But ideas to mitigate that would be cool. Hm, can't think of anything less hacky than "prohibit having fields of type PCollectionView that are not public, final, and annotated with @SideInput" - not sure we'd want to go down this road. I suppose a good error message in .get() would be sufficient, saying "Did you forget to specify a requirement for this side input via .withSideInputs() or by annotating the field as @SideInput" or something like that. > > Kenn > > > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> > > wrote: > > > > > My concern with the proposal is not the specifics of how it will work > and > > > more about it being yet another way on how our API is to be used even > > > though we have a proposal [1] of an API style we were working towards > in > > > Java and Python. I would rather re-open that discussion now about what > we > > > want that API to look like for our major features and work towards > > > consistency (or not if there is a strong argument as to why some > feature > > > should have a different style). > > > > > > 1: https://s.apache.org/a-new-dofn > > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles > <k...@google.com.invalid > > > > > > wrote: > > > > > > > +0.75 because I'd like to bring up invalid pipelines. > > > > > > > > I had proposed side inputs as parameters to DoFn in > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so the only > > place > > > > they are specified is in the graph construction, making the DoFn more > > > > reusable and errors impossible. I've actually been noodling my way > > > towards > > > > this in a branch :-) > > > > > > > > Eugene's proposal is a sort of converse, where the side inputs are > > values > > > > captured in the closure and not parameters, yet the only place they > are > > > > specified is in the DoFn. > > > > > > > > I see no conflict between these two. It is very natural to have both > > the > > > > capability to accept parameters and the ability to capture variables > in > > > the > > > > closure. Supporting both is totally standard in up-to-date > programming > > > > languages. > > > > > > > > Today we have the worse of both worlds: PCollectionView behaves as > > > > something captured in the closure/constructor, but must still be > > > explicitly > > > > wired up. > > > > > > > > But if I use PCollectionView.get() and have not wired it up in any > way, > > > > what happens? Just like today, you can try to .sideInput(...) a thing > > > that > > > > is not available. With side inputs as parameters, this is not > possible. > > > If > > > > you want to treat them as captured in a closure, while avoiding > errors, > > > it > > > > seems like you might need to do some low-level magic, like the > > > > serialization-based detection that Luke has suggested before (there > are > > > > known downsides that we haven't explored, like overcapture). > > > > > > > > Kenn > > > > > > > > [1] > > > > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko > > > > > > > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov < > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > Hm, I guess you're right - for outputs it could be indeed quite > > > valuable > > > > to > > > > > output to them without plumbing (e.g. outputting errors). Could be > > done > > > > > perhaps via TupleTag.output()? (assuming the same TupleTag can not > be > > > > > reused to tag multiple PCollection's) > > > > > > > > > > For now I sent a PR for side input support > > > > > https://github.com/apache/beam/pull/3814 . > > > > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik > <lc...@google.com.invalid > > > > > > > > wrote: > > > > > > > > > > > I disagree, state may not care where it is used as well since a > > > person > > > > > may > > > > > > call a function which needs to store/retrieve state and instead > of > > > > having > > > > > > the DoFn declare the StateSpec and then pass in the state > > > > implementation > > > > > > down into the function everywhere. Similarly for outputs, the > > > internal > > > > > > functions could take the TupleTag and request an output manager > or > > > take > > > > > an > > > > > > "output" reference which give functions the ability to produce > > output > > > > > > directly without needing to pass everything that is needed to be > > > output > > > > > > back to the caller. > > > > > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov < > > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > Hm, I think of these things (state, side outputs etc.), only > side > > > > > inputs > > > > > > > make sense to access in arbitrary user callbacks without > explicit > > > > > > knowledge > > > > > > > of the surrounding transform - so only side inputs can be > > implicit > > > > like > > > > > > > this. > > > > > > > > > > > > > > Ultimately we'll probably end up removing ProcessContext, and > > > keeping > > > > > > only > > > > > > > annotations (on fields / methods / parameters). In that world, > a > > > > field > > > > > > > annotation could be used (like per my previous email) to > > statically > > > > > > specify > > > > > > > which side inputs will be needed - while the value could still > be > > > > > > accessed > > > > > > > via .get(), just like state cells are accessed via .read() and > > > > > .write(): > > > > > > > i.e., #get() is not a new method of access. > > > > > > > > > > > > > > Overall, it seems like I should proceed with the idea. I filed > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844. > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik > > > <lc...@google.com.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > For API consistency reasons, it would be good if we did this > > > > > > holistically > > > > > > > > and expanded this approach to state, side outputs, ... so > that > > a > > > > > person > > > > > > > can > > > > > > > > always call Something.get() to return something that they can > > > > access > > > > > > > > implementation wise. It will be confusing for our users to > have > > > > many > > > > > > > > variations in our style of how all these concepts are used > > > > > > > (ProcessContext > > > > > > > > / Annotations / #get()) > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov < > > > > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > > > > > > > Also, I think my approach is compatible with annotations > and > > > > future > > > > > > > > removal > > > > > > > > > of .withSideInputs if we annotate a field: > > > > > > > > > final PCollectionView<Foo> foo = ...; > > > > > > > > > > > > > > > > > > class MyDoFn { > > > > > > > > > @SideInput > > > > > > > > > PCollectionView<Foo> foo = foo; > > > > > > > > > > > > > > > > > > ...foo.get()... > > > > > > > > > } > > > > > > > > > > > > > > > > > > We can extract the accessed views from the DoFn instance > > using > > > > > > > > reflection. > > > > > > > > > Still not compatible with lambdas, but compatible > > automatically > > > > > with > > > > > > > all > > > > > > > > > anonymous classes. > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov < > > > > > kirpic...@google.com> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Luke, > > > > > > > > > > > > > > > > > > > > I know this (annotations) is the pattern we were > > considering > > > > for > > > > > > side > > > > > > > > > > inputs, but I no longer think it is the best way to > access > > > > them. > > > > > > > > > > Annotations help getting rid of the .withSideInputs() > call, > > > but > > > > > > this > > > > > > > is > > > > > > > > > > where their advantage ends. > > > > > > > > > > > > > > > > > > > > The advantages of the proposed approach are that it > > > > automatically > > > > > > > works > > > > > > > > > > with all existing callback or lambda code. No need to > > further > > > > > > develop > > > > > > > > the > > > > > > > > > > reflection machinery to support side input annotations - > > and > > > > > > > especially > > > > > > > > > to > > > > > > > > > > support arbitrary user interfaces, no need to change > > existing > > > > > > > > transforms, > > > > > > > > > > no need for transform authors to even know that the > > machinery > > > > > > exists > > > > > > > to > > > > > > > > > > make side inputs usable in their transforms (and no need > > for > > > > > > authors > > > > > > > to > > > > > > > > > > think about whether or not they should support side > > inputs). > > > > > > > > > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work with > > > lambdas > > > > > at > > > > > > > all: > > > > > > > > > > creating a lambda with a flexible set of annotation > > arguments > > > > > > appears > > > > > > > > to > > > > > > > > > be > > > > > > > > > > currently impossible, and even capturing the annotations > on > > > > > > arguments > > > > > > > > of > > > > > > > > > a > > > > > > > > > > lambda is I believe also impossible because the Java > > compiler > > > > > drops > > > > > > > > them > > > > > > > > > in > > > > > > > > > > the generated class or method handle. > > > > > > > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik > > > > > > <lc...@google.com.invalid > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > >> I believe we should follow the pattern that state uses > and > > > > add a > > > > > > > type > > > > > > > > > >> annotation to link the side input definition to its > usage > > > > > > directly. > > > > > > > > This > > > > > > > > > >> would allow us to know that the side input was > definitely > > > > being > > > > > > > > accessed > > > > > > > > > >> and perform validation during graph construction for any > > > used > > > > > but > > > > > > > > > >> unspecified side inputs. > > > > > > > > > >> > > > > > > > > > >> Code snippet: > > > > > > > > > >> final PCollectionView<String> foo = > > > pipeline.apply("fooName", > > > > > > > > > >> Create.of("foo")).apply(View.<String>asSingleton()); > > > > > > > > > >> PCollection<String> output = pipeline > > > > > > > > > >> .apply(Create.of(1, 2, 3)) > > > > > > > > > >> .apply(MapElements.via( > > > > > > > > > >> new SimpleFunction<Integer, String>() { > > > > > > > > > >> @Override > > > > > > > > > >> public String apply(Integer input, > > > > > @SideInput("fooName") > > > > > > > > > String > > > > > > > > > >> fooValue) { > > > > > > > > > >> return fooValue + " " + input; > > > > > > > > > >> } > > > > > > > > > >> }).withSideInputs(foo));* > > > > > > > > > >> > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov < > > > > > > > > > >> kirpic...@google.com.invalid> wrote: > > > > > > > > > >> > > > > > > > > > >> > Sure, here's how a modified (passing) MapElements unit > > > test > > > > > > looks > > > > > > > > > like, > > > > > > > > > >> > with usage of side inputs: > > > > > > > > > >> > > > > > > > > > > >> > @Test > > > > > > > > > >> > @Category(NeedsRunner.class) > > > > > > > > > >> > public void testMapBasicWithSideInput() throws > > > Exception { > > > > > > > > > >> > * final PCollectionView<String> foo =* > > > > > > > > > >> > * pipeline.apply("foo", > > > > > > > > > >> > Create.of("foo")).apply(View.<String>asSingleton());* > > > > > > > > > >> > PCollection<String> output = pipeline > > > > > > > > > >> > .apply(Create.of(1, 2, 3)) > > > > > > > > > >> > .apply(MapElements.via( > > > > > > > > > >> > new SimpleFunction<Integer, String>() { > > > > > > > > > >> > @Override > > > > > > > > > >> > public String apply(Integer input) { > > > > > > > > > >> > return* foo.get() *+ " " + input; > > > > > > > > > >> > } > > > > > > > > > >> > }) > > > > > > > > > >> > *.withSideInputs(foo));* > > > > > > > > > >> > > > > > > > > > > >> > PAssert.that(output).containsInAnyOrder("foo 1", > > "foo > > > > 2", > > > > > > > "foo > > > > > > > > > 3"); > > > > > > > > > >> > pipeline.run(); > > > > > > > > > >> > } > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax > > > > > > > <re...@google.com.invalid > > > > > > > > > > > > > > > > > > >> > wrote: > > > > > > > > > >> > > > > > > > > > > >> > > Can you provide a code snippet showing how this > would > > > > look? > > > > > > > > > >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov < > > > > > > > > > >> > > kirpic...@google.com.invalid> wrote: > > > > > > > > > >> > > > > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), > > > > implemented > > > > > > as: > > > > > > > > get > > > > > > > > > >> > > > thread-local ProcessContext and do > > c.sideInput(this). > > > > As a > > > > > > > > result, > > > > > > > > > >> any > > > > > > > > > >> > > user > > > > > > > > > >> > > > lambdas such as MapElements can use side inputs. > > > > > > > > > >> > > > > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or > > > > > lambdas: > > > > > > > > ParDo > > > > > > > > > >> > (DoFn), > > > > > > > > > >> > > > Map/FlatMapElements, the DynamicDestinations > classes > > > in > > > > > > > various > > > > > > > > > IOs, > > > > > > > > > >> > > > combine fns, the PollFn callback in Watch, etc. > > > > > > > > > >> > > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in > > > support > > > > > for > > > > > > > side > > > > > > > > > >> > inputs; > > > > > > > > > >> > > > for DynamicDestinations it is plumbed explicitly; > > > others > > > > > > don't > > > > > > > > > have > > > > > > > > > >> > > access > > > > > > > > > >> > > > (e.g. you can't access side inputs from > > > > > Map/FlatMapElements > > > > > > > > > because > > > > > > > > > >> > they > > > > > > > > > >> > > > don't have a ProcessContext or any context for > that > > > > > matter). > > > > > > > > > >> > > > > > > > > > > > > >> > > > I think it's important to solve this, especially > as > > > > Java 8 > > > > > > > > becomes > > > > > > > > > >> > > people's > > > > > > > > > >> > > > default choice: users will want to use side inputs > > in > > > > > > > > > >> > > Map/FlatMapElements. > > > > > > > > > >> > > > > > > > > > > > > >> > > > It also appears to be quite easy to implement: > > > > > > > > > >> > > > > > > > > > > > > >> > > > Runner part: > > > > > > > > > >> > > > - introduce a SideInputAccessor interface > > > > > > > > > >> > > > - make .get() on a PCollectionView get it from a > > > > > > thread-local > > > > > > > > > >> > > > SideInputAccessor > > > > > > > > > >> > > > - make runners set the thread-local > > SideInputAccessor > > > > any > > > > > > time > > > > > > > > the > > > > > > > > > >> > runner > > > > > > > > > >> > > > is evaluating something in a context where side > > inputs > > > > are > > > > > > > > > >> available, > > > > > > > > > >> > > e.g. > > > > > > > > > >> > > > a ProcessElement method, or applying a CombineFn - > > the > > > > set > > > > > > of > > > > > > > > such > > > > > > > > > >> > places > > > > > > > > > >> > > > will be quite small. I believe only runners (but > not > > > > > > > transforms) > > > > > > > > > >> will > > > > > > > > > >> > > ever > > > > > > > > > >> > > > need to set this thread-local > > > > > > > > > >> > > > > > > > > > > > > >> > > > Transform part: > > > > > > > > > >> > > > - Transforms that take user-code lambdas and want > to > > > let > > > > > > them > > > > > > > > > access > > > > > > > > > >> > side > > > > > > > > > >> > > > inputs still will need to be configurable with a > > > method > > > > > like > > > > > > > > > >> > > > .withSideInputs(view1, view2...) and will need to > > > plumb > > > > > > those > > > > > > > > down > > > > > > > > > >> to > > > > > > > > > >> > the > > > > > > > > > >> > > > primitive DoFn's or CombineFn's - information on > > > *which* > > > > > > side > > > > > > > > > inputs > > > > > > > > > >> > will > > > > > > > > > >> > > > be accessed, of course, still needs to be in the > > > graph. > > > > > > > > > >> > > > > > > > > > > > > >> > > > I quickly prototyped this in direct runner and > > > > > MapElements, > > > > > > > and > > > > > > > > it > > > > > > > > > >> > worked > > > > > > > > > >> > > > with no issues - I'm wondering if there's > something > > > > subtle > > > > > > > that > > > > > > > > > I'm > > > > > > > > > >> > > > missing, or is it actually a good idea? > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >