Hm, I think that of these things (state, side outputs, etc.), only side
inputs make sense to access in arbitrary user callbacks without explicit
knowledge of the surrounding transform, so only side inputs can be implicit
like this.
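
To make "implicit" concrete, here is a rough standalone sketch of the
thread-local mechanism. It is not Beam code: View stands in for
PCollectionView, and SideInputAccessor and runWithSideInputs are hypothetical
names I made up for illustration. It assumes the runner installs the accessor
around each user callback.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins only; none of these types are Beam classes.
final class ImplicitSideInputSketch {

  /** What a runner would provide while user code is executing. */
  interface SideInputAccessor {
    <T> T read(View<T> view);
  }

  /** Stand-in for PCollectionView<T>; get() resolves through a thread-local. */
  static final class View<T> {
    private static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();
    final String name;

    View(String name) {
      this.name = name;
    }

    T get() {
      SideInputAccessor accessor = CURRENT.get();
      if (accessor == null) {
        throw new IllegalStateException("No side input context for view " + name);
      }
      return accessor.read(this);
    }
  }

  /** Stand-in for the runner: sets the accessor, calls user code, then clears it. */
  static <I, O> O runWithSideInputs(Map<View<?>, Object> values, Function<I, O> fn, I input) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T read(View<T> view) {
        if (!values.containsKey(view)) {
          // Mirrors the need to declare views up front via .withSideInputs(...).
          throw new IllegalArgumentException("Undeclared side input: " + view.name);
        }
        return (T) values.get(view);
      }
    };
    View.CURRENT.set(accessor);
    try {
      return fn.apply(input);
    } finally {
      View.CURRENT.remove();
    }
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    Map<View<?>, Object> values = new HashMap<>();
    values.put(foo, "foo");
    // The lambda knows nothing about the surrounding "transform".
    Function<Integer, String> fn = input -> foo.get() + " " + input;
    System.out.println(runWithSideInputs(values, fn, 1)); // prints "foo 1"
  }
}
```

The user lambda only touches foo.get(); everything else lives on the runner
side, which is why existing callbacks would not need to change.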

Ultimately we'll probably end up removing ProcessContext and keeping only
annotations (on fields / methods / parameters). In that world, a field
annotation could be used (as in my previous email) to statically specify
which side inputs will be needed, while the value could still be accessed
via .get(), just like state cells are accessed via .read() and .write().
I.e., #get() is not a new style of access.
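
As a rough standalone sketch of the static half of that idea (again not Beam
code: the @SideInput field annotation, View, and declaredSideInputs are
hypothetical names, and it assumes the annotation is retained at runtime), the
declared views could be collected from a function object via reflection:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; none of these types are Beam classes.
final class AnnotatedSideInputSketch {

  /** Hypothetical field annotation marking a view as a required side input. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface SideInput {}

  /** Stand-in for PCollectionView<T>. */
  static final class View<T> {
    final String name;

    View(String name) {
      this.name = name;
    }
  }

  /** Collects every @SideInput-annotated View field declared on the given object. */
  static List<View<?>> declaredSideInputs(Object fn) {
    List<View<?>> views = new ArrayList<>();
    for (Field field : fn.getClass().getDeclaredFields()) {
      if (!field.isAnnotationPresent(SideInput.class)) {
        continue;
      }
      field.setAccessible(true);
      try {
        Object value = field.get(fn);
        if (!(value instanceof View)) {
          throw new IllegalArgumentException(
              "@SideInput field " + field.getName() + " does not hold a View");
        }
        views.add((View<?>) value);
      } catch (IllegalAccessException e) {
        throw new IllegalStateException(e);
      }
    }
    return views;
  }

  /** Example user function: one declared side input, one unannotated field. */
  static final class MyFn {
    @SideInput final View<String> foo;
    final View<String> notDeclared = new View<>("ignored");

    MyFn(View<String> foo) {
      this.foo = foo;
    }
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    for (View<?> view : declaredSideInputs(new MyFn(foo))) {
      System.out.println(view.name); // prints "foo"
    }
  }
}
```

This only covers discovering which views are needed at graph construction
time; reading the value would still go through .get().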

Overall, it seems like I should proceed with the idea. I filed
https://issues.apache.org/jira/browse/BEAM-2844.

On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid> wrote:

> For API consistency reasons, it would be good if we did this holistically
> and expanded this approach to state, side outputs, ... so that a person can
> always call Something.get() to return something that they can access
> implementation wise. It will be confusing for our users to have many
> variations in our style of how all these concepts are used (ProcessContext
> / Annotations / #get())
>
> On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Also, I think my approach is compatible with annotations and future removal
> > of .withSideInputs if we annotate a field:
> > final PCollectionView<Foo> foo = ...;
> >
> > class MyDoFn {
> >   @SideInput
> >   PCollectionView<Foo> foo = foo;
> >
> >   ...foo.get()...
> > }
> >
> > We can extract the accessed views from the DoFn instance using reflection.
> > Still not compatible with lambdas, but compatible automatically with all
> > anonymous classes.
> >
> > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com>
> > wrote:
> >
> > > Hi Luke,
> > >
> > > I know this (annotations) is the pattern we were considering for side
> > > inputs, but I no longer think it is the best way to access them.
> > > Annotations help getting rid of the .withSideInputs() call, but this is
> > > where their advantage ends.
> > >
> > > The advantages of the proposed approach are that it automatically works
> > > with all existing callback or lambda code. No need to further develop the
> > > reflection machinery to support side input annotations - and especially to
> > > support arbitrary user interfaces, no need to change existing transforms,
> > > no need for transform authors to even know that the machinery exists to
> > > make side inputs usable in their transforms (and no need for authors to
> > > think about whether or not they should support side inputs).
> > >
> > > Moreover, like Reuven says, annotations don't work with lambdas at all:
> > > creating a lambda with a flexible set of annotation arguments appears to be
> > > currently impossible, and even capturing the annotations on arguments of a
> > > lambda is I believe also impossible because the Java compiler drops them in
> > > the generated class or method handle.
> > >
> > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > >> I believe we should follow the pattern that state uses and add a type
> > >> annotation to link the side input definition to its usage directly. This
> > >> would allow us to know that the side input was definitely being accessed
> > >> and perform validation during graph construction for any used but
> > >> unspecified side inputs.
> > >>
> > >> Code snippet:
> > >> final PCollectionView<String> foo = pipeline.apply("fooName",
> > >> Create.of("foo")).apply(View.<String>asSingleton());
> > >> PCollection<String> output = pipeline
> > >>     .apply(Create.of(1, 2, 3))
> > >>     .apply(MapElements.via(
> > >>         new SimpleFunction<Integer, String>() {
> > >>           @Override
> > >>           public String apply(Integer input, @SideInput("fooName") String fooValue) {
> > >>             return fooValue + " " + input;
> > >>           }
> > >>         }).withSideInputs(foo));
> > >>
> > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <
> > >> kirpic...@google.com.invalid> wrote:
> > >>
> > >> > Sure, here's how a modified (passing) MapElements unit test looks like,
> > >> > with usage of side inputs:
> > >> >
> > >> >   @Test
> > >> >   @Category(NeedsRunner.class)
> > >> >   public void testMapBasicWithSideInput() throws Exception {
> > >> >     final PCollectionView<String> foo =
> > >> >         pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
> > >> >     PCollection<String> output = pipeline
> > >> >         .apply(Create.of(1, 2, 3))
> > >> >         .apply(MapElements.via(
> > >> >             new SimpleFunction<Integer, String>() {
> > >> >               @Override
> > >> >               public String apply(Integer input) {
> > >> >                 return foo.get() + " " + input;
> > >> >               }
> > >> >             })
> > >> >         .withSideInputs(foo));
> > >> >
> > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > >> >     pipeline.run();
> > >> >   }
> > >> >
> > >> >
> > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid>
> > >> > wrote:
> > >> >
> > >> > > Can you provide a code snippet showing how this would look?
> > >> > >
> > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> > >> > > kirpic...@google.com.invalid> wrote:
> > >> > >
> > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > >> > > > thread-local ProcessContext and do c.sideInput(this). As a result, any
> > >> > > > user lambdas such as MapElements can use side inputs.
> > >> > > >
> > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn),
> > >> > > > Map/FlatMapElements, the DynamicDestinations classes in various IOs,
> > >> > > > combine fns, the PollFn callback in Watch, etc.
> > >> > > >
> > >> > > > Of these, only DoFn and CombineFn have built-in support for side inputs;
> > >> > > > for DynamicDestinations it is plumbed explicitly; others don't have access
> > >> > > > (e.g. you can't access side inputs from Map/FlatMapElements because they
> > >> > > > don't have a ProcessContext or any context for that matter).
> > >> > > >
> > >> > > > I think it's important to solve this, especially as Java 8 becomes people's
> > >> > > > default choice: users will want to use side inputs in Map/FlatMapElements.
> > >> > > >
> > >> > > > It also appears to be quite easy to implement:
> > >> > > >
> > >> > > > Runner part:
> > >> > > > - introduce a SideInputAccessor interface
> > >> > > > - make .get() on a PCollectionView get it from a thread-local
> > >> > > > SideInputAccessor
> > >> > > > - make runners set the thread-local SideInputAccessor any time the runner
> > >> > > > is evaluating something in a context where side inputs are available, e.g.
> > >> > > > a ProcessElement method, or applying a CombineFn - the set of such places
> > >> > > > will be quite small. I believe only runners (but not transforms) will ever
> > >> > > > need to set this thread-local
> > >> > > >
> > >> > > > Transform part:
> > >> > > > - Transforms that take user-code lambdas and want to let them access side
> > >> > > > inputs still will need to be configurable with a method like
> > >> > > > .withSideInputs(view1, view2...) and will need to plumb those down to the
> > >> > > > primitive DoFn's or CombineFn's - information on *which* side inputs will
> > >> > > > be accessed, of course, still needs to be in the graph.
> > >> > > >
> > >> > > > I quickly prototyped this in direct runner and MapElements, and it worked
> > >> > > > with no issues - I'm wondering if there's something subtle that I'm
> > >> > > > missing, or is it actually a good idea?
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>
