Sure, here's what a modified (passing) MapElements unit test looks like when it uses side inputs:

  @Test
  @Category(NeedsRunner.class)
  public void testMapBasicWithSideInput() throws Exception {
    final PCollectionView<String> foo =
        pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
    PCollection<String> output =
        pipeline
            .apply(Create.of(1, 2, 3))
            .apply(
                MapElements.via(
                        new SimpleFunction<Integer, String>() {
                          @Override
                          public String apply(Integer input) {
                            return foo.get() + " " + input;
                          }
                        })
                    .withSideInputs(foo));

    PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
    pipeline.run();
  }

On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax <re...@google.com.invalid> wrote:

> Can you provide a code snippet showing how this would look?
>
> On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > TL;DR Introduce method PCollectionView.get(), implemented as: get
> > thread-local ProcessContext and do c.sideInput(this). As a result, any
> > user lambdas such as MapElements can use side inputs.
> >
> > Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn),
> > Map/FlatMapElements, the DynamicDestinations classes in various IOs,
> > combine fns, the PollFn callback in Watch, etc.
> >
> > Of these, only DoFn and CombineFn have built-in support for side inputs;
> > for DynamicDestinations it is plumbed explicitly; others don't have
> > access (e.g. you can't access side inputs from Map/FlatMapElements
> > because they don't have a ProcessContext or any context for that matter).
> >
> > I think it's important to solve this, especially as Java 8 becomes
> > people's default choice: users will want to use side inputs in
> > Map/FlatMapElements.
> >
> > It also appears to be quite easy to implement:
> >
> > Runner part:
> > - introduce a SideInputAccessor interface
> > - make .get() on a PCollectionView get it from a thread-local
> >   SideInputAccessor
> > - make runners set the thread-local SideInputAccessor any time the runner
> >   is evaluating something in a context where side inputs are available,
> >   e.g. a ProcessElement method, or applying a CombineFn - the set of such
> >   places will be quite small. I believe only runners (but not transforms)
> >   will ever need to set this thread-local
> >
> > Transform part:
> > - Transforms that take user-code lambdas and want to let them access side
> >   inputs still will need to be configurable with a method like
> >   .withSideInputs(view1, view2...) and will need to plumb those down to
> >   the primitive DoFn's or CombineFn's - information on *which* side inputs
> >   will be accessed, of course, still needs to be in the graph.
> >
> > I quickly prototyped this in direct runner and MapElements, and it worked
> > with no issues - I'm wondering if there's something subtle that I'm
> > missing, or is it actually a good idea?
> >
>
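
For reference, the thread-local mechanism from the "Runner part" above could be sketched roughly as below. This is a standalone illustration, not the actual prototype and not Beam code: SideInputSketch, its nested View class (a stand-in for PCollectionView), withAccessor and mapAccessor are all hypothetical names, and only the SideInputAccessor name comes from the proposal itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SideInputSketch {
  /** Hypothetical interface: resolves a view's value in the current evaluation context. */
  public interface SideInputAccessor {
    <T> T get(View<T> view);
  }

  /** Hypothetical stand-in for PCollectionView<T>; not the Beam class. */
  public static final class View<T> {
    private final String name;

    public View(String name) {
      this.name = name;
    }

    /** Looks up the value through whatever accessor is installed on this thread. */
    public T get() {
      SideInputAccessor accessor = CURRENT.get();
      if (accessor == null) {
        throw new IllegalStateException(
            "View '" + name + "' accessed outside a context with side inputs");
      }
      return accessor.get(this);
    }
  }

  // Installed by the (hypothetical) runner, read by View.get().
  private static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  /** What a runner might do around user code: install the accessor, then restore the old one. */
  public static <R> R withAccessor(SideInputAccessor accessor, Supplier<R> userCode) {
    SideInputAccessor previous = CURRENT.get();
    CURRENT.set(accessor);
    try {
      return userCode.get();
    } finally {
      if (previous == null) {
        CURRENT.remove();
      } else {
        CURRENT.set(previous);
      }
    }
  }

  /** Toy accessor backed by a map; a real runner would consult its side input state. */
  public static SideInputAccessor mapAccessor(Map<View<?>, Object> values) {
    return new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        if (!values.containsKey(view)) {
          // Mirrors the point that *which* views are accessed must be declared up front.
          throw new IllegalArgumentException("Side input was not declared");
        }
        return (T) values.get(view);
      }
    };
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    Map<View<?>, Object> values = new HashMap<>();
    values.put(foo, "foo");
    SideInputAccessor accessor = mapAccessor(values);

    for (int input : new int[] {1, 2, 3}) {
      // The lambda has no context argument; it reaches the side input via the thread-local.
      System.out.println(withAccessor(accessor, () -> foo.get() + " " + input));
    }
  }
}
```

Running main prints "foo 1", "foo 2" and "foo 3", matching the expectations in the unit test. Restoring the previous accessor in the finally block is a choice made for this sketch so that nested evaluations on the same thread do not leak state; the proposal does not specify that detail.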