Eugene's original email mentioned the use of lambdas, and I'm not sure
whether the parameter type-annotation will work in the lambda case. I do
think that type annotations are better in the other examples.

BTW, why use the annotation just for validation? If side inputs are
specified via parameter annotations, can't we use that to build the graph
and eliminate the need for withSideInputs entirely?

Reuven

On Tue, Sep 5, 2017 at 6:57 AM, Lukasz Cwik <lc...@google.com.invalid>
wrote:

> I believe we should follow the pattern that state uses and add a type
> annotation to link the side input definition to its usage directly. This
> would allow us to know that the side input was definitely being accessed
> and perform validation during graph construction for any used but
> unspecified side inputs.
>
> Code snippet:
> final PCollectionView<String> foo = pipeline.apply("fooName",
> Create.of("foo")).apply(View.<String>asSingleton());
> PCollection<String> output = pipeline
>     .apply(Create.of(1, 2, 3))
>     .apply(MapElements.via(
>         new SimpleFunction<Integer, String>() {
>           @Override
>           public String apply(Integer input, @SideInput("fooName") String
> fooValue) {
>             return fooValue + " " + input;
>           }
>         }).withSideInputs(foo));*
>
> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Sure, here's how a modified (passing) MapElements unit test looks like,
> > with usage of side inputs:
> >
> >   @Test
> >   @Category(NeedsRunner.class)
> >   public void testMapBasicWithSideInput() throws Exception {
> >    * final PCollectionView<String> foo =*
> > *        pipeline.apply("foo",
> > Create.of("foo")).apply(View.<String>asSingleton());*
> >     PCollection<String> output = pipeline
> >         .apply(Create.of(1, 2, 3))
> >         .apply(MapElements.via(
> >             new SimpleFunction<Integer, String>() {
> >               @Override
> >               public String apply(Integer input) {
> >                 return* foo.get() *+ " " + input;
> >               }
> >             })
> >         *.withSideInputs(foo));*
> >
> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
> >     pipeline.run();
> >   }
> >
> >
> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid>
> > wrote:
> >
> > > Can you provide a code snippet showing how this would look?
> > >
> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > > > thread-local ProcessContext and do c.sideInput(this). As a result,
> any
> > > user
> > > > lambdas such as MapElements can use side inputs.
> > > >
> > > > Quite a few transforms have user-code callbacks or lambdas: ParDo
> > (DoFn),
> > > > Map/FlatMapElements, the DynamicDestinations classes in various IOs,
> > > > combine fns, the PollFn callback in Watch, etc.
> > > >
> > > > Of these, only DoFn and CombineFn have built-in support for side
> > inputs;
> > > > for DynamicDestinations it is plumbed explicitly; others don't have
> > > access
> > > > (e.g. you can't access side inputs from Map/FlatMapElements because
> > they
> > > > don't have a ProcessContext or any context for that matter).
> > > >
> > > > I think it's important to solve this, especially as Java 8 becomes
> > > people's
> > > > default choice: users will want to use side inputs in
> > > Map/FlatMapElements.
> > > >
> > > > It also appears to be quite easy to implement:
> > > >
> > > > Runner part:
> > > > - introduce a SideInputAccessor interface
> > > > - make .get() on a PCollectionView get it from a thread-local
> > > > SideInputAccessor
> > > > - make runners set the thread-local SideInputAccessor any time the
> > runner
> > > > is evaluating something in a context where side inputs are available,
> > > e.g.
> > > > a ProcessElement method, or applying a CombineFn - the set of such
> > places
> > > > will be quite small. I believe only runners (but not transforms) will
> > > ever
> > > > need to set this thread-local
> > > >
> > > > Transform part:
> > > > - Transforms that take user-code lambdas and want to let them access
> > side
> > > > inputs still will need to be configurable with a method like
> > > > .withSideInputs(view1, view2...) and will need to plumb those down to
> > the
> > > > primitive DoFn's or CombineFn's - information on *which* side inputs
> > will
> > > > be accessed, of course, still needs to be in the graph.
> > > >
> > > > I quickly prototyped this in direct runner and MapElements, and it
> > worked
> > > > with no issues - I'm wondering if there's something subtle that I'm
> > > > missing, or is it actually a good idea?
> > > >
> > >
> >
>

Reply via email to