Hi Luke,
I think my proposal is very similar in style to the one in A New DoFn
<https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8jsWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko>,
so I disagree that it is introducing a divergence from that discussion.
That code proposes:

class MyNewDoFn extends NewDoFn<Foo, Baz> {
  @ProcessElement
  public void process(
      ProcessContext context,
      @SideInput Supplier<Frob> frob,
      @SideInput Supplier<Bizzle> bizzle) {
    … frob.get() … bizzle.get() …
  }
}

PCollection<Foo> input = …

(and it still proposes .withSideInputs() for wiring)

The similarities are:
- Using @SideInput to declare that something is a side input (this code
proposes that on a parameter, my code proposes it on a field, because
fields allow retaining a compile-time fixed signature for interface methods
- less of a concern for DoFn, but big concern for other user callbacks,
which are not considered in that proposal)
- Accessing via .get() - the code proposes using a Supplier, but I don't
think that's materially different from calling .get() on the
PCollectionView directly.

The differences are:
- The proposal in the doc allows wiring different side inputs to the same
Supplier, but I'm not convinced that this is important - you can just as
easily call the constructor of your DoFn passing different
PCollectionView's for it to capture.
- My proposal allows getting rid of .withSideInputs() entirely, because the
DoFn captures the PCollectionView so you don't need to specify it
explicitly for wiring.

On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

> My concern with the proposal is not the specifics of how it will work and
> more about it being yet another way on how our API is to be used even
> though we have a proposal [1] of an API style we were working towards in
> Java and Python. I would rather re-open that discussion now about what we
> want that API to look like for our major features and work towards
> consistency (or not if there is a strong argument as to why some feature
> should have a different style).
>
> 1: https://s.apache.org/a-new-dofn
>
> On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > +0.75 because I'd like to bring up invalid pipelines.
> >
> > I had proposed side inputs as parameters to DoFn in
> > https://s.apache.org/a-new-dofn (specifically at [1]) so the only place
> > they are specified is in the graph construction, making the DoFn more
> > reusable and errors impossible. I've actually been noodling my way
> towards
> > this in a branch :-)
> >
> > Eugene's proposal is a sort of converse, where the side inputs are values
> > captured in the closure and not parameters, yet the only place they are
> > specified is in the DoFn.
> >
> > I see no conflict between these two. It is very natural to have both the
> > capability to accept parameters and the ability to capture variables in
> the
> > closure. Supporting both is totally standard in up-to-date programming
> > languages.
> >
> > Today we have the worse of both worlds: PCollectionView behaves as
> > something captured in the closure/constructor, but must still be
> explicitly
> > wired up.
> >
> > But if I use PCollectionView.get() and have not wired it up in any way,
> > what happens? Just like today, you can try to .sideInput(...) a thing
> that
> > is not available. With side inputs as parameters, this is not possible.
> If
> > you want to treat them as captured in a closure, while avoiding errors,
> it
> > seems like you might need to do some low-level magic, like the
> > serialization-based detection that Luke has suggested before (there are
> > known downsides that we haven't explored, like overcapture).
> >
> > Kenn
> >
> > [1]
> > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j
> > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
> >
> >
> > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > > Hm, I guess you're right - for outputs it could be indeed quite
> valuable
> > to
> > > output to them without plumbing (e.g. outputting errors). Could be done
> > > perhaps via TupleTag.output()? (assuming the same TupleTag can not be
> > > reused to tag multiple PCollection's)
> > >
> > > For now I sent a PR for side input support
> > > https://github.com/apache/beam/pull/3814 .
> > >
> > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik <lc...@google.com.invalid>
> > > wrote:
> > >
> > > > I disagree, state may not care where it is used as well since a
> person
> > > may
> > > > call a function which needs to store/retrieve state and instead of
> > having
> > > > the DoFn declare the StateSpec and then pass in the state
> > implementation
> > > > down into the function everywhere. Similarly for outputs, the
> internal
> > > > functions could take the TupleTag and request an output manager or
> take
> > > an
> > > > "output" reference which give functions the ability to produce output
> > > > directly without needing to pass everything that is needed to be
> output
> > > > back to the caller.
> > > >
> > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <
> > > > kirpic...@google.com.invalid> wrote:
> > > >
> > > > > Hm, I think of these things (state, side outputs etc.), only side
> > > inputs
> > > > > make sense to access in arbitrary user callbacks without explicit
> > > > knowledge
> > > > > of the surrounding transform - so only side inputs can be implicit
> > like
> > > > > this.
> > > > >
> > > > > Ultimately we'll probably end up removing ProcessContext, and
> keeping
> > > > only
> > > > > annotations (on fields / methods / parameters). In that world, a
> > field
> > > > > annotation could be used (like per my previous email) to statically
> > > > specify
> > > > > which side inputs will be needed - while the value could still be
> > > > accessed
> > > > > via .get(), just like state cells are accessed via .read() and
> > > .write():
> > > > > i.e., #get() is not a new method of access.
> > > > >
> > > > > Overall, it seems like I should proceed with the idea. I filed
> > > > > https://issues.apache.org/jira/browse/BEAM-2844.
> > > > >
> > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik
> <lc...@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > > > For API consistency reasons, it would be good if we did this
> > > > holistically
> > > > > > and expanded this approach to state, side outputs, ... so that a
> > > person
> > > > > can
> > > > > > always call Something.get() to return something that they can
> > access
> > > > > > implementation wise. It will be confusing for our users to have
> > many
> > > > > > variations in our style of how all these concepts are used
> > > > > (ProcessContext
> > > > > > / Annotations / #get())
> > > > > >
> > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <
> > > > > > kirpic...@google.com.invalid> wrote:
> > > > > >
> > > > > > > Also, I think my approach is compatible with annotations and
> > future
> > > > > > removal
> > > > > > > of .withSideInputs if we annotate a field:
> > > > > > > final PCollectionView<Foo> foo = ...;
> > > > > > >
> > > > > > > class MyDoFn {
> > > > > > >   @SideInput
> > > > > > >   PCollectionView<Foo> foo = foo;
> > > > > > >
> > > > > > >   ...foo.get()...
> > > > > > > }
> > > > > > >
> > > > > > > We can extract the accessed views from the DoFn instance using
> > > > > > reflection.
> > > > > > > Still not compatible with lambdas, but compatible automatically
> > > with
> > > > > all
> > > > > > > anonymous classes.
> > > > > > >
> > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <
> > > kirpic...@google.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Luke,
> > > > > > > >
> > > > > > > > I know this (annotations) is the pattern we were considering
> > for
> > > > side
> > > > > > > > inputs, but I no longer think it is the best way to access
> > them.
> > > > > > > > Annotations help getting rid of the .withSideInputs() call,
> but
> > > > this
> > > > > is
> > > > > > > > where their advantage ends.
> > > > > > > >
> > > > > > > > The advantages of the proposed approach are that it
> > automatically
> > > > > works
> > > > > > > > with all existing callback or lambda code. No need to further
> > > > develop
> > > > > > the
> > > > > > > > reflection machinery to support side input annotations - and
> > > > > especially
> > > > > > > to
> > > > > > > > support arbitrary user interfaces, no need to change existing
> > > > > > transforms,
> > > > > > > > no need for transform authors to even know that the machinery
> > > > exists
> > > > > to
> > > > > > > > make side inputs usable in their transforms (and no need for
> > > > authors
> > > > > to
> > > > > > > > think about whether or not they should support side inputs).
> > > > > > > >
> > > > > > > > Moreover, like Reuven says, annotations don't work with
> lambdas
> > > at
> > > > > all:
> > > > > > > > creating a lambda with a flexible set of annotation arguments
> > > > appears
> > > > > > to
> > > > > > > be
> > > > > > > > currently impossible, and even capturing the annotations on
> > > > arguments
> > > > > > of
> > > > > > > a
> > > > > > > > lambda is I believe also impossible because the Java compiler
> > > drops
> > > > > > them
> > > > > > > in
> > > > > > > > the generated class or method handle.
> > > > > > > >
> > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik
> > > > <lc...@google.com.invalid
> > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> I believe we should follow the pattern that state uses and
> > add a
> > > > > type
> > > > > > > >> annotation to link the side input definition to its usage
> > > > directly.
> > > > > > This
> > > > > > > >> would allow us to know that the side input was definitely
> > being
> > > > > > accessed
> > > > > > > >> and perform validation during graph construction for any
> used
> > > but
> > > > > > > >> unspecified side inputs.
> > > > > > > >>
> > > > > > > >> Code snippet:
> > > > > > > >> final PCollectionView<String> foo =
> pipeline.apply("fooName",
> > > > > > > >> Create.of("foo")).apply(View.<String>asSingleton());
> > > > > > > >> PCollection<String> output = pipeline
> > > > > > > >>     .apply(Create.of(1, 2, 3))
> > > > > > > >>     .apply(MapElements.via(
> > > > > > > >>         new SimpleFunction<Integer, String>() {
> > > > > > > >>           @Override
> > > > > > > >>           public String apply(Integer input,
> > > @SideInput("fooName")
> > > > > > > String
> > > > > > > >> fooValue) {
> > > > > > > >>             return fooValue + " " + input;
> > > > > > > >>           }
> > > > > > > >>         }).withSideInputs(foo));*
> > > > > > > >>
> > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene Kirpichov <
> > > > > > > >> kirpic...@google.com.invalid> wrote:
> > > > > > > >>
> > > > > > > >> > Sure, here's how a modified (passing) MapElements unit
> test
> > > > looks
> > > > > > > like,
> > > > > > > >> > with usage of side inputs:
> > > > > > > >> >
> > > > > > > >> >   @Test
> > > > > > > >> >   @Category(NeedsRunner.class)
> > > > > > > >> >   public void testMapBasicWithSideInput() throws
> Exception {
> > > > > > > >> >    * final PCollectionView<String> foo =*
> > > > > > > >> > *        pipeline.apply("foo",
> > > > > > > >> > Create.of("foo")).apply(View.<String>asSingleton());*
> > > > > > > >> >     PCollection<String> output = pipeline
> > > > > > > >> >         .apply(Create.of(1, 2, 3))
> > > > > > > >> >         .apply(MapElements.via(
> > > > > > > >> >             new SimpleFunction<Integer, String>() {
> > > > > > > >> >               @Override
> > > > > > > >> >               public String apply(Integer input) {
> > > > > > > >> >                 return* foo.get() *+ " " + input;
> > > > > > > >> >               }
> > > > > > > >> >             })
> > > > > > > >> >         *.withSideInputs(foo));*
> > > > > > > >> >
> > > > > > > >> >     PAssert.that(output).containsInAnyOrder("foo 1", "foo
> > 2",
> > > > > "foo
> > > > > > > 3");
> > > > > > > >> >     pipeline.run();
> > > > > > > >> >   }
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax
> > > > > <re...@google.com.invalid
> > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Can you provide a code snippet showing how this would
> > look?
> > > > > > > >> > >
> > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene Kirpichov <
> > > > > > > >> > > kirpic...@google.com.invalid> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > TL;DR Introduce method PCollectionView.get(),
> > implemented
> > > > as:
> > > > > > get
> > > > > > > >> > > > thread-local ProcessContext and do c.sideInput(this).
> > As a
> > > > > > result,
> > > > > > > >> any
> > > > > > > >> > > user
> > > > > > > >> > > > lambdas such as MapElements can use side inputs.
> > > > > > > >> > > >
> > > > > > > >> > > > Quite a few transforms have user-code callbacks or
> > > lambdas:
> > > > > > ParDo
> > > > > > > >> > (DoFn),
> > > > > > > >> > > > Map/FlatMapElements, the DynamicDestinations classes
> in
> > > > > various
> > > > > > > IOs,
> > > > > > > >> > > > combine fns, the PollFn callback in Watch, etc.
> > > > > > > >> > > >
> > > > > > > >> > > > Of these, only DoFn and CombineFn have built-in
> support
> > > for
> > > > > side
> > > > > > > >> > inputs;
> > > > > > > >> > > > for DynamicDestinations it is plumbed explicitly;
> others
> > > > don't
> > > > > > > have
> > > > > > > >> > > access
> > > > > > > >> > > > (e.g. you can't access side inputs from
> > > Map/FlatMapElements
> > > > > > > because
> > > > > > > >> > they
> > > > > > > >> > > > don't have a ProcessContext or any context for that
> > > matter).
> > > > > > > >> > > >
> > > > > > > >> > > > I think it's important to solve this, especially as
> > Java 8
> > > > > > becomes
> > > > > > > >> > > people's
> > > > > > > >> > > > default choice: users will want to use side inputs in
> > > > > > > >> > > Map/FlatMapElements.
> > > > > > > >> > > >
> > > > > > > >> > > > It also appears to be quite easy to implement:
> > > > > > > >> > > >
> > > > > > > >> > > > Runner part:
> > > > > > > >> > > > - introduce a SideInputAccessor interface
> > > > > > > >> > > > - make .get() on a PCollectionView get it from a
> > > > thread-local
> > > > > > > >> > > > SideInputAccessor
> > > > > > > >> > > > - make runners set the thread-local SideInputAccessor
> > any
> > > > time
> > > > > > the
> > > > > > > >> > runner
> > > > > > > >> > > > is evaluating something in a context where side inputs
> > are
> > > > > > > >> available,
> > > > > > > >> > > e.g.
> > > > > > > >> > > > a ProcessElement method, or applying a CombineFn - the
> > set
> > > > of
> > > > > > such
> > > > > > > >> > places
> > > > > > > >> > > > will be quite small. I believe only runners (but not
> > > > > transforms)
> > > > > > > >> will
> > > > > > > >> > > ever
> > > > > > > >> > > > need to set this thread-local
> > > > > > > >> > > >
> > > > > > > >> > > > Transform part:
> > > > > > > >> > > > - Transforms that take user-code lambdas and want to
> let
> > > > them
> > > > > > > access
> > > > > > > >> > side
> > > > > > > >> > > > inputs still will need to be configurable with a
> method
> > > like
> > > > > > > >> > > > .withSideInputs(view1, view2...) and will need to
> plumb
> > > > those
> > > > > > down
> > > > > > > >> to
> > > > > > > >> > the
> > > > > > > >> > > > primitive DoFn's or CombineFn's - information on
> *which*
> > > > side
> > > > > > > inputs
> > > > > > > >> > will
> > > > > > > >> > > > be accessed, of course, still needs to be in the
> graph.
> > > > > > > >> > > >
> > > > > > > >> > > > I quickly prototyped this in direct runner and
> > > MapElements,
> > > > > and
> > > > > > it
> > > > > > > >> > worked
> > > > > > > >> > > > with no issues - I'm wondering if there's something
> > subtle
> > > > > that
> > > > > > > I'm
> > > > > > > >> > > > missing, or is it actually a good idea?
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to