The scope of the side input proposal expanded to include discussion of state and outputs. I didn't want to pollute the other thread with this, but I do want to emphasize what is different about these.
PCollectionView: read-only, time-evolving, per window, automatic GC based on WindowMappingFn(s), independent of key and transform, with only a data dependency for happens-before. It is a value, basically. Adding a get() method with indirection to some mutated-into-place context might not be too bad.

State: read/write, per transform+key+window, trivial concurrency control, automatic GC. It is a naive mutable field, basically. Instead of keeping StateSpec separate from State, you could probably put the read/write methods on the declaration and hack a mutated-into-place context. A bit gross, but maybe there's a usable API in that idea somewhere.

Output: write-only, obviously comes from a particular transform. It is a control-inverted return value, basically. Outputs were omitted from the new DoFn proposal for simplicity, but they make sense as intrinsic read-only fields of the DoFn if they are not dynamic. One could imagine designs in which a PCollection is just a Receiver/Consumer, so you can capture it in your closure and write to it without plumbing (and likely with concurrency control), but that's entirely unexplored AFAIK.

So I agree with Eugene's initial point that only side inputs make sense independent of the transform, until more investigation has taken place.

Kenn

On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov wrote:

Hm, I guess you're right: for outputs it could indeed be quite valuable to output to them without plumbing (e.g. outputting errors). Could this be done perhaps via TupleTag.output()? (assuming the same TupleTag cannot be reused to tag multiple PCollections)

For now I sent a PR for side input support: https://github.com/apache/beam/pull/3814
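A minimal sketch of the TupleTag.output() idea for outputs, written outside Beam and assuming a runner that binds one receiver per tag on the thread running user code. `OutputTag`, `ERRORS`, `runWithOutputs`, `parseOrReport`, and `process` are hypothetical names, not Beam APIs; the only point is that a helper deep in the call stack can emit errors without an output parameter being plumbed down to it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class TagOutputSketch {

  /** Hypothetical tag whose output(...) writes to a receiver bound by the runner. */
  static final class OutputTag<T> {
    // Per-thread bindings from tag to receiver, installed by the runner around user code.
    static final ThreadLocal<Map<OutputTag<?>, Consumer<Object>>> RECEIVERS =
        ThreadLocal.withInitial(HashMap::new);

    private final String id;

    OutputTag(String id) {
      this.id = id;
    }

    void output(T value) {
      Consumer<Object> receiver = RECEIVERS.get().get(this);
      if (receiver == null) {
        throw new IllegalStateException("No receiver bound for output '" + id + "'");
      }
      receiver.accept(value);
    }
  }

  // A tag captured as a constant: helpers can emit to it without an output parameter.
  static final OutputTag<String> ERRORS = new OutputTag<>("errors");

  /** Runner side: binds receivers only for the duration of one user-code invocation. */
  static void runWithOutputs(Map<OutputTag<?>, Consumer<Object>> receivers, Runnable userCode) {
    Map<OutputTag<?>, Consumer<Object>> bound = OutputTag.RECEIVERS.get();
    bound.putAll(receivers);
    try {
      userCode.run();
    } finally {
      bound.keySet().removeAll(receivers.keySet());
    }
  }

  /** User helper deep in the call stack: reports bad input directly to ERRORS. */
  static Integer parseOrReport(String s) {
    try {
      return Integer.parseInt(s);
    } catch (NumberFormatException e) {
      ERRORS.output("bad input: " + s);
      return null;
    }
  }

  /** Processes the inputs and returns the main output; errors go to errorSink. */
  static List<Integer> process(List<String> inputs, List<Object> errorSink) {
    List<Integer> parsed = new ArrayList<>();
    Map<OutputTag<?>, Consumer<Object>> receivers = new HashMap<>();
    receivers.put(ERRORS, errorSink::add);
    runWithOutputs(receivers, () -> {
      for (String s : inputs) {
        Integer n = parseOrReport(s);
        if (n != null) {
          parsed.add(n);
        }
      }
    });
    return parsed;
  }

  public static void main(String[] args) {
    List<Object> errors = new ArrayList<>();
    List<Integer> parsed = process(List.of("1", "x", "3"), errors);
    System.out.println(parsed + " " + errors); // [1, 3] [bad input: x]
  }
}
```

Because the bindings are per thread, this assumes each invocation of user code runs on a single thread; user code that hands work to other threads would need the kind of concurrency control mentioned above.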
On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik wrote:

I disagree: state may also not care where it is used, since a person may call a function that needs to store/retrieve state, rather than having the DoFn declare the StateSpec and then pass the state implementation down into the function everywhere. Similarly for outputs: internal functions could take the TupleTag and request an output manager, or take an "output" reference that gives them the ability to produce output directly, without needing to pass everything that should be output back to the caller.

On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov wrote:

Hm, I think that of these things (state, side outputs, etc.), only side inputs make sense to access in arbitrary user callbacks without explicit knowledge of the surrounding transform, so only side inputs can be implicit like this.

Ultimately we'll probably end up removing ProcessContext and keeping only annotations (on fields / methods / parameters). In that world, a field annotation could be used (as per my previous email) to statically specify which side inputs will be needed, while the value could still be accessed via .get(), just like state cells are accessed via .read() and .write(); i.e., #get() is not a new method of access.

Overall, it seems like I should proceed with the idea. I filed https://issues.apache.org/jira/browse/BEAM-2844.

On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik wrote:

For API consistency reasons, it would be good if we did this holistically and expanded this approach to state, side outputs, ... so that a person can always call Something.get() to return something that they can access implementation-wise.
It will be confusing for our users to have many variations in our style of how all these concepts are used (ProcessContext / annotations / #get()).

On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov wrote:

Also, I think my approach is compatible with annotations and with the future removal of .withSideInputs, if we annotate a field:

  final PCollectionView<Foo> fooView = ...;

  class MyDoFn {
    @SideInput
    PCollectionView<Foo> foo = fooView;

    ...foo.get()...
  }

We can extract the accessed views from the DoFn instance using reflection. This is still not compatible with lambdas, but it is automatically compatible with all anonymous classes.

On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov wrote:

Hi Luke,

I know this (annotations) is the pattern we were considering for side inputs, but I no longer think it is the best way to access them. Annotations help get rid of the .withSideInputs() call, but that is where their advantage ends.

The advantage of the proposed approach is that it automatically works with all existing callback or lambda code. There is no need to further develop the reflection machinery to support side input annotations (and especially to support arbitrary user interfaces), no need to change existing transforms, and no need for transform authors to even know that the machinery exists to make side inputs usable in their transforms (nor to think about whether or not they should support side inputs).
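The field-annotation variant, where the accessed views are extracted from the DoFn instance by reflection, could work roughly as follows: scan the function object's fields, including those of an anonymous class, for a marker annotation and collect the views they hold. This is a sketch under assumptions: `@SideInput`, `View`, `Fn`, `declaredSideInputs`, and `exampleFn` are hypothetical stand-ins defined locally, not Beam's classes, and the annotation needs runtime retention for reflection to see it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class SideInputFieldScan {

  /** Hypothetical marker annotation; RUNTIME retention makes it visible to reflection. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.FIELD)
  @interface SideInput {}

  /** Hypothetical stand-in for PCollectionView; it only carries a name here. */
  static final class View<T> {
    final String name;

    View(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** Hypothetical stand-in for a DoFn-like base class. */
  abstract static class Fn {
    abstract String process(int input);
  }

  /** Collects the views held in @SideInput fields of fn, walking up its class hierarchy. */
  static List<View<?>> declaredSideInputs(Object fn) {
    List<View<?>> views = new ArrayList<>();
    for (Class<?> c = fn.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
      for (Field f : c.getDeclaredFields()) {
        if (!f.isAnnotationPresent(SideInput.class) || !View.class.isAssignableFrom(f.getType())) {
          continue; // unannotated fields (including captured variables) are ignored
        }
        try {
          f.setAccessible(true);
          Object value = f.get(fn);
          if (value != null) {
            views.add((View<?>) value);
          }
        } catch (IllegalAccessException e) {
          throw new IllegalStateException("Cannot read field " + f, e);
        }
      }
    }
    return views;
  }

  /** Builds an anonymous Fn that declares one side input through an annotated field. */
  static Fn exampleFn(View<String> fooView) {
    return new Fn() {
      @SideInput View<String> foo = fooView;
      View<String> helper = new View<>("not-a-side-input"); // no annotation: not collected

      @Override
      String process(int input) {
        return foo + " " + input;
      }
    };
  }

  public static void main(String[] args) {
    View<String> fooView = new View<>("foo");
    Fn fn = exampleFn(fooView);
    System.out.println(declaredSideInputs(fn)); // [foo]
    System.out.println(fn.process(1)); // foo 1
  }
}
```

A lambda has no fields of its own to annotate, which is why this approach still does not cover lambdas.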
Moreover, as Reuven says, annotations don't work with lambdas at all: creating a lambda with a flexible set of annotated arguments currently appears to be impossible, and even capturing the annotations on the arguments of a lambda is, I believe, also impossible, because the Java compiler drops them in the generated class or method handle.

On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik wrote:

I believe we should follow the pattern that state uses and add a type annotation to link the side input definition to its usage directly. This would let us know that the side input is definitely being accessed, and perform validation during graph construction for any side inputs that are used but not specified.
Code snippet:

  final PCollectionView<String> foo = pipeline
      .apply("fooName", Create.of("foo"))
      .apply(View.<String>asSingleton());
  PCollection<String> output = pipeline
      .apply(Create.of(1, 2, 3))
      .apply(MapElements.via(
          new SimpleFunction<Integer, String>() {
            @Override
            public String apply(Integer input, @SideInput("fooName") String fooValue) {
              return fooValue + " " + input;
            }
          }).withSideInputs(foo));

On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov wrote:

Sure, here's what a modified (passing) MapElements unit test that uses side inputs looks like:

  @Test
  @Category(NeedsRunner.class)
  public void testMapBasicWithSideInput() throws Exception {
    final PCollectionView<String> foo =
        pipeline.apply("foo", Create.of("foo")).apply(View.<String>asSingleton());
    PCollection<String> output = pipeline
        .apply(Create.of(1, 2, 3))
        .apply(MapElements.via(
            new SimpleFunction<Integer, String>() {
              @Override
              public String apply(Integer input) {
                return foo.get() + " " + input;
              }
            })
            .withSideInputs(foo));

    PAssert.that(output).containsInAnyOrder("foo 1", "foo 2", "foo 3");
    pipeline.run();
  }

On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax wrote:

Can you provide a code snippet showing how this would look?
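The point about lambdas can be checked for one specific case: whether a parameter annotation is visible, through reflection, on the method that implements the functional interface. The sketch compares an anonymous class with a lambda; the `@SideInput` annotation and the two-argument `Fn` interface are hypothetical, loosely modeled on the `apply(Integer input, @SideInput("fooName") String fooValue)` snippet above. With javac and the standard lambda metafactory, the class generated for the lambda is not expected to carry the annotation, but treat this as an illustration of one runtime rather than a statement about every compiler.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class LambdaParamAnnotations {

  /** Hypothetical parameter annotation, RUNTIME-retained so reflection could see it. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.PARAMETER)
  @interface SideInput {
    String value();
  }

  /** Hypothetical two-argument callback: main input plus a side input value. */
  interface Fn {
    String apply(Integer input, String sideValue);
  }

  /** Counts @SideInput annotations on the second parameter of fn's own apply method. */
  static int sideInputAnnotationCount(Fn fn) {
    try {
      Method m = fn.getClass().getDeclaredMethod("apply", Integer.class, String.class);
      int count = 0;
      for (Annotation a : m.getParameterAnnotations()[1]) {
        if (a instanceof SideInput) {
          count++;
        }
      }
      return count;
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  static Fn anonymousFn() {
    return new Fn() {
      @Override
      public String apply(Integer input, @SideInput("fooName") String fooValue) {
        return fooValue + " " + input;
      }
    };
  }

  static Fn lambdaFn() {
    // The annotation is legal on an explicitly typed lambda parameter...
    return (Integer input, @SideInput("fooName") String fooValue) -> fooValue + " " + input;
  }

  public static void main(String[] args) {
    // ...but it is not expected to show up on the class generated for the lambda.
    System.out.println("anonymous class: " + sideInputAnnotationCount(anonymousFn()));
    System.out.println("lambda: " + sideInputAnnotationCount(lambdaFn()));
  }
}
```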
On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov wrote:

TL;DR: Introduce a method PCollectionView.get(), implemented as: get the thread-local ProcessContext and call c.sideInput(this). As a result, any user lambdas, such as those passed to MapElements, can use side inputs.

Quite a few transforms have user-code callbacks or lambdas: ParDo (DoFn), Map/FlatMapElements, the DynamicDestinations classes in various IOs, combine fns, the PollFn callback in Watch, etc.

Of these, only DoFn and CombineFn have built-in support for side inputs; for DynamicDestinations, side inputs are plumbed explicitly; the others don't have access (e.g. you can't access side inputs from Map/FlatMapElements because they don't have a ProcessContext, or any context for that matter).

I think it's important to solve this, especially as Java 8 becomes people's default choice: users will want to use side inputs in Map/FlatMapElements.
It also appears to be quite easy to implement:

Runner part:
- introduce a SideInputAccessor interface
- make .get() on a PCollectionView get the value from a thread-local SideInputAccessor
- make runners set the thread-local SideInputAccessor any time the runner is evaluating something in a context where side inputs are available, e.g. a ProcessElement method, or applying a CombineFn; the set of such places will be quite small. I believe only runners (but not transforms) will ever need to set this thread-local.

Transform part:
- Transforms that take user-code lambdas and want to let them access side inputs will still need to be configurable with a method like .withSideInputs(view1, view2...) and will need to plumb those down to the primitive DoFns or CombineFns: information on *which* side inputs will be accessed, of course, still needs to be in the graph.

I quickly prototyped this in the direct runner and MapElements, and it worked with no issues. Is there something subtle that I'm missing, or is this actually a good idea?
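The runner and transform parts of the proposal can be sketched end to end without Beam. `View` stands in for PCollectionView, and `SideInputAccessor` and `invokeWithSideInputs` are hypothetical names for the pieces described in the proposal: the runner installs a thread-local accessor around each call into user code, and `View.get()` reads through it. Rejecting undeclared views stands in for the requirement that the .withSideInputs(...) list still be in the graph; a real implementation would presumably also validate this at construction time, not only at run time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class ThreadLocalSideInputs {

  /** Hypothetical runner-provided lookup, named after the proposal's SideInputAccessor. */
  interface SideInputAccessor {
    <T> T get(View<T> view);
  }

  /** Hypothetical stand-in for PCollectionView with the proposed get() method. */
  static final class View<T> {
    // The accessor for whatever user code is currently running on this thread, if any.
    private static final ThreadLocal<SideInputAccessor> CURRENT = new ThreadLocal<>();

    final String name;

    View(String name) {
      this.name = name;
    }

    /** Reads this view's value through the accessor the runner installed on this thread. */
    T get() {
      SideInputAccessor accessor = CURRENT.get();
      if (accessor == null) {
        throw new IllegalStateException("Side input '" + name + "' read outside a side-input context");
      }
      return accessor.get(this);
    }
  }

  /**
   * Runner side: installs an accessor around one call into user code. Only views that
   * were declared up front (the .withSideInputs(...) list) may be read.
   */
  static <I, O> O invokeWithSideInputs(
      Map<View<?>, Object> values, Set<View<?>> declared, Function<I, O> userFn, I input) {
    SideInputAccessor accessor = new SideInputAccessor() {
      @Override
      @SuppressWarnings("unchecked")
      public <T> T get(View<T> view) {
        if (!declared.contains(view)) {
          throw new IllegalStateException("Side input '" + view.name + "' was not declared");
        }
        return (T) values.get(view);
      }
    };
    SideInputAccessor previous = View.CURRENT.get();
    View.CURRENT.set(accessor);
    try {
      return userFn.apply(input);
    } finally {
      View.CURRENT.set(previous); // restore the enclosing context, if any
    }
  }

  public static void main(String[] args) {
    View<String> foo = new View<>("foo");
    Map<View<?>, Object> values = new HashMap<>();
    values.put(foo, "foo");
    Set<View<?>> declared = Set.of(foo);
    // A plain lambda, as MapElements would take: it captures the view and calls get().
    Function<Integer, String> fn = input -> foo.get() + " " + input;
    for (int i = 1; i <= 3; i++) {
      System.out.println(invokeWithSideInputs(values, declared, fn, i)); // foo 1, foo 2, foo 3
    }
  }
}
```

Restoring the previous accessor in the finally block keeps nested invocations on the same thread from clobbering each other's context, and leaves get() failing loudly when it is called outside any runner-provided context.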
