On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise <t...@apache.org> wrote:
>
> Hi Jincheng,
>
> It is very exciting to see this follow-up, that you have done your research 
> on the current state and that there is the intention to join forces on the 
> portability effort!
>
> I have added a few pointers inline.
>
> Several of the issues you identified affect our usage of Beam as well. These 
> present an opportunity for collaboration.

+1, a lot of this aligns with improvements we'd like to make as well.

> On Wed, Jul 24, 2019 at 2:53 AM jincheng sun <sunjincheng...@gmail.com> wrote:
>>
>> Hi all,
>>
>> Thanks Max and all of your kind words. :)
>>
>> Sorry for the late reply as I'm busy working on the Flink 1.9 release. For
>> the next major release of Flink, we plan to add support for Python user-defined
>> functions (UDF, UDTF, UDAF), and I have gone over the Beam portability
>> framework and think that it fits our requirements very well.
>> However, we have also found some improvements that Beam would need:
>>
>> Must Have:
>> ----------------
>> 1) Currently only BagState is supported in the gRPC protocol and I think we
>> should support more kinds of state types, such as MapState, ValueState,
>> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
>> because these kinds of state will be used in both user-defined functions and
>> the Flink Python DataStream API.
>
> There has been discussion about the need for different state types, and to 
> efficiently support those on the runner side there may be a need to look at 
> the over-the-wire representation as well.
>
> https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
> https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E

There are two distinct levels at which one can talk about a certain
type of state being supported: the user-visible SDK API and the
runner API. For example, BagState, ValueState, ReducingState, and
CombiningState can all be implemented in the SDK on top of a
runner-offered MapState. On the one hand, there's a desire to keep the
number of "primitive" state types to a minimum (to ease the burden of
authoring runners), but if a runner can perform a specific
optimization by knowing about the particular state type, it might be
preferable to pass it through rather than emulate it in the SDK.
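
To make that layering concrete, here is a minimal sketch (hypothetical
names, not Beam's actual state API) of how an SDK could emulate
ValueState and CombiningState on top of a single runner-offered
map-style primitive:

  // Hypothetical sketch only -- these are not Beam's actual state interfaces.
  import java.util.function.BinaryOperator;

  // Minimal map-style primitive assumed to be offered by the runner
  // over the state API.
  interface RunnerMapState<K, V> {
    V get(K key);
    void put(K key, V value);
  }

  // ValueState emulated as a map with a single well-known key.
  class EmulatedValueState<T> {
    private static final String KEY = "__value__";
    private final RunnerMapState<String, T> backing;

    EmulatedValueState(RunnerMapState<String, T> backing) {
      this.backing = backing;
    }

    T read() {
      return backing.get(KEY);
    }

    void write(T value) {
      backing.put(KEY, value);
    }
  }

  // CombiningState emulated by folding each added input into a stored
  // accumulator under a single key.
  class EmulatedCombiningState<T> {
    private static final String KEY = "__accum__";
    private final RunnerMapState<String, T> backing;
    private final BinaryOperator<T> combiner;

    EmulatedCombiningState(RunnerMapState<String, T> backing, BinaryOperator<T> combiner) {
      this.backing = backing;
      this.combiner = combiner;
    }

    void add(T input) {
      T current = backing.get(KEY);
      backing.put(KEY, current == null ? input : combiner.apply(current, input));
    }

    T read() {
      return backing.get(KEY);
    }
  }

The flip side, as noted above, is that a runner which natively
understands CombiningState could, for example, pre-combine on its own
side, and this kind of emulation hides that opportunity from it.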

>> 2) There are warnings that Python 3 is not fully supported in Beam
>> (beam/sdks/python/setup.py). We should support Python 3.x in the Beam
>> portability framework since Python 2 will no longer be officially supported.
>
> This must be obsolete per latest comments on: 
> https://issues.apache.org/jira/browse/BEAM-1251
>
>> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory on
>> the runner side. The reason I think it's a must-have is that when the
>> environment type is "PROCESS", the default value "/tmp" may become a big
>> problem.

There are still some issues to be worked out around exactly how
environments are set up (most notably around dependencies that are
external to the docker images, but also things like this).

>> 4) The buffer size configuration policy should be improved, for example:
>>    On the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
>> is size-based. We should also support a time-based limit, especially for the
>> streaming case.
>>    In the Python SDK Harness, the buffer size is not configurable in
>> GrpcDataService, and the input queue of the input buffer in the Python SDK
>> Harness is not size-limited.
>>    The flush threshold of the output buffer in the Python SDK Harness is 10 MB
>> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is to make the
>> threshold configurable and to support a time-based threshold.

I'm wary of having too many buffer size configuration options (is
there a compelling reason to make it bigger or smaller?), but something
time-based would be very useful.
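
To sketch what I mean (illustrative only; the class and field names
below are placeholders, not the actual BeamFnDataBufferingOutboundObserver
internals), flushing could be triggered by whichever of a size or time
threshold is hit first:

  // Illustrative sketch of combining size-based and time-based flushing.
  // Names are placeholders, not Beam's actual buffering classes.
  class BufferingOutboundSketch<T> {
    private static final long SIZE_THRESHOLD_BYTES = 1_000_000;  // ~1 MB
    private static final long FLUSH_INTERVAL_MILLIS = 100;       // flush at least every 100 ms

    private long bufferedBytes = 0;
    private long lastFlushMillis = System.currentTimeMillis();

    void accept(T element, long encodedSizeBytes) {
      buffer(element);
      bufferedBytes += encodedSizeBytes;
      long now = System.currentTimeMillis();
      // Flush when either the size threshold or the time threshold is exceeded.
      if (bufferedBytes >= SIZE_THRESHOLD_BYTES
          || now - lastFlushMillis >= FLUSH_INTERVAL_MILLIS) {
        flush();
        bufferedBytes = 0;
        lastFlushMillis = now;
      }
    }

    void buffer(T element) {
      // Append the element to the pending outbound batch (elided).
    }

    void flush() {
      // Send the buffered elements over the data channel (elided).
    }
  }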

>> Nice To Have:
>> -------------------
>> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
>> etc., to change the parameter type from WindowedValue<T> to T. (We have
>> already discussed this in previous mails.)
>>
>> 2) Refactor the code to avoid pulling in unnecessary dependencies. For example,
>> beam-sdks-java-core (11 MB) is a package for Java SDK users, and it is pulled in
>> because a few classes from beam-sdks-java-core are used in
>> beam-runners-java-fn-execution, such as PipelineOptions used in
>> DefaultJobBundleFactory and FileSystems used in
>> BeamFileSystemArtifactRetrievalService.
>> This suggests we could add a new module such as beam-sdks-java-common to hold
>> the classes used by both the runner and the SDK.
>>
>> 3) The state cache is not shared between bundles, which is performance-critical
>> for streaming jobs.
>
> This is rather important to address:
>
> https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>
>>
>>
>> 4) The coder of WindowedValue cannot be configured, and most of the time we don't
>> need to serialize and deserialize the timestamp, window, and pane properties
>> in Flink. Currently FullWindowedValueCoder is used by default in
>> WireCoders.addWireCoder; I suggest making the coder configurable (i.e.,
>> allowing the use of ValueOnlyWindowedValueCoder).

For sure.

Note that in the protos, the GRPC ports have a coder attribute
specifically to allow this kind of customization (and the SDKs should
be respecting that). We've also talked about going beyond per-element
encodings (e.g. using Arrow to serialize entire batches across the
wire). I think all the runner code simply uses the default and we
could be more intelligent there.
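
As a rough sketch of what "more intelligent" could look like on the
runner side (the helper and the needsWindowMetadata flag below are
hypothetical; FullWindowedValueCoder and ValueOnlyWindowedValueCoder
are the existing Beam coders):

  // Hypothetical runner-side helper that picks the wire coder for a GRPC port
  // based on whether the consuming side actually needs window/timestamp/pane
  // metadata. Only the two WindowedValue coders referenced are real Beam classes.
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.util.WindowedValue;

  class WireCoderChooser {
    static <T> Coder<WindowedValue<T>> chooseWireCoder(
        Coder<T> elementCoder,
        Coder<? extends BoundedWindow> windowCoder,
        boolean needsWindowMetadata) {
      if (needsWindowMetadata) {
        // Full encoding: value + timestamp + windows + pane info.
        return WindowedValue.FullWindowedValueCoder.of(elementCoder, windowCoder);
      }
      // Value-only encoding: skips the metadata entirely.
      return WindowedValue.getValueOnlyCoder(elementCoder);
    }
  }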

>> 5) Currently, if a coder is not defined in StandardCoders, it will be wrapped
>> with LengthPrefixCoder (WireCoders.addWireCoder ->
>> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
>> coders are defined in StandardCoders. This means that for most coders, a length
>> prefix will be added to the serialized bytes, which I think is unnecessary.
>> My suggestion is that maybe we can add some interfaces or tags for the coder
>> which indicate whether the coder needs a length prefix or not.

The idea of StandardCoders is that it's a set of coders that all runners
and SDKs can be assumed to understand. If you have an element encoded
with some other coder, then there's no way to know if the other
side will be able to decode it (or, indeed, even properly detect
element boundaries in the stream of contiguous encoded elements).
Adding a length-prefix coder wrapping allows the other side to at
least pull it out and pass it around as encoded bytes. In other words,
whether an encoded element needs length prefixing is a function of the
other process you're trying to communicate with (and we don't have the
mechanisms, and I'm not sure it's worth the complexity, to do some
kind of coder negotiation between the processes here). Of course, for a
UDF, if the other side does not know about the element type in
question, it would be difficult (in general) to meaningfully process the
element.
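
For illustration, the wrapping boils down to something like the
following (a simplified sketch; LengthPrefixCoder and ByteArrayCoder
are real Beam coders, while the isStandardCoder flag stands in for the
StandardCoders lookup):

  // Sketch of why unknown coders get length-prefixed: with the prefix, the
  // receiving side can still find element boundaries and pass the undecodable
  // payload around as opaque bytes. LengthPrefixCoder and ByteArrayCoder are
  // real Beam coders; the decision logic here is simplified.
  import org.apache.beam.sdk.coders.ByteArrayCoder;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.coders.LengthPrefixCoder;

  class LengthPrefixSketch {
    // Runner side: treat elements of unknown types as length-prefixed raw bytes.
    static Coder<byte[]> runnerViewOfUnknownElements() {
      return LengthPrefixCoder.of(ByteArrayCoder.of());
    }

    // SDK side: keep the real coder, but add the matching length prefix so both
    // sides agree on where one element ends and the next begins.
    static <T> Coder<T> sdkViewOfElements(Coder<T> actualCoder, boolean isStandardCoder) {
      return isStandardCoder ? actualCoder : LengthPrefixCoder.of(actualCoder);
    }
  }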

The work on schemas, and making those portable, will result in a much
richer set of element types that can be passed through "standard"
coders.

(Hopefully this answers your question below as well.)

>> 6) Set the log level according to PipelineOptions in the Python SDK Harness.
>> Currently the log level is set to INFO by default.
>
> https://issues.apache.org/jira/browse/BEAM-5468

Yeah, someone needs to just go and do this.
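
In case it helps whoever picks it up: the change is essentially
plumbing a pipeline option through to the harness's logging setup. A
hypothetical Java-side options interface could look like this (the
interface and option names are made up for illustration, not an
existing Beam API):

  // Hypothetical sketch of a pipeline option carrying the SDK harness log level.
  import org.apache.beam.sdk.options.Default;
  import org.apache.beam.sdk.options.Description;
  import org.apache.beam.sdk.options.PipelineOptions;

  public interface SdkHarnessLoggingOptions extends PipelineOptions {
    @Description("Log level the SDK harness should use (e.g. DEBUG, INFO, WARNING, ERROR).")
    @Default.String("INFO")
    String getSdkHarnessLogLevel();

    void setSdkHarnessLogLevel(String level);
  }

The harness would then read this option at startup instead of
hard-coding INFO.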

>> 7) Allow starting up the StatusServer according to PipelineOptions in the
>> Python SDK Harness. Currently the StatusServer is started up by default.
>>
>> Although I put 3), 4), and 5) into the "Nice to Have" category as they are
>> performance related, I still think they are very critical for Python UDF
>> execution performance.
>>
>> Open questions:
>> ---------------------
>> 1) Which coders should be / can be defined in StandardCoders?
>>
>> Currently we are preparing the design of how to support Python UDFs in Flink
>> based on the Beam portability framework, and we will bring up the discussion
>> in the Flink community. We may propose more changes for Beam during that time
>> and may need more support from the Beam community.
>>
>> To be honest, I'm not an expert on Beam, so please feel free to correct
>> me if my understanding is wrong. Any feedback is welcome.
>>
>> Best,
>> Jincheng
>>
>> Maximilian Michels <m...@apache.org> wrote on Thu, Apr 25, 2019 at 12:14 AM:
>>>
>>> Fully agree that this is an effort that goes beyond changing a type
>>> parameter but I think we have a chance here to cooperate between the two
>>> projects. I would be happy to help out where I can.
>>>
>>> I'm not sure at this point what exactly is feasible for reuse but I
>>> would imagine the Runner-related code to be useful as well for the
>>> interaction with the SDK Harness. There are some fundamental differences
>>> in the model, e.g. how windowing works, which might be challenging to
>>> work around.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 24.04.19 12:03, jincheng sun wrote:
>>> >
>>> > Hi Kenn, I think you are right: the Python SDK harness can be shared with
>>> > Flink, and we would also need to add some new primitive operations. Regarding
>>> > the runner side, I think most of the code in runners/java-fn-execution
>>> > can be shared (though some of it needs improvement, such as
>>> > FnDataService), while some of it cannot be shared, such as the job
>>> > submission code. So we may need to set up a document to clearly analyze
>>> > which parts can be shared, which can be shared but need some changes, and
>>> > which definitely cannot be shared.
>>> >
>>> > Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn
>>> > services as a library, and I'm willing to put more effort into this.
>>> > From the view of the current code, abstracting the Fn services into a class
>>> > library that other projects can rely on requires a lot of effort from
>>> > the Beam community. Turning `WindowedValue<T>` into `T` is just the
>>> > beginning of this effort. If the Beam community is willing to
>>> > abstract the Fn services into a class library that can be relied upon by
>>> > other projects, I can try to draft a document; of course, during this
>>> > period I may need a lot of help from you, Kenn, Lukasz, and the Beam
>>> > community. (I am a recruit in the Beam community :-))
>>> >
>>> > What do you think?
>>> >
>>> > Regards,
>>> > Jincheng
>>> >
>>> > Kenneth Knowles <k...@apache.org> wrote on Wed, Apr 24, 2019 at 3:32 AM:
>>> >
>>> >     It seems to me that the most valuable code to share and keep up with
>>> >     is the Python/Go/etc SDK harness; they would need to be enhanced
>>> >     with new primitive operations. So you would want to depend directly
>>> >     and share the original proto-generated classes too, which Beam
>>> >     publishes as separate artifacts for Java. Is the runner-side support
>>> >     code that valuable for direct integration into Flink? I would expect
>>> >     once you get past trivial wrappers (that you can copy/paste with no
>>> >     loss) you would hit differences in architecture so you would diverge
>>> >     anyhow.
>>> >
>>> >     Kenn
>>> >
>>> >     On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:
>>> >
>>> >         Hi Jincheng,
>>> >
>>> >         Copying code is a solution for the short term. In the long run
>>> >         I'd like
>>> >         the Fn services to be a library not only for the Beam
>>> >         portability layer
>>> >         but also for other projects which want to leverage it. We should
>>> >         thus
>>> >         make an effort to make it more generic/extensible where
>>> >         necessary and
>>> >         feasible.
>>> >
>>> >         Since you are investigating reuse of Beam portability in the
>>> >         context of
>>> >         Flink, do you think it would make sense to setup a document
>>> >         where we
>>> >         collect ideas and challenges?
>>> >
>>> >         Thanks,
>>> >         Max
>>> >
>>> >         On 23.04.19 13:00, jincheng sun wrote:
>>> >          > Hi Reuven,
>>> >          >
>>> >          > I think you have provided a possible solution for other
>>> >          > communities that want to take advantage of Beam's existing
>>> >          > achievements. Thank you very much!
>>> >          >
>>> >          > I think the Flink community can choose to copy from Beam's
>>> >          > code or to rely directly on Beam's class library. The Flink
>>> >          > community has also initiated a discussion; more info can be
>>> >          > found here:
>>> >          >
>>> > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
>>> >          >
>>> >          > The purpose of turning `WindowedValue<T>` into `T` is to make
>>> >          > the interface design of Beam more versatile, so that other open
>>> >          > source projects have the opportunity to take advantage of Beam's
>>> >          > existing achievements. Of course, just changing `WindowedValue<T>`
>>> >          > into `T` is not enough for it to be shared with other projects in
>>> >          > the form of a class library; we need to make more efforts. If Beam
>>> >          > can provide such a class library in the future, other community
>>> >          > contributors will also be willing to contribute to the Beam
>>> >          > community. This will benefit both the communities that want to
>>> >          > take advantage of Beam's existing achievements and the Beam
>>> >          > community itself. And thanks to Thomas, who has also made a lot
>>> >          > of efforts in this regard.
>>> >          >
>>> >          > Thanks again for your valuable suggestion, and welcome any
>>> >         feedback!
>>> >          >
>>> >          > Best,
>>> >          > Jincheng
>>> >          >
>>> >          > Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:
>>> >          >
>>> >          >     One concern here: these interfaces are intended for use
>>> >         within the
>>> >          >     Beam project. Beam may decide to make specific changes to
>>> >         them to
>>> >          >     support needed functionality in Beam. If they are being
>>> >         reused by
>>> >          >     other projects, then those changes risk breaking those 
>>> > other
>>> >          >     projects in unexpected ways. I don't think we can
>>> >         guarantee that we
>>> >          >     don't do that. If this is useful in Flink, it would be
>>> >         safer to copy
>>> >          >     the code IMO rather than to directly depend on it.
>>> >          >
>>> >          >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
>>> >          >     <sunjincheng...@gmail.com> wrote:
>>> >          >
>>> >          >         Hi Kenn,
>>> >          >
>>> >          >         Thanks for your reply, and for explaining the design of
>>> >          >         WindowedValue so clearly!
>>> >          >
>>> >          >         At present, the definitions of `FnDataService` and
>>> >          >         `BeamFnDataClient` in the Data Plane are very clear and
>>> >          >         universal, such as send(...)/receive(...). If they are
>>> >          >         only used within the Beam project, they are already very
>>> >          >         good, because `WindowedValue` is a very basic data
>>> >          >         structure in the Beam project and both the Runner and
>>> >          >         the SDK harness have defined the WindowedValue data
>>> >          >         structure.
>>> >          >
>>> >          >         The reason I want to change the interface parameter from
>>> >          >         `WindowedValue<T>` to T is that I want to make the `Data
>>> >          >         Plane` interface into a class library that can be used by
>>> >          >         other projects (such as Apache Flink), so that other
>>> >          >         projects can have their own `FnDataService`
>>> >          >         implementations. However, the definition of
>>> >          >         `WindowedValue` does not apply to all projects. For
>>> >          >         example, Apache Flink has a similar concept: the Flink
>>> >          >         streaming API has StreamRecord. If we change
>>> >          >         `WindowedValue<T>` to T, then other projects'
>>> >          >         implementations do not need to wrap WindowedValue and the
>>> >          >         interface becomes more concise. Furthermore, we then only
>>> >          >         need one T, as in the Apache Flink DataSet operator.
>>> >          >
>>> >          >         So, I agree with your understanding: I don't expect
>>> >          >         `WindowedValueXXX<T>` in the FnDataService interface, I
>>> >          >         hope to just use a `T`.
>>> >          >
>>> >          >         Do you see any problem if we change the interface
>>> >          >         parameter from `WindowedValue<T>` to T?
>>> >          >
>>> >          >         Thanks,
>>> >          >         Jincheng
>>> >          >
>>> >          >         Kenneth Knowles <k...@apache.org> wrote on Sat, Apr 20,
>>> >          >         2019 at 2:38 AM:
>>> >          >
>>> >          >             WindowedValue has always been an interface, not a
>>> >         concrete
>>> >          >             representation:
>>> >          >
>>> >         
>>> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>>> >          >
>>> >           
>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
>>> >          >             It is an abstract class because we started in
>>> >         Java 7 where
>>> >          >             you could not have default methods, and just due
>>> >         to legacy
>>> >          >             style concerns. it is not just discussed, but
>>> >         implemented,
>>> >          >             that there are WindowedValue implementations with
>>> >         fewer
>>> >          >             allocations.
>>> >          >             At the coder level, it was also always intended
>>> >         to have
>>> >          >             multiple encodings. We already do have separate
>>> >         encodings
>>> >          >             based on whether there is 1 window or multiple
>>> >         windows. The
>>> >          >             coder for a particular kind of WindowedValue
>>> >         should decide
>>> >          >             this. Before the Fn API none of this had to be
>>> >         standardized,
>>> >          >             because the runner could just choose whatever it
>>> >         wants. Now
>>> >          >             we have to standardize any encodings that runners 
>>> > and
>>> >          >             harnesses both need to know. There should be
>>> >         many, and
>>> >          >             adding more should be just a matter of
>>> >         standardization, no
>>> >          >             new design.
>>> >          >
>>> >          >             None of this should be user-facing or in the
>>> >         runner API /
>>> >          >             pipeline graph - that is critical to making it
>>> >         flexible on
>>> >          >             the backend between the runner & SDK harness.
>>> >          >
>>> >          >             If I understand it, from our offline discussion,
>>> >         you are
>>> >          >             interested in the case where you issue a
>>> >          >             ProcessBundleRequest to the SDK harness and none
>>> >         of the
>>> >          >             primitives in the subgraph will ever observe the
>>> >         metadata.
>>> >          >             So you want to not even have a tiny
>>> >          >             "WindowedValueWithNoMetadata". Is that accurate?
>>> >          >
>>> >          >             Kenn
>>> >          >
>>> >          >             On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
>>> >          >             <sunjincheng...@gmail.com> wrote:
>>> >          >
>>> >          >                 Thank you! And have a nice weekend!
>>> >          >
>>> >          >
>>> >          >                 Lukasz Cwik <lc...@google.com> wrote on Sat,
>>> >          >                 Apr 20, 2019 at 1:14 AM:
>>> >          >
>>> >          >                     I have added you as a contributor.
>>> >          >
>>> >          >                     On Fri, Apr 19, 2019 at 9:56 AM jincheng sun
>>> >          >                     <sunjincheng...@gmail.com> wrote:
>>> >          >
>>> >          >                         Hi Lukasz,
>>> >          >
>>> >          >                         Thanks for your affirmation and
>>> >         provide more
>>> >          >                         contextual information. :)
>>> >          >
>>> >          >                         Would you please give me the 
>>> > contributor
>>> >          >                         permission?  My JIRA ID is
>>> >         sunjincheng121.
>>> >          >
>>> >          >                         I would like to create/assign tickets
>>> >         for this work.
>>> >          >
>>> >          >                         Thanks,
>>> >          >                         Jincheng
>>> >          >
>>> >          >                         Lukasz Cwik <lc...@google.com> wrote on
>>> >          >                         Sat, Apr 20, 2019 at 12:26 AM:
>>> >          >
>>> >          >                             Since I don't think this is a
>>> >         contentious
>>> >          >                             change.
>>> >          >
>>> >          >                             On Fri, Apr 19, 2019 at 9:25 AM
>>> >          >                             Lukasz Cwik <lc...@google.com> wrote:
>>> >          >
>>> >          >                                 Yes, using T makes sense.
>>> >          >
>>> >          >                                 The WindowedValue was meant
>>> >         to be a
>>> >          >                                 context object in the SDK
>>> >         harness that
>>> >          >                                 propagates various
>>> >         information about the
>>> >          >                                 current element. We have
>>> >         discussed in
>>> >          >                                 the past about:
>>> >          >                                 * making optimizations which
>>> >         would pass
>>> >          >                                 around less of the context
>>> >         information
>>> >          >                                 if we know that the DoFns
>>> >         don't need it
>>> >          >                                 (for example, all the values
>>> >         share the
>>> >          >                                 same window).
>>> >          >                                 * versioning the encoding
>>> >         separately
>>> >          >                                 from the WindowedValue
>>> >         context object
>>> >          >                                 (see recent discussion about
>>> >         element
>>> >          >                                 timestamp precision [1])
>>> >          >                                 * the runner may want its own
>>> >          >                                 representation of a context
>>> >         object that
>>> >          >                                 makes sense for it which isn't 
>>> > a
>>> >          >                                 WindowedValue necessarily.
>>> >          >
>>> >          >                                 Feel free to cut a JIRA about
>>> >         this and
>>> >          >                                 start working on a change
>>> >         towards this.
>>> >          >
>>> >          >                                 1:
>>> >          >
>>> >         
>>> > https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>> >          >
>>> >          >                                 On Fri, Apr 19, 2019 at 3:18 AM
>>> >          >                                 jincheng sun
>>> >          >                                 <sunjincheng...@gmail.com> wrote:
>>> >          >
>>> >          >                                     Hi Beam devs,
>>> >          >
>>> >          >                                     I read some of the docs 
>>> > about
>>> >          >                                     `Communicating over the
>>> >         Fn API` in
>>> >          >                                     Beam. I feel that Beam
>>> >         has a very
>>> >          >                                     good design for Control
>>> >         Plane/Data
>>> >          >                                     Plane/State Plane/Logging
>>> >         services,
>>> >          >                                     and it is described in
>>> >         <How to send
>>> >          >                                     and receive data>
>>> >         document. When
>>> >          >                                     communicating between
>>> >         Runner and SDK
>>> >          >                                     Harness, the DataPlane
>>> >         API will be
>>> >          >                                     WindowedValue(An
>>> >         immutable triple of
>>> >          >                                     value, timestamp, and
>>> >         windows.) As a
>>> >          >                                     contract object between
>>> >         Runner and
>>> >          >                                     SDK Harness. I see the
>>> >         interface
>>> >          >                                     definitions for sending and
>>> >          >                                     receiving data in the
>>> >         code as follows:
>>> >          >
>>> >          >                                     -
>>> >          >
>>> >           org.apache.beam.runners.fnexecution.data.FnDataService
>>> >          >
>>> >          >                                         public interface FnDataService {
>>> >          >                                           <T> InboundDataClient receive(
>>> >          >                                               LogicalEndpoint inputLocation,
>>> >          >                                               Coder<WindowedValue<T>> coder,
>>> >          >                                               FnDataReceiver<WindowedValue<T>> listener);
>>> >          >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>> >          >                                               LogicalEndpoint outputLocation,
>>> >          >                                               Coder<WindowedValue<T>> coder);
>>> >          >                                         }
>>> >          >
>>> >          >
>>> >          >
>>> >          >                                     -
>>> >          >
>>> >           org.apache.beam.fn.harness.data.BeamFnDataClient
>>> >          >
>>> >          >                                         public interface BeamFnDataClient {
>>> >          >                                           <T> InboundDataClient receive(
>>> >          >                                               ApiServiceDescriptor apiServiceDescriptor,
>>> >          >                                               LogicalEndpoint inputLocation,
>>> >          >                                               Coder<WindowedValue<T>> coder,
>>> >          >                                               FnDataReceiver<WindowedValue<T>> receiver);
>>> >          >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>> >          >                                               Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>> >          >                                               LogicalEndpoint outputLocation,
>>> >          >                                               Coder<WindowedValue<T>> coder);
>>> >          >                                         }
>>> >          >
>>> >          >
>>> >          >                                     Both
>>> >         `Coder<WindowedValue<T>>` and
>>> >          >
>>> >           `FnDataReceiver<WindowedValue<T>>`
>>> >          >                                     use `WindowedValue` as
>>> >         the data
>>> >          >                                     structure that both sides
>>> >         of Runner
>>> >          >                                     and SDK Harness know each
>>> >         other.
>>> >          >                                     Control Plane/Data
>>> >         Plane/State
>>> >          >                                     Plane/Logging is a highly
>>> >          >                                     abstraction, such as
>>> >         Control Plane
>>> >          >                                     and Logging, these are 
>>> > common
>>> >          >                                     requirements for all
>>> >         multi-language
>>> >          >                                     platforms. For example,
>>> >         the Flink
>>> >          >                                     community is also
>>> >         discussing how to
>>> >          >                                     support Python UDF, as
>>> >         well as how
>>> >          >                                     to deal with docker
>>> >         environment. how
>>> >          >                                     to data transfer, how to
>>> >         state
>>> >          >                                     access, how to logging
>>> >         etc. If Beam
>>> >          >                                     can further abstract
>>> >         these service
>>> >          >                                     interfaces, i.e., interface
>>> >          >                                     definitions are
>>> >         compatible with
>>> >          >                                     multiple engines, and 
>>> > finally
>>> >          >                                     provided to other
>>> >         projects in the
>>> >          >                                     form of class libraries, it
>>> >          >                                     definitely will help
>>> >         other platforms
>>> >          >                                     that want to support 
>>> > multiple
>>> >          >                                     languages. So could beam
>>> >         can further
>>> >          >                                     abstract the interface
>>> >         definition of
>>> >          >                                     FnDataService's
>>> >         BeamFnDataClient?
>>> >          >                                     Here I am to throw out a
>>> >         minnow to
>>> >          >                                     catch a whale, take the
>>> >          >                                     FnDataService#receive
>>> >         interface as
>>> >          >                                     an example, and turn
>>> >          >                                     `WindowedValue<T>` into
>>> >         `T` so that
>>> >          >                                     other platforms can be
>>> >         extended
>>> >          >                                     arbitrarily, as follows:
>>> >          >
>>> >          >                                     <T> InboundDataClient receive(
>>> >          >                                         LogicalEndpoint inputLocation,
>>> >          >                                         Coder<T> coder,
>>> >          >                                         FnDataReceiver<T> listener);
>>> >          >
>>> >          >                                     What do you think?
>>> >          >
>>> >          >                                     Feel free to correct me
>>> >         if there any
>>> >          >                                     incorrect understanding.
>>> >         And welcome
>>> >          >                                     any feedback!
>>> >          >
>>> >          >
>>> >          >                                     Regards,
>>> >          >                                     Jincheng
>>> >          >
>>> >
