I disagree. State also may not care where it is used: a person may call a
function which needs to store/retrieve state, and that function could
access the state directly instead of having the DoFn declare the StateSpec
and then pass the state implementation down into every function. Similarly
for outputs, the internal functions could take the TupleTag and request an
output manager, or take an "output" reference which gives functions the
ability to produce output directly without needing to pass everything that
needs to be output back to the caller.
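
To make the state case concrete, here is a rough sketch in plain Java with
no Beam dependency. Every name in it (Accessor, Cell, CURRENT, withAccessor,
MapAccessor, bumpCount) is made up for illustration and is not an existing
Beam API; a real version would presumably still declare a StateSpec and let
the runner bind the context. The only point is that a helper deep in the
call stack can read and write a cell without the caller handing the state
object down to it:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch only; none of these types exist in Beam. */
public class ImplicitContextSketch {

  /** Stand-in for whatever a runner would expose while processing an element. */
  public interface Accessor {
    <T> T read(String cellName);

    <T> void write(String cellName, T value);
  }

  /** The accessor bound to the current thread, if any. */
  static final ThreadLocal<Accessor> CURRENT = new ThreadLocal<>();

  /** A handle that resolves its value through the thread-local accessor. */
  public static final class Cell<T> {
    private final String name;

    public Cell(String name) {
      this.name = name;
    }

    public T read() {
      return current().<T>read(name);
    }

    public void write(T value) {
      current().write(name, value);
    }

    private static Accessor current() {
      Accessor accessor = CURRENT.get();
      if (accessor == null) {
        throw new IllegalStateException("No accessor bound on this thread");
      }
      return accessor;
    }
  }

  /** What a runner would do around user code: bind, run, always unbind. */
  public static <R> R withAccessor(Accessor accessor, Supplier<R> body) {
    CURRENT.set(accessor);
    try {
      return body.get();
    } finally {
      CURRENT.remove();
    }
  }

  /** Trivial in-memory accessor so the sketch runs on its own. */
  public static final class MapAccessor implements Accessor {
    private final Map<String, Object> cells = new HashMap<>();

    @Override
    @SuppressWarnings("unchecked")
    public <T> T read(String cellName) {
      return (T) cells.get(cellName);
    }

    @Override
    public <T> void write(String cellName, T value) {
      cells.put(cellName, value);
    }
  }

  static final Cell<Integer> COUNT = new Cell<>("count");

  /** A helper that uses state without receiving it as a parameter. */
  public static int bumpCount() {
    Integer seen = COUNT.read();
    int next = (seen == null ? 0 : seen) + 1;
    COUNT.write(next);
    return next;
  }

  public static void main(String[] args) {
    MapAccessor accessor = new MapAccessor();
    withAccessor(accessor, ImplicitContextSketch::bumpCount);
    int second = withAccessor(accessor, ImplicitContextSketch::bumpCount);
    System.out.println(second); // prints 2
  }
}
```

Calling COUNT.read() outside withAccessor fails with an
IllegalStateException in this sketch, which is roughly the check a runner
would need wherever no context is available.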

On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <
kirpic...@google.com.invalid> wrote:

> Hm, I think that, of these things (state, side outputs etc.), only side inputs
> make sense to access in arbitrary user callbacks without explicit knowledge
> of the surrounding transform - so only side inputs can be implicit like
> this.
>
> Ultimately we'll probably end up removing ProcessContext, and keeping only
> annotations (on fields / methods / parameters). In that world, a field
> annotation could be used (as in my previous email) to statically specify
> which side inputs will be needed - while the value could still be accessed
> via .get(), just like state cells are accessed via .read() and .write():
> i.e., #get() is not a new method of access.
>
> Overall, it seems like I should proceed with the idea. I filed
> https://issues.apache.org/jira/browse/BEAM-2844.
>
> On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > For API consistency reasons, it would be good if we did this holistically
> > and expanded this approach to state, side outputs, ... so that a person
> > can always call Something.get() to return something that they can access
> > implementation wise. It will be confusing for our users to have many
> > variations in our style of how all these concepts are used
> > (ProcessContext / Annotations / #get()).
> >
> > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Also, I think my approach is compatible with annotations and future
> > > removal of .withSideInputs if we annotate a field:
> > > final PCollectionView<Foo> fooView = ...;
> > >
> > > class MyDoFn {
> > >   @SideInput
> > >   PCollectionView<Foo> foo = fooView;
> > >
> > >   ...foo.get()...
> > > }
> > >
> > > We can extract the accessed views from the DoFn instance using
> > > reflection. Still not compatible with lambdas, but compatible
> > > automatically with all anonymous classes.
> > >
> > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <kirpic...@google.com>
> > > wrote:
> > >
> > > > Hi Luke,
> > > >
> > > > I know this (annotations) is the pattern we were considering for side
> > > > inputs, but I no longer think it is the best way to access them.
> > > > Annotations help get rid of the .withSideInputs() call, but this is
> > > > where their advantage ends.
> > > >
> > > > The advantages of the proposed approach are that it automatically
> > > > works with all existing callback or lambda code. No need to further
> > > > develop the reflection machinery to support side input annotations -
> > > > and especially to support arbitrary user interfaces, no need to change
> > > > existing transforms, no need for transform authors to even know that
> > > > the machinery exists to make side inputs usable in their transforms
> > > > (and no need for authors to think about whether or not they should
> > > > support side inputs).
> > > >
> > > > Moreover, like Reuven says, annotations don't work with lambdas at
> > > > all: creating a lambda with a flexible set of annotation arguments
> > > > appears to be currently impossible, and even capturing the annotations
> > > > on arguments of a lambda is I believe also impossible because the Java
> > > > compiler drops them in the generated class or method handle.
> > > >
> > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik <lc...@google.com.invalid>
> > > > wrote:
> > > >
> > > >> I believe we should follow the pattern that state uses and add a type
> > > >> annotation to link the side input definition to its usage directly.
> > > >> This would allow us to know that the side input was definitely being
> > > >> accessed and perform validation during graph construction for any
> > > >> used but unspecified side inputs.
> > > >>
> > > >> Code snippet:
> > > >> final PCollectionView<String> foo = pipeline.apply("fooName",
> > > >>     Create.of("foo")).apply(View.<String>asSingleton());
> > > >> PCollection<String> output = pipeline
> > > >>     .apply(Create.of(1, 2, 3))
> > > >>     .apply(MapElements.via(
> > > >>         new SimpleFunction<Integer, String>() {
> > > >>           @Override
> > > >>           public String apply(Integer input,
> > > >>               @SideInput("fooName") String fooValue) {
> > > >>             return fooValue + " " + input;
> > > >>           }
> > > >>         }).withSideInputs(foo));
> > > >>
> > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <
> > > >> kirpic...@google.com.invalid> wrote:
> > > >>
> > > >> > Sure, here's how a modified (passing) MapElements unit test looks,
> > > >> > with usage of side inputs:
> > > >> >
> > > >> >   @Test
> > > >> >   @Category(NeedsRunner.class)
> > > >> >   public void testMapBasicWithSideInput() throws Exception {
> > > >> >     final PCollectionView<String> foo =
> > > >> >         pipeline.apply("foo", Create.of("foo"))
> > > >> >             .apply(View.<String>asSingleton());
> > > >> >     PCollection<String> output = pipeline
> > > >> >         .apply(Create.of(1, 2, 3))
> > > >> >         .apply(MapElements.via(
> > > >> >             new SimpleFunction<Integer, String>() {
> > > >> >               @Override
> > > >> >               public String apply(Integer input) {
> > > >> >                 return foo.get() + " " + input;
> > > >> >               }
> > > >> >             })
> > > >> >         .withSideInputs(foo));
> > > >> >
> > > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> > > >> >     pipeline.run();
> > > >> >   }
> > > >> >
> > > >> >
> > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid>
> > > >> > wrote:
> > > >> >
> > > >> > > Can you provide a code snippet showing how this would look?
> > > >> > >
> > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> > > >> > > kirpic...@google.com.invalid> wrote:
> > > >> > >
> > > >> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > > >> > > > thread-local ProcessContext and do c.sideInput(this). As a result,
> > > >> > > > any user lambdas such as MapElements can use side inputs.
> > > >> > > >
> > > >> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo
> > > >> > > > (DoFn), Map/FlatMapElements, the DynamicDestinations classes in
> > > >> > > > various IOs, combine fns, the PollFn callback in Watch, etc.
> > > >> > > >
> > > >> > > > Of these, only DoFn and CombineFn have built-in support for side
> > > >> > > > inputs; for DynamicDestinations it is plumbed explicitly; others
> > > >> > > > don't have access (e.g. you can't access side inputs from
> > > >> > > > Map/FlatMapElements because they don't have a ProcessContext or
> > > >> > > > any context for that matter).
> > > >> > > >
> > > >> > > > I think it's important to solve this, especially as Java 8 becomes
> > > >> > > > people's default choice: users will want to use side inputs in
> > > >> > > > Map/FlatMapElements.
> > > >> > > >
> > > >> > > > It also appears to be quite easy to implement:
> > > >> > > >
> > > >> > > > Runner part:
> > > >> > > > - introduce a SideInputAccessor interface
> > > >> > > > - make .get() on a PCollectionView get it from a thread-local
> > > >> > > > SideInputAccessor
> > > >> > > > - make runners set the thread-local SideInputAccessor any time the
> > > >> > > > runner is evaluating something in a context where side inputs are
> > > >> > > > available, e.g. a ProcessElement method, or applying a CombineFn -
> > > >> > > > the set of such places will be quite small. I believe only runners
> > > >> > > > (but not transforms) will ever need to set this thread-local
> > > >> > > > Transform part:
> > > >> > > > - Transforms that take user-code lambdas and want to let them
> > > >> > > > access side inputs still will need to be configurable with a
> > > >> > > > method like .withSideInputs(view1, view2...) and will need to
> > > >> > > > plumb those down to the primitive DoFn's or CombineFn's -
> > > >> > > > information on *which* side inputs will be accessed, of course,
> > > >> > > > still needs to be in the graph.
> > > >> > > >
> > > >> > > > I quickly prototyped this in direct runner and MapElements, and it
> > > >> > > > worked with no issues - I'm wondering if there's something subtle
> > > >> > > > that I'm missing, or is it actually a good idea?
> > > >> > > >