Hm, I think that of these things (state, side outputs, etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform - so only side inputs can be implicit like this.
Ultimately we'll probably end up removing ProcessContext and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (as in my previous email) to statically specify which side inputs will be needed - while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(): i.e., #get() is not a new method of access.

Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.

On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

> For API consistency reasons, it would be good if we did this holistically
> and expanded this approach to state, side outputs, ... so that a person can
> always call Something.get() to return something that they can access
> implementation wise. It will be confusing for our users to have many
> variations in our style of how all these concepts are used (ProcessContext
> / Annotations / #get())
>
> On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>
> > Also, I think my approach is compatible with annotations and future removal
> > of .withSideInputs if we annotate a field:
> > final PCollectionView<Foo> foo = ...;
> >
> > class MyDoFn {
> >   @SideInput
> >   PCollectionView<Foo> foo = foo;
> >
> >   ...foo.get()...
> > }
> >
> > We can extract the accessed views from the DoFn instance using reflection.
> > Still not compatible with lambdas, but compatible automatically with all
> > anonymous classes.
> >
> > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com>
> > wrote:
> >
> > > Hi Luke,
> > >
> > > I know this (annotations) is the pattern we were considering for side
> > > inputs, but I no longer think it is the best way to access them.
> > > Annotations help getting rid of the .withSideInputs() call, but this is
> > > where their advantage ends.
> > >
> > > The advantages of the proposed approach are that it automatically works
> > > with all existing callback or lambda code. No need to further develop the
> > > reflection machinery to support side input annotations - and especially to
> > > support arbitrary user interfaces, no need to change existing transforms,
> > > no need for transform authors to even know that the machinery exists to
> > > make side inputs usable in their transforms (and no need for authors to
> > > think about whether or not they should support side inputs).
> > >
> > > Moreover, like Reuven says, annotations don't work with lambdas at all:
> > > creating a lambda with a flexible set of annotation arguments appears to be
> > > currently impossible, and even capturing the annotations on arguments of a
> > > lambda is I believe also impossible because the Java compiler drops them in
> > > the generated class or method handle.
> > >
> > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > >> I believe we should follow the pattern that state uses and add a type
> > >> annotation to link the side input definition to its usage directly. This
> > >> would allow us to know that the side input was definitely being accessed
> > >> and perform validation during graph construction for any used but
> > >> unspecified side inputs.
> > >>
> > >> Code snippet:
> > >> final PCollectionView<String> foo = pipeline.apply("fooName",
> > >>     Create.of("foo")).apply(View.<String>asSingleton());
> > >> PCollection<String> output = pipeline
> > >>     .apply(Create.of(1, 2, 3))
> > >>     .apply(MapElements.via(
> > >>         new SimpleFunction<Integer, String>() {
> > >>           @Override
> > >>           public String apply(Integer input, @SideInput("fooName") String fooValue) {
> > >>             return fooValue + " " + input;
> > >>           }
> > >>         }).withSideInputs(foo));
> > >>
> > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > >>
> > >> > Sure, here's how a modified (passing) MapElements unit test looks like,
> > >> > with usage of side inputs:
> > >> >
> > >> > @Test
> > >> > @Category(NeedsRunner.class)
> > >> > public void testMapBasicWithSideInput() throws Exception {
> > >> >   final PCollectionView<String> foo =
> > >> >       pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> > >> >   PCollection<String> output = pipeline
> > >> >       .apply(Create.of(1, 2, 3))
> > >> >       .apply(MapElements.via(
> > >> >           new SimpleFunction<Integer, String>() {
> > >> >             @Override
> > >> >             public String apply(Integer input) {
> > >> >               return foo.get() + " " + input;
> > >> >             }
> > >> >           })
> > >> >       .withSideInputs(foo));
> > >> >
> > >> >   PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > >> >   pipeline.run();
> > >> > }
> > >> >
> > >> >
> > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid>
> > >> > wrote:
> > >> >
> > >> > > Can you provide a code snippet showing how this would look?
> > >> > >
> > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> > >> > >
> > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > >> > > > thread-local ProcessContext and do c.sideInput(this).
> > >> > > > As a result, any user
> > >> > > > lambdas such as MapElements can use side inputs.
> > >> > > >
> > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn),
> > >> > > > Map/FlatMapElements, the DynamicDestinations classes in various IOs,
> > >> > > > combine fns, the PollFn callback in Watch, etc.
> > >> > > >
> > >> > > > Of these, only DoFn and CombineFn have built-in support for side inputs;
> > >> > > > for DynamicDestinations it is plumbed explicitly; others don't have access
> > >> > > > (e.g. you can't access side inputs from Map/FlatMapElements because they
> > >> > > > don't have a ProcessContext or any context for that matter).
> > >> > > >
> > >> > > > I think it's important to solve this, especially as Java 8 becomes people's
> > >> > > > default choice: users will want to use side inputs in Map/FlatMapElements.
> > >> > > >
> > >> > > > It also appears to be quite easy to implement:
> > >> > > >
> > >> > > > Runner part:
> > >> > > > - introduce a SideInputAccessor interface
> > >> > > > - make .get() on a PCollectionView get it from a thread-local
> > >> > > > SideInputAccessor
> > >> > > > - make runners set the thread-local SideInputAccessor any time the runner
> > >> > > > is evaluating something in a context where side inputs are available, e.g.
> > >> > > > a ProcessElement method, or applying a CombineFn - the set of such places
> > >> > > > will be quite small. I believe only runners (but not transforms) will ever
> > >> > > > need to set this thread-local
> > >> > > >
> > >> > > > Transform part:
> > >> > > > - Transforms that take user-code lambdas and want to let them access side
> > >> > > > inputs still will need to be configurable with a method like
> > >> > > > .withSideInputs(view1, view2...)
> > >> > > > and will need to plumb those down to the
> > >> > > > primitive DoFn's or CombineFn's - information on *which* side inputs will
> > >> > > > be accessed, of course, still needs to be in the graph.
> > >> > > >
> > >> > > > I quickly prototyped this in direct runner and MapElements, and it worked
> > >> > > > with no issues - I'm wondering if there's something subtle that I'm
> > >> > > > missing, or is it actually a good idea?
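
The "Runner part" of the original proposal (a thread-local SideInputAccessor that .get() consults, set by the runner around user code) can be sketched roughly as follows. This is a minimal standalone illustration, not Beam code: only the name SideInputAccessor comes from the thread, while View (standing in for PCollectionView), ToyRunner, withSideInput and SideInputSketch are hypothetical names invented for this sketch, and the actual prototype may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the SideInputAccessor interface proposed in the thread.
interface SideInputAccessor {
  <T> T get(View<T> view);
}

// Hypothetical stand-in for PCollectionView; this is NOT the Beam class.
final class View<T> {
  // Installed by the "runner" only while it is evaluating user code.
  static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  final String name;

  View(String name) {
    this.name = name;
  }

  // Resolves this view through whatever accessor the runner installed on this thread.
  T get() {
    SideInputAccessor accessor = CURRENT.get();
    if (accessor == null) {
      throw new IllegalStateException(
          "Side input '" + name + "' accessed outside a runner context");
    }
    return accessor.get(this);
  }
}

// Toy "runner" (assumption): holds declared side input values and sets the
// thread-local accessor around each user callback invocation.
final class ToyRunner {
  private final Map<View<?>, Object> values = new HashMap<>();

  // Plays the role of .withSideInputs(...): declares which views may be accessed.
  <T> ToyRunner withSideInput(View<T> view, T value) {
    values.put(view, value);
    return this;
  }

  <I, O> O apply(Function<I, O> userFn, I input) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        if (!values.containsKey(view)) {
          throw new IllegalArgumentException("Undeclared side input: " + view.name);
        }
        return (T) values.get(view);
      }
    };
    SideInputAccessor previous = View.CURRENT.get();
    View.CURRENT.set(accessor);
    try {
      return userFn.apply(input);
    } finally {
      // Restore the earlier state so the accessor never leaks past the callback.
      if (previous == null) {
        View.CURRENT.remove();
      } else {
        View.CURRENT.set(previous);
      }
    }
  }
}

final class SideInputSketch {
  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    ToyRunner runner = new ToyRunner().withSideInput(foo, "foo");
    // The lambda needs no context parameter: foo.get() finds the accessor itself.
    Function<Integer, String> fn = input -> foo.get() + " " + input;
    System.out.println(runner.apply(fn, 1)); // prints "foo 1"
  }
}
```

In this sketch, calling foo.get() outside ToyRunner.apply throws, and accessing a view that was not declared via withSideInput fails as well, which mirrors the point that the set of accessed side inputs still has to be declared up front.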