TL;DR: Introduce a method PCollectionView.get(), implemented as: get the
thread-local ProcessContext and call c.sideInput(this). As a result, any user
lambdas, such as those passed to MapElements, can use side inputs.

Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn),
Map/FlatMapElements, the DynamicDestinations classes in various IOs,
CombineFns, the PollFn callback in Watch, etc.

Of these, only DoFn and CombineFn have built-in support for side inputs;
for DynamicDestinations it is plumbed explicitly; the others have no access
at all (e.g. you can't access side inputs from Map/FlatMapElements, because
their lambdas don't get a ProcessContext, or any context for that matter).

I think it's important to solve this, especially as Java 8 becomes people's
default choice: users will want to use side inputs in Map/FlatMapElements.

It also appears to be quite easy to implement:

Runner part:
- introduce a SideInputAccessor interface
- make .get() on a PCollectionView get it from a thread-local
SideInputAccessor
- make runners set the thread-local SideInputAccessor whenever the runner
is evaluating something in a context where side inputs are available, e.g.
a ProcessElement method, or applying a CombineFn. The set of such places
will be quite small. I believe only runners (but not transforms) will ever
need to set this thread-local.
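A minimal, self-contained sketch of the runner part, in plain Java rather than against the Beam SDK. `View` is a simplified stand-in for PCollectionView, and `SideInputAccessor`, `MapAccessor` and `Runner.invokeWithSideInputs` are hypothetical names used only for illustration. What it shows: the user lambda is an ordinary `Function` with no context parameter, yet `offset.get()` works, because the runner installs the accessor around the call and restores the previous value in a `finally` block.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical runner-facing interface from the proposal (not Beam API).
interface SideInputAccessor {
  <T> T sideInput(View<T> view);
}

// Hypothetical, simplified stand-in for PCollectionView<T>.
final class View<T> {
  // Accessor installed by the runner while user code is running on this thread.
  static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

  private final String name;

  View(String name) { this.name = name; }

  String name() { return name; }

  // Proposed get(): fetch the thread-local accessor and delegate to it.
  T get() {
    SideInputAccessor accessor = CURRENT.get();
    if (accessor == null) {
      throw new IllegalStateException(
          "View " + name + " read outside a context where side inputs are available");
    }
    return accessor.sideInput(this);
  }
}

// Hypothetical runner helper: install the accessor for exactly one user callback.
final class Runner {
  static <I, O> O invokeWithSideInputs(SideInputAccessor accessor, Function<I, O> fn, I input) {
    SideInputAccessor previous = View.CURRENT.get();
    View.CURRENT.set(accessor);
    try {
      return fn.apply(input);
    } finally {
      // Restore the previous accessor (usually none) so it never leaks past the call.
      if (previous == null) {
        View.CURRENT.remove();
      } else {
        View.CURRENT.set(previous);
      }
    }
  }
}

// Toy accessor backed by already-materialized side input values.
final class MapAccessor implements SideInputAccessor {
  private final Map<View<?>, Object> values = new HashMap<>();

  <T> MapAccessor put(View<T> view, T value) {
    values.put(view, value);
    return this;
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> T sideInput(View<T> view) {
    if (!values.containsKey(view)) {
      throw new IllegalArgumentException("No value for view " + view.name());
    }
    return (T) values.get(view);
  }
}
```

Restoring the previous accessor rather than unconditionally clearing it keeps the sketch correct if one callback is ever invoked from inside another on the same thread. Note that a `ThreadLocal` is not visible from threads that user code starts on its own, so `get()` there hits the `IllegalStateException` branch.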

Transform part:
- Transforms that take user-code lambdas and want to let them access side
inputs will still need to be configurable with a method like
.withSideInputs(view1, view2...), and will need to plumb those down to the
primitive DoFns or CombineFns; information on *which* side inputs will
be accessed, of course, still needs to be in the graph.
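A second self-contained sketch, again in plain Java, of the transform part. `View`, `MapWithSideInputs`, `via`, `withSideInputs` and `Runner.processElement` are hypothetical names, not Beam API. The transform only records the views its lambda declares, so that set stays visible to whatever builds the graph; the runner (not the transform) sets the thread-local, exposing only the declared views. Reading an undeclared view fails instead of silently working.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for PCollectionView<T>; get() reads a runner-set thread-local.
final class View<T> {
  static final ThreadLocal<Map<View<?>, Object>> AVAILABLE = new ThreadLocal<>();
  private final String name;

  View(String name) { this.name = name; }

  @SuppressWarnings("unchecked")
  T get() {
    Map<View<?>, Object> available = AVAILABLE.get();
    if (available == null || !available.containsKey(this)) {
      throw new IllegalStateException("side input '" + name + "' is not available here");
    }
    return (T) available.get(this);
  }
}

// Hypothetical MapElements-like transform: a lambda plus the side inputs it declares.
final class MapWithSideInputs<I, O> {
  final Function<I, O> fn;
  final List<View<?>> sideInputs; // what the graph would record

  private MapWithSideInputs(Function<I, O> fn, List<View<?>> sideInputs) {
    this.fn = fn;
    this.sideInputs = sideInputs;
  }

  static <I, O> MapWithSideInputs<I, O> via(Function<I, O> fn) {
    return new MapWithSideInputs<>(fn, Collections.<View<?>>emptyList());
  }

  MapWithSideInputs<I, O> withSideInputs(View<?>... views) {
    return new MapWithSideInputs<>(fn, Arrays.asList(views));
  }
}

// Hypothetical runner code: only the runner touches the thread-local.
final class Runner {
  static <I, O> O processElement(
      MapWithSideInputs<I, O> transform, I element, Map<View<?>, Object> materialized) {
    // Expose only the views the transform declared.
    Map<View<?>, Object> available = new HashMap<>();
    for (View<?> view : transform.sideInputs) {
      available.put(view, materialized.get(view));
    }
    View.AVAILABLE.set(available);
    try {
      return transform.fn.apply(element);
    } finally {
      View.AVAILABLE.remove();
    }
  }
}
```

With this shape, `MapWithSideInputs.<Integer, Integer>via(x -> x + offset.get()).withSideInputs(offset)` can read `offset`, while a lambda that calls `scale.get()` but only declared `offset` fails with `IllegalStateException`, because the runner never exposed `scale`.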

I quickly prototyped this in the direct runner and MapElements, and it worked
with no issues. Am I missing something subtle, or is this actually a good
idea?
