Sure, here's how a modified (passing) MapElements unit test looks like,
with usage of side inputs:

  @Test
  @Category(NeedsRunner.class)
  public void testMapBasicWithSideInput() throws Exception {
   * final PCollectionView<String> foo =*
*        pipeline.apply("foo",
Create.of("foo")).apply(View.<String>asSingleton());*
    PCollection<String> output = pipeline
        .apply(Create.of(1, 2, 3))
        .apply(MapElements.via(
            new SimpleFunction<Integer, String>() {
              @Override
              public String apply(Integer input) {
                return* foo.get() *+ " " + input;
              }
            })
        *.withSideInputs(foo));*

    PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
    pipeline.run();
  }


On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:

> Can you provide a code snippet showing how this would look?
>
> On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > thread-local ProcessContext and do c.sideInput(this). As a result, any
> user
> > lambdas such as MapElements can use side inputs.
> >
> > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn),
> > Map/FlatMapElements, the DynamicDestinations classes in various IOs,
> > combine fns, the PollFn callback in Watch, etc.
> >
> > Of these, only DoFn and CombineFn have built-in support for side inputs;
> > for DynamicDestinations it is plumbed explicitly; others don't have
> access
> > (e.g. you can't access side inputs from Map/FlatMapElements because they
> > don't have a ProcessContext or any context for that matter).
> >
> > I think it's important to solve this, especially as Java 8 becomes
> people's
> > default choice: users will want to use side inputs in
> Map/FlatMapElements.
> >
> > It also appears to be quite easy to implement:
> >
> > Runner part:
> > - introduce a SideInputAccessor interface
> > - make .get() on a PCollectionView get it from a thread-local
> > SideInputAccessor
> > - make runners set the thread-local SideInputAccessor any time the runner
> > is evaluating something in a context where side inputs are available,
> e.g.
> > a ProcessElement method, or applying a CombineFn - the set of such places
> > will be quite small. I believe only runners (but not transforms) will
> ever
> > need to set this thread-local
> >
> > Transform part:
> > - Transforms that take user-code lambdas and want to let them access side
> > inputs still will need to be configurable with a method like
> > .withSideInputs(view1, view2...) and will need to plumb those down to the
> > primitive DoFn's or CombineFn's - information on *which* side inputs will
> > be accessed, of course, still needs to be in the graph.
> >
> > I quickly prototyped this in direct runner and MapElements, and it worked
> > with no issues - I'm wondering if there's something subtle that I'm
> > missing, or is it actually a good idea?
> >
>

Reply via email to