My concern with the proposal is not the specifics of how it will work and more about it being yet another way on how our API is to be used even though we have a proposal [1] of an API style we were working towards in Java and Python. I would rather re-open that discussion now about what we want that API to look like for our major features and work towards consistency (or not if there is a strong argument as to why some feature should have a different style).
1: https://s.apache.org/a-new-dofn On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote: > +0.75 because I'd like to bring up invalid pipelines. > > I had proposed side inputs as parameters to DoFn in > https://s.apache.org/a-new-dofn (specifically at [1]) so the only place > they are specified is in the graph construction, making the DoFn more > reusable and errors impossible. I've actually been noodling my way towards > this in a branch :-) > > Eugene's proposal is a sort of converse, where the side inputs are values > captured in the closure and not parameters, yet the only place they are > specified is in the DoFn. > > I see no conflict between these two. It is very natural to have both the > capability to accept parameters and the ability to capture variables in the > closure. Supporting both is totally standard in up-to-date programming > languages. > > Today we have the worse of both worlds: PCollectionView behaves as > something captured in the closure/constructor, but must still be explicitly > wired up. > > But if I use PCollectionView.get() and have not wired it up in any way, > what happens? Just like today, you can try to .sideInput(...) a thing that > is not available. With side inputs as parameters, this is not possible. If > you want to treat them as captured in a closure, while avoiding errors, it > seems like you might need to do some low-level magic, like the > serialization-based detection that Luke has suggested before (there are > known downsides that we haven't explored, like overcapture). > > Kenn > > [1] > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov < > kirpic...@google.com.invalid> wrote: > > > Hm, I guess you're right - for outputs it could be indeed quite valuable > to > > output to them without plumbing (e.g. outputting errors). Could be done > > perhaps via TupleTag.output()? (assuming the same TupleTag can not be > > reused to tag multiple PCollection's) > > > > For now I sent a PR for side input support > > https://github.com/apache/beam/pull/3814 . > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> > > wrote: > > > > > I disagree, state may not care where it is used as well since a person > > may > > > call a function which needs to store/retrieve state and instead of > having > > > the DoFn declare the StateSpec and then pass in the state > implementation > > > down into the function everywhere. Similarly for outputs, the internal > > > functions could take the TupleTag and request an output manager or take > > an > > > "output" reference which give functions the ability to produce output > > > directly without needing to pass everything that is needed to be output > > > back to the caller. > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov < > > > kirpic...@google.com.invalid> wrote: > > > > > > > Hm, I think of these things (state, side outputs etc.), only side > > inputs > > > > make sense to access in arbitrary user callbacks without explicit > > > knowledge > > > > of the surrounding transform - so only side inputs can be implicit > like > > > > this. > > > > > > > > Ultimately we'll probably end up removing ProcessContext, and keeping > > > only > > > > annotations (on fields / methods / parameters). In that world, a > field > > > > annotation could be used (like per my previous email) to statically > > > specify > > > > which side inputs will be needed - while the value could still be > > > accessed > > > > via .get(), just like state cells are accessed via .read() and > > .write(): > > > > i.e., #get() is not a new method of access. > > > > > > > > Overall, it seems like I should proceed with the idea. I filed > > > > https://issues.apache.org/jira/browse/BEAM-2844. > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid > > > > > > wrote: > > > > > > > > > For API consistency reasons, it would be good if we did this > > > holistically > > > > > and expanded this approach to state, side outputs, ... so that a > > person > > > > can > > > > > always call Something.get() to return something that they can > access > > > > > implementation wise. It will be confusing for our users to have > many > > > > > variations in our style of how all these concepts are used > > > > (ProcessContext > > > > > / Annotations / #get()) > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov < > > > > > kirpic...@google.com.invalid> wrote: > > > > > > > > > > > Also, I think my approach is compatible with annotations and > future > > > > > removal > > > > > > of .withSideInputs if we annotate a field: > > > > > > final PCollectionView<Foo> foo = ...; > > > > > > > > > > > > class MyDoFn { > > > > > > @SideInput > > > > > > PCollectionView<Foo> foo = foo; > > > > > > > > > > > > ...foo.get()... > > > > > > } > > > > > > > > > > > > We can extract the accessed views from the DoFn instance using > > > > > reflection. > > > > > > Still not compatible with lambdas, but compatible automatically > > with > > > > all > > > > > > anonymous classes. > > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov < > > kirpic...@google.com> > > > > > > wrote: > > > > > > > > > > > > > Hi Luke, > > > > > > > > > > > > > > I know this (annotations) is the pattern we were considering > for > > > side > > > > > > > inputs, but I no longer think it is the best way to access > them. > > > > > > > Annotations help getting rid of the .withSideInputs() call, but > > > this > > > > is > > > > > > > where their advantage ends. > > > > > > > > > > > > > > The advantages of the proposed approach are that it > automatically > > > > works > > > > > > > with all existing callback or lambda code. No need to further > > > develop > > > > > the > > > > > > > reflection machinery to support side input annotations - and > > > > especially > > > > > > to > > > > > > > support arbitrary user interfaces, no need to change existing > > > > > transforms, > > > > > > > no need for transform authors to even know that the machinery > > > exists > > > > to > > > > > > > make side inputs usable in their transforms (and no need for > > > authors > > > > to > > > > > > > think about whether or not they should support side inputs). > > > > > > > > > > > > > > Moreover, like Reuven says, annotations don't work with lambdas > > at > > > > all: > > > > > > > creating a lambda with a flexible set of annotation arguments > > > appears > > > > > to > > > > > > be > > > > > > > currently impossible, and even capturing the annotations on > > > arguments > > > > > of > > > > > > a > > > > > > > lambda is I believe also impossible because the Java compiler > > drops > > > > > them > > > > > > in > > > > > > > the generated class or method handle. > > > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik > > > <lc...@google.com.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> I believe we should follow the pattern that state uses and > add a > > > > type > > > > > > >> annotation to link the side input definition to its usage > > > directly. > > > > > This > > > > > > >> would allow us to know that the side input was definitely > being > > > > > accessed > > > > > > >> and perform validation during graph construction for any used > > but > > > > > > >> unspecified side inputs. > > > > > > >> > > > > > > >> Code snippet: > > > > > > >> final PCollectionView<String> foo = pipeline.apply("fooName", > > > > > > >> Create.of("foo")).apply(View.<String>asSingleton()); > > > > > > >> PCollection<String> output = pipeline > > > > > > >> .apply(Create.of(1, 2, 3)) > > > > > > >> .apply(MapElements.via( > > > > > > >> new SimpleFunction<Integer, String>() { > > > > > > >> @Override > > > > > > >> public String apply(Integer input, > > @SideInput("fooName") > > > > > > String > > > > > > >> fooValue) { > > > > > > >> return fooValue + " " + input; > > > > > > >> } > > > > > > >> }).withSideInputs(foo));* > > > > > > >> > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov < > > > > > > >> kirpic...@google.com.invalid> wrote: > > > > > > >> > > > > > > >> > Sure, here's how a modified (passing) MapElements unit test > > > looks > > > > > > like, > > > > > > >> > with usage of side inputs: > > > > > > >> > > > > > > > >> > @Test > > > > > > >> > @Category(NeedsRunner.class) > > > > > > >> > public void testMapBasicWithSideInput() throws Exception { > > > > > > >> > * final PCollectionView<String> foo =* > > > > > > >> > * pipeline.apply("foo", > > > > > > >> > Create.of("foo")).apply(View.<String>asSingleton());* > > > > > > >> > PCollection<String> output = pipeline > > > > > > >> > .apply(Create.of(1, 2, 3)) > > > > > > >> > .apply(MapElements.via( > > > > > > >> > new SimpleFunction<Integer, String>() { > > > > > > >> > @Override > > > > > > >> > public String apply(Integer input) { > > > > > > >> > return* foo.get() *+ " " + input; > > > > > > >> > } > > > > > > >> > }) > > > > > > >> > *.withSideInputs(foo));* > > > > > > >> > > > > > > > >> > PAssert.that(output).containsInAnyOrder("foo 1", "foo > 2", > > > > "foo > > > > > > 3"); > > > > > > >> > pipeline.run(); > > > > > > >> > } > > > > > > >> > > > > > > > >> > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax > > > > <re...@google.com.invalid > > > > > > > > > > > > >> > wrote: > > > > > > >> > > > > > > > >> > > Can you provide a code snippet showing how this would > look? > > > > > > >> > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov < > > > > > > >> > > kirpic...@google.com.invalid> wrote: > > > > > > >> > > > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(), > implemented > > > as: > > > > > get > > > > > > >> > > > thread-local ProcessContext and do c.sideInput(this). > As a > > > > > result, > > > > > > >> any > > > > > > >> > > user > > > > > > >> > > > lambdas such as MapElements can use side inputs. > > > > > > >> > > > > > > > > > >> > > > Quite a few transforms have user-code callbacks or > > lambdas: > > > > > ParDo > > > > > > >> > (DoFn), > > > > > > >> > > > Map/FlatMapElements, the DynamicDestinations classes in > > > > various > > > > > > IOs, > > > > > > >> > > > combine fns, the PollFn callback in Watch, etc. > > > > > > >> > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in support > > for > > > > side > > > > > > >> > inputs; > > > > > > >> > > > for DynamicDestinations it is plumbed explicitly; others > > > don't > > > > > > have > > > > > > >> > > access > > > > > > >> > > > (e.g. you can't access side inputs from > > Map/FlatMapElements > > > > > > because > > > > > > >> > they > > > > > > >> > > > don't have a ProcessContext or any context for that > > matter). > > > > > > >> > > > > > > > > > >> > > > I think it's important to solve this, especially as > Java 8 > > > > > becomes > > > > > > >> > > people's > > > > > > >> > > > default choice: users will want to use side inputs in > > > > > > >> > > Map/FlatMapElements. > > > > > > >> > > > > > > > > > >> > > > It also appears to be quite easy to implement: > > > > > > >> > > > > > > > > > >> > > > Runner part: > > > > > > >> > > > - introduce a SideInputAccessor interface > > > > > > >> > > > - make .get() on a PCollectionView get it from a > > > thread-local > > > > > > >> > > > SideInputAccessor > > > > > > >> > > > - make runners set the thread-local SideInputAccessor > any > > > time > > > > > the > > > > > > >> > runner > > > > > > >> > > > is evaluating something in a context where side inputs > are > > > > > > >> available, > > > > > > >> > > e.g. > > > > > > >> > > > a ProcessElement method, or applying a CombineFn - the > set > > > of > > > > > such > > > > > > >> > places > > > > > > >> > > > will be quite small. I believe only runners (but not > > > > transforms) > > > > > > >> will > > > > > > >> > > ever > > > > > > >> > > > need to set this thread-local > > > > > > >> > > > > > > > > > >> > > > Transform part: > > > > > > >> > > > - Transforms that take user-code lambdas and want to let > > > them > > > > > > access > > > > > > >> > side > > > > > > >> > > > inputs still will need to be configurable with a method > > like > > > > > > >> > > > .withSideInputs(view1, view2...) and will need to plumb > > > those > > > > > down > > > > > > >> to > > > > > > >> > the > > > > > > >> > > > primitive DoFn's or CombineFn's - information on *which* > > > side > > > > > > inputs > > > > > > >> > will > > > > > > >> > > > be accessed, of course, still needs to be in the graph. > > > > > > >> > > > > > > > > > >> > > > I quickly prototyped this in direct runner and > > MapElements, > > > > and > > > > > it > > > > > > >> > worked > > > > > > >> > > > with no issues - I'm wondering if there's something > subtle > > > > that > > > > > > I'm > > > > > > >> > > > missing, or is it actually a good idea? > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > >