On Wed, Sep 13, 2017 at 1:17 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:
> Hi Robert,
>
> Given the anticipated usage of this proposal in Java, I'm not sure the
> Python approach you quoted is the right one.

Perhaps not, but does that mean it would be a Java-ism only or would
we implement it in Python despite it being worse there?

> The main reason: I see how it works with Map/FlatMap, but what about cases
> like FileIO.write(), parameterized by several lambdas (element ->
> destination, destination -> filename policy, destination -> sink), where
> different lambdas may want to access different side inputs? It feels
> excessive to make each of the lambdas take all of the side inputs in the
> same order; moreover, if the composite transform internally needs to pass
> some more side inputs to the DoFn's executing these lambdas, it will need
> to manipulate the argument lists in nontrivial ways to make sure it passes
> them only the side inputs the user asked for, and in the proper order.

In Python it would be trivial to "slice" the side input arguments
across the lambdas in a natural way, but I can see that this would be
more of a pain in Java, especially as lambdas are unnecessarily
crippled during compilation.

> Another reason is, I think with Java's type system it's impossible to have
> a NewDoFn-style API for lambdas, because annotations on lambda arguments
> are dropped when the lambda is converted to the respective single-method
> interface - a lambda is subject to a lot more type erasure than anonymous
> class.

Yeah, this is unfortunate. But, as mentioned, side inputs don't need
to be annotated, just counted. For something like inspecting the
window the NewDoFn has a lot of advantages over implicit access (and
makes it so you can't "forget" to declare your dependency), but I do
see advantages for the implicit way of doing things for delegating to
other callables.

On the other hand, there is a bit of precedence for this: metrics have
the "implicit" api. If we do go this direction for side inputs, we
should also consider it for state and side outputs.

> On Wed, Sep 13, 2017 at 1:03 PM Robert Bradshaw <rober...@google.com.invalid>
> wrote:
>
>> +1 to reducing the amount of boilerplate for dealing with side inputs.
>>
>> I prefer the "NewDoFn" style of side inputs for consistency. The
>> primary drawback seems to be lambda's incompatibility with
>> annotations. This is solved in Python by letting all the first
>> annotated argument of the process method be the main input, and
>> subsequent ones be the side input. For example
>>
>> main_pcoll | beam.Map(
>>     lambda main_input_elem, side_input_value: main_input_elem +
>> side_input_value,
>>     side_input_pvalue)
>>
>> For multiple side inputs they are mapped positionally (though Python
>> has the advantage that arguments can be passed by keyword as well to
>> enhance readability when there are many of them, and we allow that for
>> side inputs). Note that side_input_pvalue is not referenced anywhere
>> else, so we don't even have to store it and pass it around (one
>> typically writes pvalue.AsList(some_pcoll) inline here). When the
>> concrete PCollectionView is used to access the value this means that
>> it must be passed separately to both the ParDo and the callback
>> (unless we can infer it, which I don't think we can do in all (many?)
>> cases).
>>
>> There's no reason we couldn't do this, or something very similar, in
>> Java as well.
>>
>> On Wed, Sep 13, 2017 at 10:55 AM, Reuven Lax <re...@google.com.invalid>
>> wrote:
>> > On Wed, Sep 13, 2017 at 10:05 AM, Eugene Kirpichov <
>> > kirpic...@google.com.invalid> wrote:
>> >
>> >> Hi,
>> >>
>> >> I agree with these concerns to an extent, however I think the advantage
>> of
>> >> transparently letting any user code access side inputs, especially
>> >> including lambdas, is so great that we should find a way to address
>> these
>> >> concerns within the constraints of the pattern I'm proposing. See more
>> >> below.
>> >>
>> >> On Wed, Sep 13, 2017 at 9:29 AM Ben Chambers
>> <bchamb...@google.com.invalid
>> >> >
>> >> wrote:
>> >>
>> >> > One possible issue with this is that updating a thread local is
>> likely to
>> >> > be much more expensive than passing an additional argument.
>> >>
>> >> This is an implementation detail that can be fixed - Luke made a
>> suggestion
>> >> on the PR to set up the side input context once per bundle rather than
>> once
>> >> per element.
>> >>
>> >
>> > However remember that bundles might be small. Dataflow streaming runner
>> > creates small bundles by design. The Flink runner creates single-element
>> > bundles.
>> >
>> >
>> >>
>> >>
>> >> > Also, not all
>> >> > code called from within the DoFn will necessarily be in the same
>> thread
>> >> > (eg., sometimes we create a pool of threads for doing work).
>> >>
>> >> I think we already require that c.output() can not be done from multiple
>> >> threads; and I don't think we document c.sideInput() to be thread-safe
>> - it
>> >> may be reasonable to declare that it isn't and has to be accessed from
>> the
>> >> same thread as the ProcessElement call. If we want to relax this, then
>> >> there might be ways to deal with that too, e.g. provide utilities for
>> the
>> >> user to capture the "user code context" and restoring it inside a
>> thread.
>> >> This would likely be valuable for other purposes, such as making those
>> >> extra threads visible to our profiling utilities.
>> >>
>> >
>> > This seems fair, but we should be be very careful about our
>> documentation.
>> > And +1 to adding utilities to make multi-threaded work easier to manage.
>> >
>> >>
>> >>
>> >> > It may be
>> >> > *more* confusing for this to sometimes work magically and sometimes
>> fail
>> >> > horribly. Also, requiring the PCollectionView to be passed to user
>> code
>> >> > that accesses it is nice because it makes *very clear* that the side
>> >> input
>> >> > needs to be provided from the DoFn to that particular utility. If it
>> is
>> >> > accessed via "spooky action at a distance" we lose that piece of
>> "free"
>> >> > documentation, which may lead to extensive misuse of these utility
>> >> methods.
>> >> >
>> >> I'd like to understand this concern better - from this description it's
>> not
>> >> clear to me. The pattern I'm proposing is that, when you're authoring a
>> >> PTransform that is configured by any user callbacks, then:
>> >> - you should provide a builder method .withSideInputs(...)
>> >> - you should propagate those side inputs to all your internal DoFn's
>> that
>> >> invoke the user code
>> >> - in return the user callbacks will be allowed to access those
>> particular
>> >> side inputs
>> >> This seems like a simple enough model to me to understand, both from a
>> >> user's perspective and from a transform author's perspective. Steps 1
>> and 2
>> >> may eventually be automated by annotation analysis or other means (e.g.
>> SDK
>> >> giving a way to provide given side inputs automatically to everything
>> >> inside a composite transform rather than to individual DoFn's).
>> >>
>> >>
>> >> >
>> >> > On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
>> >> > <kirpic...@google.com.invalid> wrote:
>> >> >
>> >> > > Hi,
>> >> > >
>> >> > > On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles
>> <k...@google.com.invalid
>> >> >
>> >> > > wrote:
>> >> > >
>> >> > > > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
>> >> > > > kirpic...@google.com.invalid> wrote:
>> >> > > >
>> >> > > > >
>> >> > > > > The differences are:
>> >> > > > > - The proposal in the doc allows wiring different side inputs to
>> >> the
>> >> > > same
>> >> > > > > Supplier, but I'm not convinced that this is important - you can
>> >> just
>> >> > > as
>> >> > > > > easily call the constructor of your DoFn passing different
>> >> > > > > PCollectionView's for it to capture.
>> >> > > > >
>> >> > > >
>> >> > > > I disagree with this bit about it being "just as easy". Passing
>> the
>> >> > > needed
>> >> > > > PCollectionViews to your constructor (or even having a
>> constructor)
>> >> is
>> >> > a
>> >> > > > pain. Every time I have to do it, it adds a ton of boilerplate
>> that
>> >> > feels
>> >> > > > like pure noise. To make a DoFn reusable it must be made into a
>> named
>> >> > > class
>> >> > > > with a constructor, versus inlined with no constructor.
>> >> > >
>> >> > > Hm, why? You can have the DoFn be an anonymous class capturing the
>> >> > > PCollectionView into a @SideInput field as a closure.
>> >> > >
>> >> > >
>> >> > > > A generous analogy
>> >> > > > is is that it is "just" manual closure conversion/currying,
>> changing
>> >> > > > f(side, main) to f(side)(main). But in practice in Beam the second
>> >> one
>> >> > > has
>> >> > > > much more boilerplate.
>> >> > > >
>> >> > > > Also, Beam is worse. We present the user with higher-order
>> functions,
>> >> > > which
>> >> > > > is where the actual annoyance comes in. When you want to pardo(f)
>> you
>> >> > > have
>> >> > > > to write pardo(f(side))(side, main). Your proposal is to support
>> >> > > > pardo(f(side))(main) and mine is to support pardo(f)(side, main).
>> I
>> >> > still
>> >> > > > propose that we support both (as they get implemented). If you
>> buy in
>> >> > to
>> >> > > my
>> >> > > > analogy, then there's decades of precedent and the burden of proof
>> >> > falls
>> >> > > > heavily on whoever doesn't want to support both.
>> >> > > >
>> >> > > I see your point. I think the proposal is compatible with what
>> you're
>> >> > > suggesting too - in DoFn we could have @SideInput *parameters* of
>> type
>> >> > > PCollectionView, with the same semantics as a field.
>> >> > >
>> >> > >
>> >> > > >
>> >> > > > - My proposal allows getting rid of .withSideInputs() entirely,
>> >> because
>> >> > > the
>> >> > > > > DoFn captures the PCollectionView so you don't need to specify
>> it
>> >> > > > > explicitly for wiring.
>> >> > > > >
>> >> > > >
>> >> > > > I've decided to change to full +1 (whatever that means compared to
>> >> 0.75
>> >> > > :-)
>> >> > > > to adding support for @SideInput fields, because the benefits
>> >> outweigh
>> >> > > this
>> >> > > > failure mode:
>> >> > > >
>> >> > > > new DoFn {
>> >> > > >   // forgot the annotation
>> >> > > >   private final PCollectionView whatever;
>> >> > > >
>> >> > > >   @ProcessElement public void process(...) {
>> >> > > >     whatever.get(); // crash during execution
>> >> > > >   }
>> >> > > > }
>> >> > > >
>> >> > > > But ideas to mitigate that would be cool.
>> >> > >
>> >> > > Hm, can't think of anything less hacky than "prohibit having fields
>> of
>> >> > type
>> >> > > PCollectionView that are not public, final, and annotated with
>> >> > @SideInput"
>> >> > > - not sure we'd want to go down this road. I suppose a good error
>> >> message
>> >> > > in .get() would be sufficient, saying "Did you forget to specify a
>> >> > > requirement for this side input via .withSideInputs() or by
>> annotating
>> >> > the
>> >> > > field as @SideInput" or something like that.
>> >> > >
>> >> > > >
>> >> > >
>> >> > >
>> >> > > > Kenn
>> >> > > >
>> >> > > >
>> >> > > > >
>> >> > > > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik
>> >> <lc...@google.com.invalid
>> >> > >
>> >> > > > > wrote:
>> >> > > > >
>> >> > > > > > My concern with the proposal is not the specifics of how it
>> will
>> >> > work
>> >> > > > and
>> >> > > > > > more about it being yet another way on how our API is to be
>> used
>> >> > even
>> >> > > > > > though we have a proposal [1] of an API style we were working
>> >> > towards
>> >> > > > in
>> >> > > > > > Java and Python. I would rather re-open that discussion now
>> about
>> >> > > what
>> >> > > > we
>> >> > > > > > want that API to look like for our major features and work
>> >> towards
>> >> > > > > > consistency (or not if there is a strong argument as to why
>> some
>> >> > > > feature
>> >> > > > > > should have a different style).
>> >> > > > > >
>> >> > > > > > 1: https://s.apache.org/a-new-dofn
>> >> > > > > >
>> >> > > > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles
>> >> > > > <k...@google.com.invalid
>> >> > > > > >
>> >> > > > > > wrote:
>> >> > > > > >
>> >> > > > > > > +0.75 because I'd like to bring up invalid pipelines.
>> >> > > > > > >
>> >> > > > > > > I had proposed side inputs as parameters to DoFn in
>> >> > > > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so
>> the
>> >> > only
>> >> > > > > place
>> >> > > > > > > they are specified is in the graph construction, making the
>> >> DoFn
>> >> > > more
>> >> > > > > > > reusable and errors impossible. I've actually been noodling
>> my
>> >> > way
>> >> > > > > > towards
>> >> > > > > > > this in a branch :-)
>> >> > > > > > >
>> >> > > > > > > Eugene's proposal is a sort of converse, where the side
>> inputs
>> >> > are
>> >> > > > > values
>> >> > > > > > > captured in the closure and not parameters, yet the only
>> place
>> >> > they
>> >> > > > are
>> >> > > > > > > specified is in the DoFn.
>> >> > > > > > >
>> >> > > > > > > I see no conflict between these two. It is very natural to
>> have
>> >> > > both
>> >> > > > > the
>> >> > > > > > > capability to accept parameters and the ability to capture
>> >> > > variables
>> >> > > > in
>> >> > > > > > the
>> >> > > > > > > closure. Supporting both is totally standard in up-to-date
>> >> > > > programming
>> >> > > > > > > languages.
>> >> > > > > > >
>> >> > > > > > > Today we have the worse of both worlds: PCollectionView
>> behaves
>> >> > as
>> >> > > > > > > something captured in the closure/constructor, but must
>> still
>> >> be
>> >> > > > > > explicitly
>> >> > > > > > > wired up.
>> >> > > > > > >
>> >> > > > > > > But if I use PCollectionView.get() and have not wired it up
>> in
>> >> > any
>> >> > > > way,
>> >> > > > > > > what happens? Just like today, you can try to
>> .sideInput(...) a
>> >> > > thing
>> >> > > > > > that
>> >> > > > > > > is not available. With side inputs as parameters, this is
>> not
>> >> > > > possible.
>> >> > > > > > If
>> >> > > > > > > you want to treat them as captured in a closure, while
>> avoiding
>> >> > > > errors,
>> >> > > > > > it
>> >> > > > > > > seems like you might need to do some low-level magic, like
>> the
>> >> > > > > > > serialization-based detection that Luke has suggested before
>> >> > (there
>> >> > > > are
>> >> > > > > > > known downsides that we haven't explored, like overcapture).
>> >> > > > > > >
>> >> > > > > > > Kenn
>> >> > > > > > >
>> >> > > > > > > [1]
>> >> > > > > > >
>> >> > https://docs.google.com/document/d/1ClmQ6LqdnfseRzeSw3SL68DAO1f8j
>> >> > > > > > > sWBL2FfzWErlbw/edit#heading=h.1budnm7l01ko
>> >> > > > > > >
>> >> > > > > > >
>> >> > > > > > > On Tue, Sep 5, 2017 at 11:24 PM, Eugene Kirpichov <
>> >> > > > > > > kirpic...@google.com.invalid> wrote:
>> >> > > > > > >
>> >> > > > > > > > Hm, I guess you're right - for outputs it could be indeed
>> >> quite
>> >> > > > > > valuable
>> >> > > > > > > to
>> >> > > > > > > > output to them without plumbing (e.g. outputting errors).
>> >> Could
>> >> > > be
>> >> > > > > done
>> >> > > > > > > > perhaps via TupleTag.output()? (assuming the same TupleTag
>> >> can
>> >> > > not
>> >> > > > be
>> >> > > > > > > > reused to tag multiple PCollection's)
>> >> > > > > > > >
>> >> > > > > > > > For now I sent a PR for side input support
>> >> > > > > > > > https://github.com/apache/beam/pull/3814 .
>> >> > > > > > > >
>> >> > > > > > > > On Tue, Sep 5, 2017 at 9:52 PM Lukasz Cwik
>> >> > > > <lc...@google.com.invalid
>> >> > > > > >
>> >> > > > > > > > wrote:
>> >> > > > > > > >
>> >> > > > > > > > > I disagree, state may not care where it is used as well
>> >> > since a
>> >> > > > > > person
>> >> > > > > > > > may
>> >> > > > > > > > > call a function which needs to store/retrieve state and
>> >> > instead
>> >> > > > of
>> >> > > > > > > having
>> >> > > > > > > > > the DoFn declare the StateSpec and then pass in the
>> state
>> >> > > > > > > implementation
>> >> > > > > > > > > down into the function everywhere. Similarly for
>> outputs,
>> >> the
>> >> > > > > > internal
>> >> > > > > > > > > functions could take the TupleTag and request an output
>> >> > manager
>> >> > > > or
>> >> > > > > > take
>> >> > > > > > > > an
>> >> > > > > > > > > "output" reference which give functions the ability to
>> >> > produce
>> >> > > > > output
>> >> > > > > > > > > directly without needing to pass everything that is
>> needed
>> >> to
>> >> > > be
>> >> > > > > > output
>> >> > > > > > > > > back to the caller.
>> >> > > > > > > > >
>> >> > > > > > > > > On Tue, Sep 5, 2017 at 9:23 PM, Eugene Kirpichov <
>> >> > > > > > > > > kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > >
>> >> > > > > > > > > > Hm, I think of these things (state, side outputs
>> etc.),
>> >> > only
>> >> > > > side
>> >> > > > > > > > inputs
>> >> > > > > > > > > > make sense to access in arbitrary user callbacks
>> without
>> >> > > > explicit
>> >> > > > > > > > > knowledge
>> >> > > > > > > > > > of the surrounding transform - so only side inputs
>> can be
>> >> > > > > implicit
>> >> > > > > > > like
>> >> > > > > > > > > > this.
>> >> > > > > > > > > >
>> >> > > > > > > > > > Ultimately we'll probably end up removing
>> ProcessContext,
>> >> > and
>> >> > > > > > keeping
>> >> > > > > > > > > only
>> >> > > > > > > > > > annotations (on fields / methods / parameters). In
>> that
>> >> > > world,
>> >> > > > a
>> >> > > > > > > field
>> >> > > > > > > > > > annotation could be used (like per my previous email)
>> to
>> >> > > > > statically
>> >> > > > > > > > > specify
>> >> > > > > > > > > > which side inputs will be needed - while the value
>> could
>> >> > > still
>> >> > > > be
>> >> > > > > > > > > accessed
>> >> > > > > > > > > > via .get(), just like state cells are accessed via
>> >> .read()
>> >> > > and
>> >> > > > > > > > .write():
>> >> > > > > > > > > > i.e., #get() is not a new method of access.
>> >> > > > > > > > > >
>> >> > > > > > > > > > Overall, it seems like I should proceed with the
>> idea. I
>> >> > > filed
>> >> > > > > > > > > > https://issues.apache.org/jira/browse/BEAM-2844.
>> >> > > > > > > > > >
>> >> > > > > > > > > > On Tue, Sep 5, 2017 at 9:08 PM Lukasz Cwik
>> >> > > > > > <lc...@google.com.invalid
>> >> > > > > > > >
>> >> > > > > > > > > > wrote:
>> >> > > > > > > > > >
>> >> > > > > > > > > > > For API consistency reasons, it would be good if we
>> did
>> >> > > this
>> >> > > > > > > > > holistically
>> >> > > > > > > > > > > and expanded this approach to state, side outputs,
>> ...
>> >> so
>> >> > > > that
>> >> > > > > a
>> >> > > > > > > > person
>> >> > > > > > > > > > can
>> >> > > > > > > > > > > always call Something.get() to return something that
>> >> they
>> >> > > can
>> >> > > > > > > access
>> >> > > > > > > > > > > implementation wise. It will be confusing for our
>> users
>> >> > to
>> >> > > > have
>> >> > > > > > > many
>> >> > > > > > > > > > > variations in our style of how all these concepts
>> are
>> >> > used
>> >> > > > > > > > > > (ProcessContext
>> >> > > > > > > > > > > / Annotations / #get())
>> >> > > > > > > > > > >
>> >> > > > > > > > > > > On Tue, Sep 5, 2017 at 8:08 AM, Eugene Kirpichov <
>> >> > > > > > > > > > > kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > >
>> >> > > > > > > > > > > > Also, I think my approach is compatible with
>> >> > annotations
>> >> > > > and
>> >> > > > > > > future
>> >> > > > > > > > > > > removal
>> >> > > > > > > > > > > > of .withSideInputs if we annotate a field:
>> >> > > > > > > > > > > > final PCollectionView<Foo> foo = ...;
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > class MyDoFn {
>> >> > > > > > > > > > > >   @SideInput
>> >> > > > > > > > > > > >   PCollectionView<Foo> foo = foo;
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > >   ...foo.get()...
>> >> > > > > > > > > > > > }
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > We can extract the accessed views from the DoFn
>> >> > instance
>> >> > > > > using
>> >> > > > > > > > > > > reflection.
>> >> > > > > > > > > > > > Still not compatible with lambdas, but compatible
>> >> > > > > automatically
>> >> > > > > > > > with
>> >> > > > > > > > > > all
>> >> > > > > > > > > > > > anonymous classes.
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > On Tue, Sep 5, 2017, 8:02 AM Eugene Kirpichov <
>> >> > > > > > > > kirpic...@google.com>
>> >> > > > > > > > > > > > wrote:
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > > Hi Luke,
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > I know this (annotations) is the pattern we were
>> >> > > > > considering
>> >> > > > > > > for
>> >> > > > > > > > > side
>> >> > > > > > > > > > > > > inputs, but I no longer think it is the best
>> way to
>> >> > > > access
>> >> > > > > > > them.
>> >> > > > > > > > > > > > > Annotations help getting rid of the
>> >> .withSideInputs()
>> >> > > > call,
>> >> > > > > > but
>> >> > > > > > > > > this
>> >> > > > > > > > > > is
>> >> > > > > > > > > > > > > where their advantage ends.
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > The advantages of the proposed approach are
>> that it
>> >> > > > > > > automatically
>> >> > > > > > > > > > works
>> >> > > > > > > > > > > > > with all existing callback or lambda code. No
>> need
>> >> to
>> >> > > > > further
>> >> > > > > > > > > develop
>> >> > > > > > > > > > > the
>> >> > > > > > > > > > > > > reflection machinery to support side input
>> >> > annotations
>> >> > > -
>> >> > > > > and
>> >> > > > > > > > > > especially
>> >> > > > > > > > > > > > to
>> >> > > > > > > > > > > > > support arbitrary user interfaces, no need to
>> >> change
>> >> > > > > existing
>> >> > > > > > > > > > > transforms,
>> >> > > > > > > > > > > > > no need for transform authors to even know that
>> the
>> >> > > > > machinery
>> >> > > > > > > > > exists
>> >> > > > > > > > > > to
>> >> > > > > > > > > > > > > make side inputs usable in their transforms
>> (and no
>> >> > > need
>> >> > > > > for
>> >> > > > > > > > > authors
>> >> > > > > > > > > > to
>> >> > > > > > > > > > > > > think about whether or not they should support
>> side
>> >> > > > > inputs).
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > Moreover, like Reuven says, annotations don't
>> work
>> >> > with
>> >> > > > > > lambdas
>> >> > > > > > > > at
>> >> > > > > > > > > > all:
>> >> > > > > > > > > > > > > creating a lambda with a flexible set of
>> annotation
>> >> > > > > arguments
>> >> > > > > > > > > appears
>> >> > > > > > > > > > > to
>> >> > > > > > > > > > > > be
>> >> > > > > > > > > > > > > currently impossible, and even capturing the
>> >> > > annotations
>> >> > > > on
>> >> > > > > > > > > arguments
>> >> > > > > > > > > > > of
>> >> > > > > > > > > > > > a
>> >> > > > > > > > > > > > > lambda is I believe also impossible because the
>> >> Java
>> >> > > > > compiler
>> >> > > > > > > > drops
>> >> > > > > > > > > > > them
>> >> > > > > > > > > > > > in
>> >> > > > > > > > > > > > > the generated class or method handle.
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > > On Tue, Sep 5, 2017 at 6:57 AM Lukasz Cwik
>> >> > > > > > > > > <lc...@google.com.invalid
>> >> > > > > > > > > > >
>> >> > > > > > > > > > > > > wrote:
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > > >> I believe we should follow the pattern that
>> state
>> >> > uses
>> >> > > > and
>> >> > > > > > > add a
>> >> > > > > > > > > > type
>> >> > > > > > > > > > > > >> annotation to link the side input definition to
>> >> its
>> >> > > > usage
>> >> > > > > > > > > directly.
>> >> > > > > > > > > > > This
>> >> > > > > > > > > > > > >> would allow us to know that the side input was
>> >> > > > definitely
>> >> > > > > > > being
>> >> > > > > > > > > > > accessed
>> >> > > > > > > > > > > > >> and perform validation during graph
>> construction
>> >> for
>> >> > > any
>> >> > > > > > used
>> >> > > > > > > > but
>> >> > > > > > > > > > > > >> unspecified side inputs.
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> Code snippet:
>> >> > > > > > > > > > > > >> final PCollectionView<String> foo =
>> >> > > > > > pipeline.apply("fooName",
>> >> > > > > > > > > > > > >> Create.of("foo")).apply(View.<
>> >> String>asSingleton());
>> >> > > > > > > > > > > > >> PCollection<String> output = pipeline
>> >> > > > > > > > > > > > >>     .apply(Create.of(1, 2, 3))
>> >> > > > > > > > > > > > >>     .apply(MapElements.via(
>> >> > > > > > > > > > > > >>         new SimpleFunction<Integer, String>() {
>> >> > > > > > > > > > > > >>           @Override
>> >> > > > > > > > > > > > >>           public String apply(Integer input,
>> >> > > > > > > > @SideInput("fooName")
>> >> > > > > > > > > > > > String
>> >> > > > > > > > > > > > >> fooValue) {
>> >> > > > > > > > > > > > >>             return fooValue + " " + input;
>> >> > > > > > > > > > > > >>           }
>> >> > > > > > > > > > > > >>         }).withSideInputs(foo));*
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> On Mon, Sep 4, 2017 at 6:22 PM, Eugene
>> Kirpichov <
>> >> > > > > > > > > > > > >> kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >> > Sure, here's how a modified (passing)
>> >> MapElements
>> >> > > unit
>> >> > > > > > test
>> >> > > > > > > > > looks
>> >> > > > > > > > > > > > like,
>> >> > > > > > > > > > > > >> > with usage of side inputs:
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> >   @Test
>> >> > > > > > > > > > > > >> >   @Category(NeedsRunner.class)
>> >> > > > > > > > > > > > >> >   public void testMapBasicWithSideInput()
>> throws
>> >> > > > > > Exception {
>> >> > > > > > > > > > > > >> >    * final PCollectionView<String> foo =*
>> >> > > > > > > > > > > > >> > *        pipeline.apply("foo",
>> >> > > > > > > > > > > > >> >
>> >> > > Create.of("foo")).apply(View.<String>asSingleton());*
>> >> > > > > > > > > > > > >> >     PCollection<String> output = pipeline
>> >> > > > > > > > > > > > >> >         .apply(Create.of(1, 2, 3))
>> >> > > > > > > > > > > > >> >         .apply(MapElements.via(
>> >> > > > > > > > > > > > >> >             new SimpleFunction<Integer,
>> >> String>()
>> >> > {
>> >> > > > > > > > > > > > >> >               @Override
>> >> > > > > > > > > > > > >> >               public String apply(Integer
>> >> input) {
>> >> > > > > > > > > > > > >> >                 return* foo.get() *+ " " +
>> >> input;
>> >> > > > > > > > > > > > >> >               }
>> >> > > > > > > > > > > > >> >             })
>> >> > > > > > > > > > > > >> >         *.withSideInputs(foo));*
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> >     PAssert.that(output).
>> >> containsInAnyOrder("foo
>> >> > 1",
>> >> > > > > "foo
>> >> > > > > > > 2",
>> >> > > > > > > > > > "foo
>> >> > > > > > > > > > > > 3");
>> >> > > > > > > > > > > > >> >     pipeline.run();
>> >> > > > > > > > > > > > >> >   }
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> > On Mon, Sep 4, 2017 at 6:12 PM Reuven Lax
>> >> > > > > > > > > > <re...@google.com.invalid
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > > > >> > wrote:
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >> > > Can you provide a code snippet showing how
>> >> this
>> >> > > > would
>> >> > > > > > > look?
>> >> > > > > > > > > > > > >> > >
>> >> > > > > > > > > > > > >> > > On Sun, Sep 3, 2017 at 6:49 PM, Eugene
>> >> > Kirpichov <
>> >> > > > > > > > > > > > >> > > kirpic...@google.com.invalid> wrote:
>> >> > > > > > > > > > > > >> > >
>> >> > > > > > > > > > > > >> > > > TL;DR Introduce method
>> >> PCollectionView.get(),
>> >> > > > > > > implemented
>> >> > > > > > > > > as:
>> >> > > > > > > > > > > get
>> >> > > > > > > > > > > > >> > > > thread-local ProcessContext and do
>> >> > > > > c.sideInput(this).
>> >> > > > > > > As a
>> >> > > > > > > > > > > result,
>> >> > > > > > > > > > > > >> any
>> >> > > > > > > > > > > > >> > > user
>> >> > > > > > > > > > > > >> > > > lambdas such as MapElements can use side
>> >> > inputs.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Quite a few transforms have user-code
>> >> > callbacks
>> >> > > or
>> >> > > > > > > > lambdas:
>> >> > > > > > > > > > > ParDo
>> >> > > > > > > > > > > > >> > (DoFn),
>> >> > > > > > > > > > > > >> > > > Map/FlatMapElements, the
>> DynamicDestinations
>> >> > > > classes
>> >> > > > > > in
>> >> > > > > > > > > > various
>> >> > > > > > > > > > > > IOs,
>> >> > > > > > > > > > > > >> > > > combine fns, the PollFn callback in
>> Watch,
>> >> > etc.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Of these, only DoFn and CombineFn have
>> >> > built-in
>> >> > > > > > support
>> >> > > > > > > > for
>> >> > > > > > > > > > side
>> >> > > > > > > > > > > > >> > inputs;
>> >> > > > > > > > > > > > >> > > > for DynamicDestinations it is plumbed
>> >> > > explicitly;
>> >> > > > > > others
>> >> > > > > > > > > don't
>> >> > > > > > > > > > > > have
>> >> > > > > > > > > > > > >> > > access
>> >> > > > > > > > > > > > >> > > > (e.g. you can't access side inputs from
>> >> > > > > > > > Map/FlatMapElements
>> >> > > > > > > > > > > > because
>> >> > > > > > > > > > > > >> > they
>> >> > > > > > > > > > > > >> > > > don't have a ProcessContext or any
>> context
>> >> for
>> >> > > > that
>> >> > > > > > > > matter).
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > I think it's important to solve this,
>> >> > especially
>> >> > > > as
>> >> > > > > > > Java 8
>> >> > > > > > > > > > > becomes
>> >> > > > > > > > > > > > >> > > people's
>> >> > > > > > > > > > > > >> > > > default choice: users will want to use
>> side
>> >> > > inputs
>> >> > > > > in
>> >> > > > > > > > > > > > >> > > Map/FlatMapElements.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > It also appears to be quite easy to
>> >> implement:
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Runner part:
>> >> > > > > > > > > > > > >> > > > - introduce a SideInputAccessor interface
>> >> > > > > > > > > > > > >> > > > - make .get() on a PCollectionView get it
>> >> > from a
>> >> > > > > > > > > thread-local
>> >> > > > > > > > > > > > >> > > > SideInputAccessor
>> >> > > > > > > > > > > > >> > > > - make runners set the thread-local
>> >> > > > > SideInputAccessor
>> >> > > > > > > any
>> >> > > > > > > > > time
>> >> > > > > > > > > > > the
>> >> > > > > > > > > > > > >> > runner
>> >> > > > > > > > > > > > >> > > > is evaluating something in a context
>> where
>> >> > side
>> >> > > > > inputs
>> >> > > > > > > are
>> >> > > > > > > > > > > > >> available,
>> >> > > > > > > > > > > > >> > > e.g.
>> >> > > > > > > > > > > > >> > > > a ProcessElement method, or applying a
>> >> > > CombineFn -
>> >> > > > > the
>> >> > > > > > > set
>> >> > > > > > > > > of
>> >> > > > > > > > > > > such
>> >> > > > > > > > > > > > >> > places
>> >> > > > > > > > > > > > >> > > > will be quite small. I believe only
>> runners
>> >> > (but
>> >> > > > not
>> >> > > > > > > > > > transforms)
>> >> > > > > > > > > > > > >> will
>> >> > > > > > > > > > > > >> > > ever
>> >> > > > > > > > > > > > >> > > > need to set this thread-local
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > Transform part:
>> >> > > > > > > > > > > > >> > > > - Transforms that take user-code lambdas
>> and
>> >> > > want
>> >> > > > to
>> >> > > > > > let
>> >> > > > > > > > > them
>> >> > > > > > > > > > > > access
>> >> > > > > > > > > > > > >> > side
>> >> > > > > > > > > > > > >> > > > inputs still will need to be configurable
>> >> > with a
>> >> > > > > > method
>> >> > > > > > > > like
>> >> > > > > > > > > > > > >> > > > .withSideInputs(view1, view2...) and will
>> >> need
>> >> > > to
>> >> > > > > > plumb
>> >> > > > > > > > > those
>> >> > > > > > > > > > > down
>> >> > > > > > > > > > > > >> to
>> >> > > > > > > > > > > > >> > the
>> >> > > > > > > > > > > > >> > > > primitive DoFn's or CombineFn's -
>> >> information
>> >> > on
>> >> > > > > > *which*
>> >> > > > > > > > > side
>> >> > > > > > > > > > > > inputs
>> >> > > > > > > > > > > > >> > will
>> >> > > > > > > > > > > > >> > > > be accessed, of course, still needs to
>> be in
>> >> > the
>> >> > > > > > graph.
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > > > I quickly prototyped this in direct
>> runner
>> >> and
>> >> > > > > > > > MapElements,
>> >> > > > > > > > > > and
>> >> > > > > > > > > > > it
>> >> > > > > > > > > > > > >> > worked
>> >> > > > > > > > > > > > >> > > > with no issues - I'm wondering if there's
>> >> > > > something
>> >> > > > > > > subtle
>> >> > > > > > > > > > that
>> >> > > > > > > > > > > > I'm
>> >> > > > > > > > > > > > >> > > > missing, or is it actually a good idea?
>> >> > > > > > > > > > > > >> > > >
>> >> > > > > > > > > > > > >> > >
>> >> > > > > > > > > > > > >> >
>> >> > > > > > > > > > > > >>
>> >> > > > > > > > > > > > >
>> >> > > > > > > > > > > >
>> >> > > > > > > > > > >
>> >> > > > > > > > > >
>> >> > > > > > > > >
>> >> > > > > > > >
>> >> > > > > > >
>> >> > > > > >
>> >> > > > >
>> >> > > >
>> >> > >
>> >> >
>> >>
>>

Reply via email to