Hi Luke,

I think my proposal is very similar in style to the one in A New DoFn <https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko>, so I disagree that it introduces a divergence from that discussion. That code proposes:

  class MyNewDoFn extends NewDoFn<Foo, Baz> {
    @ProcessElement
    public void process(
        ProcessContext context,
        @SideInput Supplier<Frob> frob,
        @SideInput Supplier<Bizzle> bizzle) {
      … frob.get() … bizzle.get() …
    }
  }

  PCollection<Foo> input = …

(and it still proposes .withSideInputs() for wiring)

The similarities are:
- Using @SideInput to declare that something is a side input (that code proposes it on a parameter and my code proposes it on a field, because fields allow retaining a compile-time fixed signature for interface methods: less of a concern for DoFn, but a big concern for other user callbacks, which that proposal does not consider).
- Accessing via .get(): that code uses a Supplier, but I don't think that's materially different from calling .get() on the PCollectionView directly.

The differences are:
- The proposal in the doc allows wiring different side inputs to the same Supplier, but I'm not convinced that this is important: you can just as easily call the constructor of your DoFn, passing different PCollectionViews for it to capture.
- My proposal allows getting rid of .withSideInputs() entirely, because the DoFn captures the PCollectionView, so you don't need to specify it explicitly for wiring.

On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

> My concern with the proposal is not the specifics of how it will work, but rather that it is yet another way of using our API, even though we have a proposal [1] for an API style we were working towards in Java and Python. I would rather re-open that discussion now about what we want that API to look like for our major features, and work towards consistency (or not, if there is a strong argument as to why some feature should have a different style).
>
> 1: https://s.apache.org/a-new-dofn
>
> On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>
>> +0.75 because I'd like to bring up invalid pipelines.
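Eugene's two points about the Supplier-based design (a Supplier is not materially different from calling .get() on the view, and different side inputs can be supplied simply by constructing the callback with different views) can be illustrated in plain Java. This is a minimal sketch under stated assumptions: `CapturedView`, `Prefixer`, and `ConstructorCaptureDemo` are hypothetical names, not Beam classes, and the "views" are faked with lambdas returning constants rather than real PCollectionView contents.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for PCollectionView: a view is just something whose
// value can be fetched with get(), so it can extend Supplier directly.
interface CapturedView<T> extends Supplier<T> {}

// A reusable callback that receives its side input through the constructor
// rather than through an annotated, separately wired parameter.
class Prefixer {
  private final Supplier<String> prefix;

  Prefixer(Supplier<String> prefix) {
    this.prefix = prefix;
  }

  String apply(int input) {
    // get() works the same whether the captured value is typed as a view
    // or as a plain Supplier.
    return prefix.get() + " " + input;
  }
}

class ConstructorCaptureDemo {
  static String[] run() {
    // Two different "views", faked with constant-returning lambdas.
    CapturedView<String> foo = () -> "foo";
    CapturedView<String> bar = () -> "bar";
    // The same class, instantiated twice with different captured views.
    return new String[] {new Prefixer(foo).apply(1), new Prefixer(bar).apply(2)};
  }

  public static void main(String[] args) {
    for (String s : run()) {
      System.out.println(s);
    }
  }
}
```

Because `CapturedView` extends `java.util.function.Supplier`, the same `Prefixer` class accepts either form, and reuse with a different side input is just another constructor call.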
>>
>> I had proposed side inputs as parameters to DoFn in https://s.apache.org/a-new-dofn (specifically at [1]), so the only place they are specified is in the graph construction, making the DoFn more reusable and errors impossible. I've actually been noodling my way towards this in a branch :-)
>>
>> Eugene's proposal is a sort of converse, where the side inputs are values captured in the closure and not parameters, yet the only place they are specified is in the DoFn.
>>
>> I see no conflict between these two. It is very natural to have both the capability to accept parameters and the ability to capture variables in the closure. Supporting both is totally standard in up-to-date programming languages.
>>
>> Today we have the worst of both worlds: PCollectionView behaves as something captured in the closure/constructor, but must still be explicitly wired up.
>>
>> But if I use PCollectionView.get() and have not wired it up in any way, what happens? Just like today, you can try to .sideInput(...) a thing that is not available. With side inputs as parameters, this is not possible. If you want to treat them as captured in a closure while avoiding errors, it seems like you might need to do some low-level magic, like the serialization-based detection that Luke has suggested before (there are known downsides that we haven't explored, like overcapture).
>>
>> Kenn
>>
>> [1] https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
>>
>> On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>
>>> Hm, I guess you're right: for outputs it could indeed be quite valuable to output to them without plumbing (e.g. outputting errors). Could be done perhaps via TupleTag.output()?
>>> (assuming the same TupleTag cannot be reused to tag multiple PCollections)
>>>
>>> For now I sent a PR for side input support: https://github.com/apache/beam/pull/3814
>>>
>>> On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
>>>
>>>> I disagree: state may also not care where it is used, since a person may call a function that needs to store/retrieve state, instead of having the DoFn declare the StateSpec and then pass the state implementation down into the function everywhere. Similarly for outputs, the internal functions could take the TupleTag and request an output manager, or take an "output" reference that gives functions the ability to produce output directly, without needing to pass everything that needs to be output back to the caller.
>>>>
>>>> On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>
>>>>> Hm, I think that of these things (state, side outputs, etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform, so only side inputs can be implicit like this.
>>>>>
>>>>> Ultimately we'll probably end up removing ProcessContext and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (as in my previous email) to statically specify which side inputs will be needed, while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(); i.e., #get() is not a new method of access.
>>>>>
>>>>> Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.
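The "output without plumbing" idea discussed here (a helper that emits to a tag directly instead of returning everything to its caller) could reuse the same thread-local trick proposed for side inputs. The sketch below is hypothetical throughout: `OutputTag`, `OutputManager`, and `OutputDemo` are illustrative names, not Beam's TupleTag or any real output API. It relies on Eugene's stated assumption that one tag identifies exactly one output, which is why tags are compared by identity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-bundle sink the runner would own; maps each tag to the
// values emitted to it. Tags use identity equality (one tag = one output).
final class OutputManager {
  private final Map<OutputTag<?>, List<Object>> outputs = new HashMap<>();

  void emit(OutputTag<?> tag, Object value) {
    outputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
  }

  List<Object> get(OutputTag<?> tag) {
    return outputs.getOrDefault(tag, Collections.emptyList());
  }
}

// Hypothetical tag with an output() method, in the spirit of the
// "TupleTag.output()" idea; not Beam's TupleTag.
final class OutputTag<T> {
  // Installed by the runner while user code runs; empty everywhere else.
  static final ThreadLocal<OutputManager> CURRENT = new ThreadLocal<>();

  private final String id;

  OutputTag(String id) {
    this.id = id;
  }

  void output(T value) {
    OutputManager manager = CURRENT.get();
    if (manager == null) {
      throw new IllegalStateException("No output context for tag " + id);
    }
    manager.emit(this, value);
  }
}

class OutputDemo {
  static final OutputTag<String> ERRORS = new OutputTag<>("errors");

  // A helper deep inside user code reports bad input directly to the error
  // tag instead of returning it to its caller.
  static Integer parse(String s) {
    try {
      return Integer.parseInt(s);
    } catch (NumberFormatException e) {
      ERRORS.output("bad input: " + s);
      return null;
    }
  }

  // Plays the runner: install a manager, run user code, always clear it.
  static OutputManager run(List<String> inputs) {
    OutputManager manager = new OutputManager();
    OutputTag.CURRENT.set(manager);
    try {
      for (String s : inputs) {
        parse(s);
      }
    } finally {
      OutputTag.CURRENT.remove();
    }
    return manager;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("1", "x", "3")).get(ERRORS));
  }
}
```

Calling `ERRORS.output(...)` outside `run` fails fast, which mirrors the concern raised elsewhere in the thread about using an implicit handle where no context has been set up.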
>>>>>
>>>>> On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:
>>>>>
>>>>>> For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation-wise. It will be confusing for our users to have many variations in how all these concepts are used (ProcessContext / Annotations / #get()).
>>>>>>
>>>>>> On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>
>>>>>>> Also, I think my approach is compatible with annotations and future removal of .withSideInputs if we annotate a field:
>>>>>>>
>>>>>>>   final PCollectionView<Foo> fooView = ...;
>>>>>>>
>>>>>>>   class MyDoFn {
>>>>>>>     @SideInput
>>>>>>>     PCollectionView<Foo> foo = fooView;  // captures the view defined outside
>>>>>>>
>>>>>>>     ...foo.get()...
>>>>>>>   }
>>>>>>>
>>>>>>> We can extract the accessed views from the DoFn instance using reflection. Still not compatible with lambdas, but automatically compatible with all anonymous classes.
>>>>>>>
>>>>>>> On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>> Hi Luke,
>>>>>>>>
>>>>>>>> I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help get rid of the .withSideInputs() call, but this is where their advantage ends.
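Eugene's field-annotation variant depends on extracting the accessed views from the DoFn instance via reflection. A minimal sketch of that extraction step, assuming a hypothetical `@SideInputField` annotation with runtime retention and a hypothetical `ViewHandle` class standing in for PCollectionView (neither is a Beam API):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical field-level marker; RUNTIME retention so reflection can see it.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface SideInputField {}

// Hypothetical stand-in for PCollectionView; carries only a name here.
final class ViewHandle {
  final String name;

  ViewHandle(String name) {
    this.name = name;
  }
}

class FieldScan {
  // Returns the values of all @SideInputField fields of type ViewHandle
  // declared directly on fn's class: the views the callback says it uses.
  static List<ViewHandle> declaredViews(Object fn) {
    List<ViewHandle> views = new ArrayList<>();
    for (Field f : fn.getClass().getDeclaredFields()) {
      if (f.isAnnotationPresent(SideInputField.class)
          && ViewHandle.class.isAssignableFrom(f.getType())) {
        try {
          f.setAccessible(true);
          views.add((ViewHandle) f.get(fn));
        } catch (IllegalAccessException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    return views;
  }

  static List<String> demo() {
    final ViewHandle fooView = new ViewHandle("foo");
    // Anonymous class whose annotated field captures the outer view, so the
    // view can be discovered without a separate wiring call.
    Object fn = new Object() {
      @SideInputField ViewHandle foo = fooView;
      ViewHandle notASideInput = new ViewHandle("other");
    };
    List<String> names = new ArrayList<>();
    for (ViewHandle v : declaredViews(fn)) {
      names.add(v.name);
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Only annotated fields of the view type are collected, so other captured state is ignored. As the thread notes, this works for anonymous and named classes but not for lambdas, which have no fields to annotate.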
>>>>>>>>
>>>>>>>> The advantage of the proposed approach is that it automatically works with all existing callback or lambda code. There is no need to further develop the reflection machinery to support side input annotations (especially to support arbitrary user interfaces), no need to change existing transforms, and no need for transform authors to even know that the machinery exists to make side inputs usable in their transforms (and no need for authors to think about whether or not they should support side inputs).
>>>>>>>>
>>>>>>>> Moreover, as Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotation arguments currently appears to be impossible, and even capturing the annotations on the arguments of a lambda is, I believe, also impossible, because the Java compiler drops them in the generated class or method handle.
>>>>>>>>
>>>>>>>> On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly. This would allow us to know that the side input was definitely being accessed, and to perform validation during graph construction for any used but unspecified side inputs.
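Luke's alternative puts the annotation on a parameter and validates at graph-construction time that every side input a callback uses has actually been wired. A hedged sketch of that check, with a hypothetical `@NamedSideInput` annotation standing in for the proposed `@SideInput("fooName")` and a hypothetical `ParamValidation` helper. It deliberately uses an anonymous class, since the thread points out that lambdas do not reliably expose parameter annotations.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical parameter annotation naming the side input to inject.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
@interface NamedSideInput {
  String value();
}

class ParamValidation {
  // Side input names requested by parameters of any method declared on fn's class.
  static Set<String> requested(Object fn) {
    Set<String> names = new TreeSet<>();
    for (Method m : fn.getClass().getDeclaredMethods()) {
      for (Annotation[] perParameter : m.getParameterAnnotations()) {
        for (Annotation a : perParameter) {
          if (a instanceof NamedSideInput) {
            names.add(((NamedSideInput) a).value());
          }
        }
      }
    }
    return names;
  }

  // Construction-time check: names that are used but were never wired.
  static Set<String> unspecified(Object fn, Set<String> wired) {
    Set<String> missing = new TreeSet<>(requested(fn));
    missing.removeAll(wired);
    return missing;
  }

  // Anonymous class mirroring the shape of the snippet in the thread.
  static Object exampleFn() {
    return new Object() {
      String apply(Integer input, @NamedSideInput("fooName") String fooValue) {
        return fooValue + " " + input;
      }
    };
  }

  public static void main(String[] args) {
    Object fn = exampleFn();
    System.out.println(unspecified(fn, Collections.<String>emptySet()));
    System.out.println(unspecified(fn, Collections.singleton("fooName")));
  }
}
```

A non-empty result from `unspecified` would be reported before the pipeline runs, which is the advantage Luke describes over resolving an unwired view at execution time.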
>>>>>>>>>
>>>>>>>>> Code snippet:
>>>>>>>>>
>>>>>>>>>   final PCollectionView<String> foo = pipeline.apply("fooName",
>>>>>>>>>       Create.of("foo")).apply(View.<String>asSingleton());
>>>>>>>>>   PCollection<String> output = pipeline
>>>>>>>>>       .apply(Create.of(1, 2, 3))
>>>>>>>>>       .apply(MapElements.via(
>>>>>>>>>           new SimpleFunction<Integer, String>() {
>>>>>>>>>             @Override
>>>>>>>>>             public String apply(Integer input, @SideInput("fooName") String fooValue) {
>>>>>>>>>               return fooValue + " " + input;
>>>>>>>>>             }
>>>>>>>>>           }).withSideInputs(foo));
>>>>>>>>>
>>>>>>>>> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>
>>>>>>>>>> Sure, here's what a modified (passing) MapElements unit test looks like, with usage of side inputs:
>>>>>>>>>>
>>>>>>>>>>   @Test
>>>>>>>>>>   @Category(NeedsRunner.class)
>>>>>>>>>>   public void testMapBasicWithSideInput() throws Exception {
>>>>>>>>>>     final PCollectionView<String> foo =
>>>>>>>>>>         pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
>>>>>>>>>>     PCollection<String> output = pipeline
>>>>>>>>>>         .apply(Create.of(1, 2, 3))
>>>>>>>>>>         .apply(MapElements.via(
>>>>>>>>>>             new SimpleFunction<Integer, String>() {
>>>>>>>>>>               @Override
>>>>>>>>>>               public String apply(Integer input) {
>>>>>>>>>>                 return foo.get() + " " + input;
>>>>>>>>>>               }
>>>>>>>>>>             })
>>>>>>>>>>             .withSideInputs(foo));
>>>>>>>>>>
>>>>>>>>>>     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
>>>>>>>>>>     pipeline.run();
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you provide a code snippet showing how this would look?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> TL;DR Introduce method PCollectionView.get(), implemented as: get the thread-local ProcessContext and do c.sideInput(this). As a result, any user lambdas, such as those in MapElements, can use side inputs.
>>>>>>>>>>>>
>>>>>>>>>>>> Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations it is plumbed explicitly; others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext or any context for that matter).
>>>>>>>>>>>>
>>>>>>>>>>>> I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.
>>>>>>>>>>>>
>>>>>>>>>>>> It also appears to be quite easy to implement:
>>>>>>>>>>>>
>>>>>>>>>>>> Runner part:
>>>>>>>>>>>> - introduce a SideInputAccessor interface
>>>>>>>>>>>> - make .get() on a PCollectionView get it from a thread-local SideInputAccessor
>>>>>>>>>>>> - make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn; the set of such places will be quite small. I believe only runners (but not transforms) will ever need to set this thread-local.
>>>>>>>>>>>>
>>>>>>>>>>>> Transform part:
>>>>>>>>>>>> - Transforms that take user-code lambdas and want to let them access side inputs will still need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFns or CombineFns: information on *which* side inputs will be accessed, of course, still needs to be in the graph.
>>>>>>>>>>>>
>>>>>>>>>>>> I quickly prototyped this in the direct runner and MapElements, and it worked with no issues. I'm wondering if there's something subtle that I'm missing, or is it actually a good idea?
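The runner-side steps in the original proposal can be sketched without any Beam dependency. This is a minimal illustration, not the implementation in the PR mentioned in the thread: `SideInputAccessor` follows the name used in the proposal, while `LocalView` (standing in for PCollectionView) and `ThreadLocalRunner` are hypothetical. The accessor only resolves views that were wired explicitly, which also shows one possible answer to Kenn's question about calling .get() on a view that was never wired: a runtime error rather than a construction-time one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Runner-side interface, named as in the proposal; signature is hypothetical.
interface SideInputAccessor {
  <T> T sideInput(LocalView<T> view);
}

// Hypothetical stand-in for PCollectionView with the proposed get() method.
final class LocalView<T> {
  // Set by the runner only while user code that may read side inputs runs.
  static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  final String name;

  LocalView(String name) {
    this.name = name;
  }

  T get() {
    SideInputAccessor accessor = CURRENT.get();
    if (accessor == null) {
      throw new IllegalStateException("get() called outside a side-input context: " + name);
    }
    return accessor.sideInput(this);
  }
}

class ThreadLocalRunner {
  // Plays the runner: install an accessor backed by the views wired via
  // something like withSideInputs(...), run the user function on each
  // element (sequentially, on this thread), then clear the thread-local.
  static <I, O> List<O> map(List<I> inputs, Function<I, O> fn, Map<LocalView<?>, Object> wired) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T sideInput(LocalView<T> view) {
        if (!wired.containsKey(view)) {
          // A view the transform was never told about.
          throw new IllegalArgumentException("Side input not wired: " + view.name);
        }
        return (T) wired.get(view);
      }
    };
    LocalView.CURRENT.set(accessor);
    try {
      return inputs.stream().map(fn).collect(Collectors.toList());
    } finally {
      LocalView.CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    LocalView<String> foo = new LocalView<>("foo");
    Map<LocalView<?>, Object> wired = new HashMap<>();
    wired.put(foo, "foo");
    // A plain Java 8 lambda: no annotations and no context parameter.
    System.out.println(map(Arrays.asList(1, 2, 3), i -> foo.get() + " " + i, wired));
  }
}
```

Clearing the thread-local in a `finally` block matters: otherwise a later `get()` on a reused worker thread could resolve against a stale accessor instead of failing.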