On Sun, Jul 28, 2019 at 6:51 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>
> Hi, Thomas & Robert, Thanks for your comments and providing relevant 
> discussions and JIRA links, very helpful to me!
>
> I am glad to see your affirmative response, and happy to add my thoughts 
> on the comments you left:
> -----------------
>
> >> There are two distinct levels at which one can talk about a certain type 
> >> of state being supported: the user-visible SDK's API and the runner's API. 
> >> For example, BagState, ValueState, ReducingState, CombiningState,  can all 
> >> be implemented on top of a runner-offered MapState in the SDK. On the one 
> >> hand, there's a desire to keep the number of "primitive" states types to a 
> >> minimum (to ease the use of authoring runners), but if a runner can 
> >> perform a specific optimization due to knowing about the particular state 
> >> type it might be preferable to pass it through rather than emulate it in 
> >> the SDK.
> -------------------
> Agree. Regarding MapState, it's definitely needed as it cannot be implemented 
> on top of the existing BagState.
> Regarding ValueState, it can be implemented on top of BagState. However, we 
> can optimize if we know a state is ValueState.
> For example, if a key is updated with a new value and the ValueState is 
> implemented on top of BagState, two RPC calls are needed
> to write the new value back to the runner: clear + append; if we know it's 
> ValueState, a single RPC call is enough: set.
> We can discuss case by case whether a state type is needed.

In the Beam APIs [1] multiple state requests are consumed as a stream
in a single RPC, so clear followed by append still has low overhead.
Is that optimization not sufficient?

[1] 
https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
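For concreteness, here is a small Python sketch (with made-up request names and classes, not the actual Beam Fn API) contrasting a ValueState emulated on BagState (clear + append) with a primitive ValueState (set), where all requests are queued on a single stream rather than issued as separate RPCs:

```python
# Illustrative sketch only: hypothetical request tuples, not the actual
# Beam Fn API state protocol. It shows why ValueState emulated on
# BagState issues two state requests (clear + append) where a primitive
# ValueState could issue one (set), and why batching them on a single
# request stream keeps the overhead low either way.

class RecordingChannel:
    """Stands in for a single gRPC stream: requests are queued on one
    connection, not sent one-per-RPC."""
    def __init__(self):
        self.requests = []

    def send(self, request):
        self.requests.append(request)

class BagStateBackedValueState:
    """ValueState emulated on top of a runner-offered BagState."""
    def __init__(self, channel, key):
        self.channel = channel
        self.key = key

    def write(self, value):
        # Two logical requests on the same stream.
        self.channel.send(("clear", self.key))
        self.channel.send(("append", self.key, value))

class PrimitiveValueState:
    """ValueState known to the runner: a single 'set' request."""
    def __init__(self, channel, key):
        self.channel = channel
        self.key = key

    def write(self, value):
        self.channel.send(("set", self.key, value))

chan = RecordingChannel()
BagStateBackedValueState(chan, "k").write(42)   # -> clear + append
PrimitiveValueState(chan, "k").write(42)        # -> set
print([r[0] for r in chan.requests])            # ['clear', 'append', 'set']
```

Since both variants ride the same stream, the difference is one extra queued message per write, not one extra round trip.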

> -------------------
>
> >> Note that in the protos, the GRPC ports have a coder attribute
> specifically to allow this kind of customization (and the SDKs should
> be respecting that). We've also talked about going beyond per-element
> encodings (e.g. using Arrow to serialize entire batches across the
> wire). I think all the runner code simply uses the default and we
> could be more intelligent there.
> -------------------
>
> Yes, gRPC allows using a custom coder. However, I'm afraid this is not 
> enough, as we want to use
> Beam's portability framework by depending on the existing modules 
> (beam-runners-java-fn-execution and the Python SDK Harness) instead
> of copying that part of the code into Flink. So beam-runners-java-fn-execution 
> should also allow using a custom coder.
> Otherwise, we would have to copy a lot of code into Flink to use a custom 
> coder.

Agreed, beam-runners-java-fn-execution does not take advantage of the
full flexibility of the protocol, and it would make a lot of sense to
enhance it so that it can.
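To illustrate what a custom wire coder could save here, the sketch below uses made-up byte layouts (not Beam's actual FullWindowedValueCoder or ValueOnlyWindowedValueCoder formats) to compare encoding a value together with timestamp/window/pane metadata against encoding the value alone:

```python
import struct

# Made-up encodings to illustrate the per-element overhead, not Beam's
# actual FullWindowedValueCoder / ValueOnlyWindowedValueCoder formats.

def encode_value_only(value: bytes) -> bytes:
    # A value-only wire coder ships just the payload.
    return value

def encode_full(value: bytes, timestamp_micros: int,
                window_max_ts: int, pane_info: int) -> bytes:
    # Hypothetical layout: 8-byte timestamp + 8-byte window bound +
    # 1-byte pane info, then the payload.
    return struct.pack(">qqB", timestamp_micros, window_max_ts,
                       pane_info) + value

payload = b"42"
full = encode_full(payload, 1_564_300_000_000_000, 2**63 - 1, 0)
overhead = len(full) - len(encode_value_only(payload))
print(overhead)  # 17 bytes of metadata per element in this sketch
```

For small elements that never observe windowing in the target runner, that fixed metadata overhead can dwarf the payload itself, which is the motivation for a configurable wire coder.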

> -------------------
>
> >> I'm wary of having too many buffer size configuration options (is
> there a compelling reason to make it bigger or smaller?) but something
> time-based would be very useful.
> -------------------
>
> I think the default buffer sizes do not need to change for most 
> cases. I'm only unsure about one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
> Would 1MB make more sense?

IIRC, 10MB was the point at which, according to benchmarks Luke did
quite a while ago, there was clearly no performance benefit in making
it larger. Coupled with a time-based threshold, I don't see much of an
advantage to lowering it.
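A combined size- and time-based flush policy could look roughly like the sketch below (an illustration with made-up names, not the actual BeamFnDataBufferingOutboundObserver; the 1-second time bound is an arbitrary assumption):

```python
import time

# Illustrative sketch of a size- plus time-based flush policy.
# The class and defaults are made up for illustration.
DEFAULT_SIZE_THRESHOLD = 10 * 1024 * 1024  # 10 MB size bound
DEFAULT_TIME_THRESHOLD_S = 1.0             # hypothetical time bound

class FlushingBuffer:
    def __init__(self, sink,
                 size_threshold=DEFAULT_SIZE_THRESHOLD,
                 time_threshold_s=DEFAULT_TIME_THRESHOLD_S,
                 clock=time.monotonic):
        self.sink = sink                    # callable receiving bytes
        self.size_threshold = size_threshold
        self.time_threshold_s = time_threshold_s
        self.clock = clock
        self.chunks = []
        self.size = 0
        self.last_flush = clock()

    def write(self, data: bytes):
        self.chunks.append(data)
        self.size += len(data)
        # Flush when EITHER threshold is crossed, so small streaming
        # outputs are not held back until megabytes accumulate.
        if (self.size >= self.size_threshold or
                self.clock() - self.last_flush >= self.time_threshold_s):
            self.flush()

    def flush(self):
        if self.chunks:
            self.sink(b"".join(self.chunks))
            self.chunks, self.size = [], 0
        self.last_flush = self.clock()
```

The point of the time bound is latency, not throughput: the size threshold alone can hold a trickle of streaming elements indefinitely.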

> -------------------
>
> >> The idea of StandardCoders is that it's a set of coders that all runners
> and SDKs can be assumed to understand. If you have an element encoded
> with some other Coder, then there's no way to know if the other
> side will be able to decode it (or, indeed, even properly detect
> element boundaries in the stream of contiguous encoded elements).
> Adding a length prefixed coder wrapping allows the other side to at
> least pull it out and pass it around as encoded bytes. In other words,
> whether an encoded element needs length prefixing is a function of the
> other process you're trying to communicate with (and we don't have the
> mechanisms, and I'm not sure it's worth the complexity, to do some
> kind of coder negotiation between the processes here.) Of course for a
> UDF, if the other side does not know about the element type in
> question it'd be difficult (in general) to meaningfully process the
> element.
>
> The work on schemas, and making those portable, will result in a much
> richer set of element types that can be passed through "standard"
> coders.
> -------------------
>
> The design makes sense to me. My concern is that if a coder is not among the 
> StandardCoders, it will be prefixed with a length even if the harness knows 
> how to decode it.

If the harness knows how to decode it, the length prefixing is just a
lost optimization opportunity, but it'll still work. Whether this is a
big enough loss to merit introducing an extra protocol to negotiate on
the set of commonly known coders beyond standard coders is still TBD,
but probably not for v1 (and possibly not ever, especially as schemas
become more expressive).
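The pass-through behavior that length prefixing enables can be illustrated with a toy varint framing (a sketch of the idea, not Beam's actual LengthPrefixCoder wire format): the receiving process can find element boundaries and forward each element's bytes opaquely without knowing the inner coder:

```python
# Toy sketch of length-prefix framing (not Beam's actual
# LengthPrefixCoder, but the same idea): a varint length lets a
# process that does not know the inner coder still find element
# boundaries and pass each payload through untouched.

def encode_varint(n: int) -> bytes:
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        out.append(bits | (0x80 if n else 0))
        if not n:
            return bytes(out)

def length_prefix(payload: bytes) -> bytes:
    return encode_varint(len(payload)) + payload

def split_elements(stream: bytes):
    """Yield each element's raw bytes without decoding them."""
    i = 0
    while i < len(stream):
        n, shift = 0, 0
        while True:
            b = stream[i]
            i += 1
            n |= (b & 0x7F) << shift
            shift += 7
            if not b & 0x80:
                break
        yield stream[i:i + n]
        i += n

# Two elements in some unknown coder's encoding, concatenated:
wire = length_prefix(b"\x01\x02\x03") + length_prefix(b"hello")
print(list(split_elements(wire)))  # [b'\x01\x02\x03', b'hello']
```

Without the prefix, the concatenated stream above would be undecodable to any process lacking the inner coder; with it, the only cost is the extra varint per element.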

> Besides, I'm also curious about the criteria for whether a coder can be put 
> into StandardCoders.
> For example, I noticed that FLOAT is not among StandardCoders, while DOUBLE 
> is.

StandardCoders is supposed to be some sort of lowest common
denominator, but there's no hard-and-fast criterion. For this example,
some languages (e.g. Python) don't have the notion of FLOAT, and using
a FLOAT coder for Python floats (whose underlying representation is
double) gets tricky, as this coder is not faithful. We also don't have
specific int coders for smaller-than-64-bit types which, like float,
are easily promoted.
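The faithfulness issue is easy to demonstrate with Python's struct module: round-tripping a Python float (a C double underneath) through a 32-bit encoding does not, in general, return the original value, while a 64-bit encoding does:

```python
import struct

# Python floats are doubles, so a 32-bit FLOAT coder is not faithful:
# encode-then-decode does not return the original value.
x = 0.1
as_float32 = struct.unpack("<f", struct.pack("<f", x))[0]
print(x == as_float32)  # False: precision was lost in the round trip

# A 64-bit DOUBLE encoding round-trips exactly:
as_float64 = struct.unpack("<d", struct.pack("<d", x))[0]
print(x == as_float64)  # True
```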

> Best, Jincheng
>
> Robert Bradshaw <rober...@google.com> wrote on Thu, Jul 25, 2019 at 2:00 PM:
>>
>> On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise <t...@apache.org> wrote:
>> >
>> > Hi Jincheng,
>> >
>> > It is very exciting to see this follow-up, that you have done your 
>> > research on the current state and that there is the intention to join 
>> > forces on the portability effort!
>> >
>> > I have added a few pointers inline.
>> >
>> > Several of the issues you identified affect our usage of Beam as well. 
>> > These present an opportunity for collaboration.
>>
>> +1, a lot of this aligns with improvements we'd like to make as well.
>>
>> > On Wed, Jul 24, 2019 at 2:53 AM jincheng sun <sunjincheng...@gmail.com> 
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Thanks Max and all of your kind words. :)
>> >>
>> >> Sorry for the late reply, as I'm busy working on the Flink 1.9 release. 
>> >> For the next major release of Flink, we plan to add Python user-defined 
>> >> function (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam 
>> >> portability framework and think that it is perfect for our requirements. 
>> >> However, we also found some improvements needed in Beam:
>> >>
>> >> Must Have:
>> >> ----------------
>> >> 1) Currently only BagState is supported in the gRPC protocol and I think we 
>> >> should support more kinds of state types, such as MapState, ValueState, 
>> >> ReducingState, CombiningState (AggregatingState in Flink), etc. That's 
>> >> because these kinds of state will be used in both user-defined functions 
>> >> and the Flink Python DataStream API.
>> >
>> > There has been discussion about the need for different state types and to 
>> > efficiently support those on the runner side there may be a need to look 
>> > at the over the wire representation also.
>> >
>> > https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
>> > https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E
>>
>> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the
>> runner's API. For example, BagState, ValueState, ReducingState,
>> CombiningState,  can all be implemented on top of a runner-offered
>> MapState in the SDK. On the one hand, there's a desire to keep the
>> number of "primitive" states types to a minimum (to ease the use of
>> authoring runners), but if a runner can perform a specific
>> optimization due to knowing about the particular state type it might
>> be preferable to pass it through rather than emulate it in the SDK.
>>
>> >> 2) There are warnings that Python 3 is not fully supported in Beam 
>> >> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam 
>> >> portability framework, since Python 2 will no longer be officially supported.
>> >
>> > This must be obsolete per latest comments on: 
>> > https://issues.apache.org/jira/browse/BEAM-1251
>> >
>> >> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory 
>> >> at the runner side. The reason I think it's a must-have is that when the 
>> >> environment type is "PROCESS", the default value "/tmp" may become a big 
>> >> problem.
>>
>> There are still some issues to be worked out around exactly how
>> environments are set up (most notably around dependencies that are
>> external to the docker images, but also things like this).
>>
>> >> 4) The buffer size configuration policy should be improved, for example:
>> >>    At the runner side, the buffer limit in 
>> >> BeamFnDataBufferingOutboundObserver is size-based. We should also support 
>> >> a time-based limit, especially for the streaming case.
>> >>    At the Python SDK Harness, the buffer size is not configurable in 
>> >> GrpcDataService, and the input queue of the input buffer in the Python SDK 
>> >> Harness is not size-limited.
>> >>   The flush threshold of the output buffer in the Python SDK Harness is 10 MB 
>> >> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is to make the 
>> >> threshold configurable and support a time-based threshold.
>>
>> I'm wary of having too many buffer size configuration options (is
>> there a compelling reason to make it bigger or smaller?) but something
>> time-based would be very useful.
>>
>> >> Nice To Have:
>> >> -------------------
>> >> 1) Improve the interfaces of FnDataService, BundleProcessor, 
>> >> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to 
>> >> T. (We have already discussed this in the previous mails.)
>> >>
>> >> 2) Refactor the code to avoid pulling in unnecessary dependencies. For 
>> >> example, beam-sdks-java-core (11MB) is a package for Java SDK users and it 
>> >> is pulled in because a few classes in beam-sdks-java-core are 
>> >> used in beam-runners-java-fn-execution, such as
>> >> PipelineOptions used in DefaultJobBundleFactory and FileSystems used in 
>> >> BeamFileSystemArtifactRetrievalService.
>> >> This means we could add a new module such as beam-sdks-java-common to 
>> >> hold the classes used by both the runner and the SDK.
>> >>
>> >> 3) The state cache is not shared between bundles, which is 
>> >> performance-critical for streaming jobs.
>> >
>> > This is rather important to address:
>> >
>> > https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>> >
>> >>
>> >>
>> >> 4) The coder of WindowedValue cannot be configured, and most of the time we 
>> >> don't need to serialize and deserialize the timestamp, window and pane 
>> >> properties in Flink. But currently FullWindowedValueCoder is used by 
>> >> default in WireCoders.addWireCoder; I suggest making the coder 
>> >> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>>
>> For sure.
>>
>> Note that in the protos, the GRPC ports have a coder attribute
>> specifically to allow this kind of customization (and the SDKs should
>> be respecting that). We've also talked about going beyond per-element
>> encodings (e.g. using Arrow to serialize entire batches across the
>> wire). I think all the runner code simply uses the default and we
>> could be more intelligent there.
>>
>> >> 5) Currently, if a coder is not defined in StandardCoders, it will be 
>> >> wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> 
>> >> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few 
>> >> coders are defined in StandardCoders. This means that for most coders, a 
>> >> length will be added to the serialized bytes, which is not necessary in my 
>> >> opinion. My suggestion is that we could add some interfaces or tags for 
>> >> the coder to indicate whether the coder needs a length prefix or 
>> >> not.
>>
>> The idea of StandardCoders is that it's a set of coders that all runners
>> and SDKs can be assumed to understand. If you have an element encoded
>> with some other Coder, then there's no way to know if the other
>> side will be able to decode it (or, indeed, even properly detect
>> element boundaries in the stream of contiguous encoded elements).
>> Adding a length prefixed coder wrapping allows the other side to at
>> least pull it out and pass it around as encoded bytes. In other words,
>> whether an encoded element needs length prefixing is a function of the
>> other process you're trying to communicate with (and we don't have the
>> mechanisms, and I'm not sure it's worth the complexity, to do some
>> kind of coder negotiation between the processes here.) Of course for a
>> UDF, if the other side does not know about the element type in
>> question it'd be difficult (in general) to meaningfully process the
>> element.
>>
>> The work on schemas, and making those portable, will result in a much
>> richer set of element types that can be passed through "standard"
>> coders.
>>
>> (Hopefully this answers your question below as well.)
>>
>> >> 6) Set the log level according to PipelineOptions in the Python SDK Harness. 
>> >> Currently the log level is set to INFO by default.
>> >
>> > https://issues.apache.org/jira/browse/BEAM-5468
>>
>> Yeah, someone needs to just go and do this.
>>
>> >> 7) Allow starting up the StatusServer according to PipelineOptions in the 
>> >> Python SDK Harness. Currently the StatusServer is started up by default.
>> >>
>> >> Although I put 3), 4) and 5) into the "Nice to Have" category as they are 
>> >> performance-related, I still think they are very critical for Python UDF 
>> >> execution performance.
>> >>
>> >> Open questions:
>> >> ---------------------
>> >> 1) Which coders should be / can be defined in StandardCoders?
>> >>
>> >> Currently we are preparing the design of how to support Python UDFs in 
>> >> Flink based on the Beam portability framework, and we will bring up the 
>> >> discussion in the Flink community. We may propose more changes to Beam 
>> >> during that time and may need more support from the Beam community.
>> >>
>> >> To be honest, I'm not an expert on Beam, so please feel free to 
>> >> correct me if my understanding is wrong. Any feedback is welcome.
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >> Maximilian Michels <m...@apache.org> wrote on Thu, Apr 25, 2019 at 12:14 AM:
>> >>>
>> >>> Fully agree that this is an effort that goes beyond changing a type
>> >>> parameter but I think we have a chance here to cooperate between the two
>> >>> projects. I would be happy to help out where I can.
>> >>>
>> >>> I'm not sure at this point what exactly is feasible for reuse but I
>> >>> would imagine the Runner-related code to be useful as well for the
>> >>> interaction with the SDK Harness. There are some fundamental differences
>> >>> in the model, e.g. how windowing works, which might be challenging to
>> >>> work around.
>> >>>
>> >>> Thanks,
>> >>> Max
>> >>>
>> >>> On 24.04.19 12:03, jincheng sun wrote:
>> >>> >
>> >>> > Hi Kenn, I think you are right: the Python SDK harness can be shared with
>> >>> > Flink, and we also need to add some new primitive operations. Regarding
>> >>> > the runner side, I think most of the code in runners/java-fn-execution
>> >>> > can be shared (but needs some improvement, such as
>> >>> > FnDataService); some of it cannot be shared, such as the job submission
>> >>> > code. So, we may need to set up a document to clearly analyze which ones
>> >>> > can be shared, which ones can be shared but need some changes, and
>> >>> > which ones definitely cannot be shared.
>> >>> >
>> >>> > Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn
>> >>> > service as a library, and I am willing to put more effort into this.
>> >>> > From the view of the current code, abstracting the Fn Service into a class
>> >>> > library that other projects can rely on requires a lot of effort from
>> >>> > the Beam community. Turning `WindowedValue<T>` into `T` is just the
>> >>> > beginning of this effort. If the Beam community is willing to
>> >>> > abstract the Fn Service into a class library that can be relied upon by
>> >>> > other projects, I can try to draft a document; of course, during this
>> >>> > period I may need a lot of help from you, Kenn, Lukasz, and the Beam
>> >>> > community. (I am a recruit in the Beam community :-))
>> >>> >
>> >>> > Regards,
>> >>> > Jincheng
>> >>> >
>> >>> > Kenneth Knowles <k...@apache.org <mailto:k...@apache.org>> wrote on
>> >>> > Wed, Apr 24, 2019 at 3:32 AM:
>> >>> >
>> >>> >     It seems to me that the most valuable code to share and keep up 
>> >>> > with
>> >>> >     is the Python/Go/etc SDK harness; they would need to be enhanced
>> >>> >     with new primitive operations. So you would want to depend directly
>> >>> >     and share the original proto-generated classes too, which Beam
>> >>> >     publishes as separate artifacts for Java. Is the runner-side 
>> >>> > support
>> >>> >     code that valuable for direct integration into Flink? I would 
>> >>> > expect
>> >>> >     once you get past trivial wrappers (that you can copy/paste with no
>> >>> >     loss) you would hit differences in architecture so you would 
>> >>> > diverge
>> >>> >     anyhow.
>> >>> >
>> >>> >     Kenn
>> >>> >
>> >>> >     On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org
>> >>> >     <mailto:m...@apache.org>> wrote:
>> >>> >
>> >>> >         Hi Jincheng,
>> >>> >
>> >>> >         Copying code is a solution for the short term. In the long run
>> >>> >         I'd like
>> >>> >         the Fn services to be a library not only for the Beam
>> >>> >         portability layer
>> >>> >         but also for other projects which want to leverage it. We 
>> >>> > should
>> >>> >         thus
>> >>> >         make an effort to make it more generic/extensible where
>> >>> >         necessary and
>> >>> >         feasible.
>> >>> >
>> >>> >         Since you are investigating reuse of Beam portability in the
>> >>> >         context of
>> >>> >         Flink, do you think it would make sense to setup a document
>> >>> >         where we
>> >>> >         collect ideas and challenges?
>> >>> >
>> >>> >         Thanks,
>> >>> >         Max
>> >>> >
>> >>> >         On 23.04.19 13:00, jincheng sun wrote:
>> >>> >          > Hi Reuven,
>> >>> >          >
>> >>> >          > I think you have provided a possible solution for other
>> >>> >          > communities which want to take advantage of Beam's existing
>> >>> >          > achievements. Thank you very much!
>> >>> >          >
>> >>> >          > I think the Flink community can choose to copy from Beam's
>> >>> >         code or
>> >>> >          > choose to rely directly on the beam's class library. The
>> >>> >         Flink community
>> >>> >          > also initiated a discussion, more info can be found here
>> >>> >          >
>> >>> >         
>> >>> > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
>> >>> >          >
>> >>> >          > The purpose of turning `WindowedValue<T>` into `T` is to make
>> >>> >          > the interface design of Beam more versatile, so that other open
>> >>> >          > source projects have the opportunity to take advantage of Beam's
>> >>> >          > existing achievements. Of course, just changing `WindowedValue<T>`
>> >>> >          > into `T` is not enough for it to be shared by other projects in
>> >>> >          > the form of a class library; we need to make further efforts. If
>> >>> >          > Beam can provide a class library in the future, other community
>> >>> >          > contributors will also be willing to contribute to the Beam
>> >>> >          > community. This will benefit both the communities that want to
>> >>> >          > take advantage of Beam's existing achievements and the Beam
>> >>> >          > community itself. And thanks to Thomas, who has also made a lot
>> >>> >          > of efforts in this regard.
>> >>> >          >
>> >>> >          > Thanks again for your valuable suggestion; any feedback is welcome!
>> >>> >          >
>> >>> >          > Best,
>> >>> >          > Jincheng
>> >>> >          >
>> >>> >          > Reuven Lax <re...@google.com <mailto:re...@google.com>
>> >>> >         <mailto:re...@google.com <mailto:re...@google.com>>> wrote on
>> >>> >          > Tue, Apr 23, 2019 at 1:00 AM:
>> >>> >          >
>> >>> >          >     One concern here: these interfaces are intended for use
>> >>> >         within the
>> >>> >          >     Beam project. Beam may decide to make specific changes 
>> >>> > to
>> >>> >         them to
>> >>> >          >     support needed functionality in Beam. If they are being
>> >>> >         reused by
>> >>> >          >     other projects, then those changes risk breaking those 
>> >>> > other
>> >>> >          >     projects in unexpected ways. I don't think we can
>> >>> >         guarantee that we
>> >>> >          >     don't do that. If this is useful in Flink, it would be
>> >>> >         safer to copy
>> >>> >          >     the code IMO rather than to directly depend on it.
>> >>> >          >
>> >>> >          >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
>> >>> >          >     <sunjincheng...@gmail.com
>> >>> >         <mailto:sunjincheng...@gmail.com>
>> >>> >         <mailto:sunjincheng...@gmail.com
>> >>> >         <mailto:sunjincheng...@gmail.com>>> wrote:
>> >>> >          >
>> >>> >          >         Hi Kenn,
>> >>> >          >
>> >>> >          >         Thanks for your reply, and explained the design of
>> >>> >         WindowValue
>> >>> >          >         clearly!
>> >>> >          >
>> >>> >          >         At present, the definitions of `FnDataService` and
>> >>> >          >         `BeamFnDataClient` in the Data Plane are very clear and
>> >>> >          >         universal, e.g. send(...)/receive(...). If they were only
>> >>> >          >         used within the Beam project, they would already be very
>> >>> >          >         good, because `WindowedValue` is a very basic data
>> >>> >          >         structure in the Beam project, and both the Runner and
>> >>> >          >         the SDK harness have defined the WindowedValue data
>> >>> >          >         structure.
>> >>> >          >
>> >>> >          >         The reason I want to change the interface parameter
>> >>> >          >         from `WindowedValue<T>` to T is that I want to turn the
>> >>> >          >         `Data Plane` interface into a class library that can be
>> >>> >          >         used by other projects (such as Apache Flink), so that
>> >>> >          >         other projects can have their own `FnDataService`
>> >>> >          >         implementation. However, the definition of
>> >>> >          >         `WindowedValue` does not apply to all projects. For
>> >>> >          >         example, Apache Flink has a similar concept: the Flink
>> >>> >          >         Streaming API has StreamRecord. If we change
>> >>> >          >         `WindowedValue<T>` to T, then other projects'
>> >>> >          >         implementations do not need to wrap WindowedValue, and
>> >>> >          >         the interface becomes more concise. Furthermore, we only
>> >>> >          >         need one T, as in the Apache Flink DataSet operator.
>> >>> >          >
>> >>> >          >         So, I agree with your understanding, I don't expect
>> >>> >          >         `WindowedValueXXX<T>` in the FnDataService 
>> >>> > interface,
>> >>> >         I hope to
>> >>> >          >         just use a `T`.
>> >>> >          >
>> >>> >          >         Do you see any problem with changing the interface
>> >>> >          >         parameter from `WindowedValue<T>` to T?
>> >>> >          >
>> >>> >          >         Thanks,
>> >>> >          >         Jincheng
>> >>> >          >
>> >>> >          >         Kenneth Knowles <k...@apache.org
>> >>> >         <mailto:k...@apache.org> <mailto:k...@apache.org
>> >>> >         <mailto:k...@apache.org>>> wrote on
>> >>> >          >         Sat, Apr 20, 2019 at 2:38 AM:
>> >>> >          >
>> >>> >          >             WindowedValue has always been an interface, not 
>> >>> > a
>> >>> >         concrete
>> >>> >          >             representation:
>> >>> >          >
>> >>> >         
>> >>> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>> >>> >          >
>> >>> >           
>> >>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
>> >>> >          >             It is an abstract class because we started in
>> >>> >         Java 7 where
>> >>> >          >             you could not have default methods, and just due
>> >>> >         to legacy
>> >>> >          >             style concerns. It is not just discussed, but
>> >>> >         implemented,
>> >>> >          >             that there are WindowedValue implementations 
>> >>> > with
>> >>> >         fewer
>> >>> >          >             allocations.
>> >>> >          >             At the coder level, it was also always intended
>> >>> >         to have
>> >>> >          >             multiple encodings. We already do have separate
>> >>> >         encodings
>> >>> >          >             based on whether there is 1 window or multiple
>> >>> >         windows. The
>> >>> >          >             coder for a particular kind of WindowedValue
>> >>> >         should decide
>> >>> >          >             this. Before the Fn API none of this had to be
>> >>> >         standardized,
>> >>> >          >             because the runner could just choose whatever it
>> >>> >         wants. Now
>> >>> >          >             we have to standardize any encodings that 
>> >>> > runners and
>> >>> >          >             harnesses both need to know. There should be
>> >>> >         many, and
>> >>> >          >             adding more should be just a matter of
>> >>> >         standardization, no
>> >>> >          >             new design.
>> >>> >          >
>> >>> >          >             None of this should be user-facing or in the
>> >>> >         runner API /
>> >>> >          >             pipeline graph - that is critical to making it
>> >>> >         flexible on
>> >>> >          >             the backend between the runner & SDK harness.
>> >>> >          >
>> >>> >          >             If I understand it, from our offline discussion,
>> >>> >         you are
>> >>> >          >             interested in the case where you issue a
>> >>> >          >             ProcessBundleRequest to the SDK harness and none
>> >>> >         of the
>> >>> >          >             primitives in the subgraph will ever observe the
>> >>> >         metadata.
>> >>> >          >             So you want to not even have a tiny
>> >>> >          >             "WindowedValueWithNoMetadata". Is that accurate?
>> >>> >          >
>> >>> >          >             Kenn
>> >>> >          >
>> >>> >          >             On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
>> >>> >          >             <sunjincheng...@gmail.com
>> >>> >         <mailto:sunjincheng...@gmail.com>
>> >>> >         <mailto:sunjincheng...@gmail.com 
>> >>> > <mailto:sunjincheng...@gmail.com>>>
>> >>> >          >             wrote:
>> >>> >          >
>> >>> >          >                 Thank you! And have a nice weekend!
>> >>> >          >
>> >>> >          >
>> >>> >          >                 Lukasz Cwik <lc...@google.com
>> >>> >         <mailto:lc...@google.com> <mailto:lc...@google.com
>> >>> >         <mailto:lc...@google.com>>>
>> >>> >          >                 wrote on Sat, Apr 20, 2019 at 1:14 AM:
>> >>> >          >
>> >>> >          >                     I have added you as a contributor.
>> >>> >          >
>> >>> >          >                     On Fri, Apr 19, 2019 at 9:56 AM 
>> >>> > jincheng sun
>> >>> >          >                     <sunjincheng...@gmail.com
>> >>> >         <mailto:sunjincheng...@gmail.com>
>> >>> >          >                     <mailto:sunjincheng...@gmail.com
>> >>> >         <mailto:sunjincheng...@gmail.com>>> wrote:
>> >>> >          >
>> >>> >          >                         Hi Lukasz,
>> >>> >          >
>> >>> >          >                         Thanks for your affirmation and
>> >>> >         provide more
>> >>> >          >                         contextual information. :)
>> >>> >          >
>> >>> >          >                         Would you please give me the 
>> >>> > contributor
>> >>> >          >                         permission?  My JIRA ID is
>> >>> >         sunjincheng121.
>> >>> >          >
>> >>> >          >                         I would like to create/assign 
>> >>> > tickets
>> >>> >         for this work.
>> >>> >          >
>> >>> >          >                         Thanks,
>> >>> >          >                         Jincheng
>> >>> >          >
>> >>> >          >                         Lukasz Cwik <lc...@google.com
>> >>> >         <mailto:lc...@google.com>
>> >>> >          >                         <mailto:lc...@google.com
>> >>> >         <mailto:lc...@google.com>>> wrote on Sat, Apr 20, 2019
>> >>> >          >                         at 12:26 AM:
>> >>> >          >
>> >>> >          >                             Since I don't think this is a
>> >>> >         contentious
>> >>> >          >                             change.
>> >>> >          >
>> >>> > > On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>> >>> > >
>> >>> > >> Yes, using T makes sense.
>> >>> > >>
>> >>> > >> The WindowedValue was meant to be a context object in the SDK harness
>> >>> > >> that propagates various information about the current element. We have
>> >>> > >> discussed in the past about:
>> >>> > >> * making optimizations which would pass around less of the context
>> >>> > >> information if we know that the DoFns don't need it (for example, all
>> >>> > >> the values share the same window).
>> >>> > >> * versioning the encoding separately from the WindowedValue context
>> >>> > >> object (see recent discussion about element timestamp precision [1])
>> >>> > >> * the runner may want its own representation of a context object that
>> >>> > >> makes sense for it which isn't a WindowedValue necessarily.
>> >>> > >>
>> >>> > >> Feel free to cut a JIRA about this and start working on a change
>> >>> > >> towards this.
>> >>> > >>
>> >>> > >> 1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>> >>> > >>
>> >>> > >> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>> >>> > >>
>> >>> > >>> Hi Beam devs,
>> >>> > >>>
>> >>> > >>> I read some of the docs about `Communicating over the Fn API` in
>> >>> > >>> Beam. I feel that Beam has a very good design for the Control
>> >>> > >>> Plane/Data Plane/State Plane/Logging services, which is described in
>> >>> > >>> the <How to send and receive data> document. When communicating
>> >>> > >>> between the Runner and the SDK Harness, the Data Plane API uses
>> >>> > >>> WindowedValue (an immutable triple of value, timestamp, and windows)
>> >>> > >>> as the contract object between the Runner and the SDK Harness. I see
>> >>> > >>> the interface definitions for sending and receiving data in the code
>> >>> > >>> as follows:
>> >>> > >>>
>> >>> > >>> - org.apache.beam.runners.fnexecution.data.FnDataService
>> >>> > >>>
>> >>> > >>>     public interface FnDataService {
>> >>> > >>>       <T> InboundDataClient receive(
>> >>> > >>>           LogicalEndpoint inputLocation,
>> >>> > >>>           Coder<WindowedValue<T>> coder,
>> >>> > >>>           FnDataReceiver<WindowedValue<T>> listener);
>> >>> > >>>
>> >>> > >>>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>> >>> > >>>           LogicalEndpoint outputLocation,
>> >>> > >>>           Coder<WindowedValue<T>> coder);
>> >>> > >>>     }
>> >>> > >>>
>> >>> > >>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>> >>> > >>>
>> >>> > >>>     public interface BeamFnDataClient {
>> >>> > >>>       <T> InboundDataClient receive(
>> >>> > >>>           ApiServiceDescriptor apiServiceDescriptor,
>> >>> > >>>           LogicalEndpoint inputLocation,
>> >>> > >>>           Coder<WindowedValue<T>> coder,
>> >>> > >>>           FnDataReceiver<WindowedValue<T>> receiver);
>> >>> > >>>
>> >>> > >>>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>> >>> > >>>           Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>> >>> > >>>           LogicalEndpoint outputLocation,
>> >>> > >>>           Coder<WindowedValue<T>> coder);
>> >>> > >>>     }
>> >>> > >>>
>> >>> > >>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
>> >>> > >>> use `WindowedValue` as the data structure that both the Runner and
>> >>> > >>> the SDK Harness understand. The Control Plane/Data Plane/State
>> >>> > >>> Plane/Logging services are a high-level abstraction; the Control
>> >>> > >>> Plane and Logging, for example, address requirements common to all
>> >>> > >>> multi-language platforms. The Flink community, for instance, is also
>> >>> > >>> discussing how to support Python UDFs, as well as how to deal with
>> >>> > >>> the docker environment, data transfer, state access, logging, etc.
>> >>> > >>> If Beam can further abstract these service interfaces, i.e., make
>> >>> > >>> the interface definitions compatible with multiple engines, and
>> >>> > >>> finally provide them to other projects in the form of class
>> >>> > >>> libraries, it will definitely help other platforms that want to
>> >>> > >>> support multiple languages. So could Beam further abstract the
>> >>> > >>> interface definitions of FnDataService and BeamFnDataClient?
>> >>> > >>>
>> >>> > >>> Here I am throwing out a minnow to catch a whale: take the
>> >>> > >>> FnDataService#receive interface as an example, and turn
>> >>> > >>> `WindowedValue<T>` into `T` so that other platforms can extend it
>> >>> > >>> arbitrarily, as follows:
>> >>> > >>>
>> >>> > >>>     <T> InboundDataClient receive(
>> >>> > >>>         LogicalEndpoint inputLocation,
>> >>> > >>>         Coder<T> coder,
>> >>> > >>>         FnDataReceiver<T> listener);
>> >>> > >>>
>> >>> > >>> What do you think?
>> >>> > >>>
>> >>> > >>> Feel free to correct me if there is any incorrect understanding, and
>> >>> > >>> welcome any feedback!
>> >>> > >>>
>> >>> > >>> Regards,
>> >>> > >>> Jincheng
>> >>> > >>>
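The `WindowedValue<T>` → `T` generalization discussed in the quoted thread can be sketched with a toy in-memory data service. This is illustrative only; `InMemoryDataService` and the string endpoint id are hypothetical stand-ins, not Beam classes, and coders are omitted since nothing crosses the wire here. The point is that once the element type is a free parameter `T`, the same receive/send contract can carry raw Strings for one engine while a Beam harness would simply choose `T = WindowedValue<String>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative sketch only; these are NOT Beam classes. It mirrors the
// proposal in the thread: receive/send are parameterized on a free element
// type T rather than being fixed to WindowedValue<T>.
public class GenericDataPlaneSketch {

  // Toy stand-in for FnDataService with the generified signatures.
  static class InMemoryDataService {
    // Maps a logical endpoint id to the listeners registered for it.
    private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

    // Analogue of receive(LogicalEndpoint, Coder<T>, FnDataReceiver<T>):
    // the caller picks T freely.
    @SuppressWarnings("unchecked")
    <T> void receive(String endpoint, Consumer<T> listener) {
      listeners.computeIfAbsent(endpoint, k -> new ArrayList<>())
               .add(obj -> listener.accept((T) obj));
    }

    // Analogue of send(LogicalEndpoint, Coder<T>): returns a receiver that
    // forwards each element to every listener on the endpoint.
    <T> Consumer<T> send(String endpoint) {
      return value ->
          listeners.getOrDefault(endpoint, List.of()).forEach(l -> l.accept(value));
    }
  }

  public static void main(String[] args) {
    InMemoryDataService svc = new InMemoryDataService();

    // One engine can move raw Strings; a Beam harness would instead pick
    // T = WindowedValue<String> through the exact same interface.
    List<String> received = new ArrayList<>();
    svc.<String>receive("instr-1:transform-1", received::add);

    Consumer<String> out = svc.<String>send("instr-1:transform-1");
    out.accept("a");
    out.accept("b");
    System.out.println(received); // prints [a, b]
  }
}
```

With `T` free, the element type stops being part of the cross-language contract: the runner and harness only need to agree on the coder for `T`, which is the extension point the thread asks Beam to expose.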
