It seems to me that the most valuable code to share and keep up with is the
Python/Go/etc. SDK harnesses; they would need to be enhanced with new
primitive operations. So you would want to depend directly on, and share, the
original proto-generated classes too, which Beam publishes as separate
artifacts for Java. Is the runner-side support code that valuable for
direct integration into Flink? I would expect that once you get past trivial
wrappers (which you can copy/paste with no loss), you would hit differences
in architecture, so you would diverge anyhow.

Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:

> Hi Jincheng,
>
> Copying code is a solution for the short term. In the long run I'd like
> the Fn services to be a library not only for the Beam portability layer
> but also for other projects which want to leverage it. We should thus
> make an effort to make it more generic/extensible where necessary and
> feasible.
>
> Since you are investigating reuse of Beam portability in the context of
> Flink, do you think it would make sense to set up a document where we
> collect ideas and challenges?
>
> Thanks,
> Max
>
> On 23.04.19 13:00, jincheng sun wrote:
> > Hi Reuven,
> >
> > I think you have provided an alternative solution for other communities
> > that want to take advantage of Beam's existing achievements. Thank you
> > very much!
> >
> > I think the Flink community can choose either to copy Beam's code or
> > to rely directly on Beam's class library. The Flink community has also
> > started a discussion; more info can be found here:
> > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
> >
> > The purpose of turning `WindowedValue<T>` into `T` is to make Beam's
> > interface design more versatile, so that other open source projects
> > have the opportunity to take advantage of Beam's existing
> > achievements. Of course, just changing `WindowedValue<T>` into `T` is
> > not enough for the code to be shared with other projects as a class
> > library; more effort is needed. If Beam can provide such a class
> > library in the future, contributors from other communities will also
> > be willing to contribute to the Beam community. This will benefit both
> > the communities that want to take advantage of Beam's existing
> > achievements and the Beam community itself. And thanks to Thomas, who
> > has also put a lot of effort into this.
> >
> > Thanks again for your valuable suggestion; any feedback is welcome!
> >
> > Best,
> > Jincheng
> >
> > On Tue, Apr 23, 2019 at 1:00 AM Reuven Lax
> > <re...@google.com <mailto:re...@google.com>> wrote:
> >
> >     One concern here: these interfaces are intended for use within the
> >     Beam project. Beam may decide to make specific changes to them to
> >     support needed functionality in Beam. If they are being reused by
> >     other projects, then those changes risk breaking those other
> >     projects in unexpected ways. I don't think we can guarantee that we
> >     don't do that. If this is useful in Flink, it would be safer to copy
> >     the code IMO rather than to directly depend on it.
> >
> >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
> >     <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>> wrote:
> >
> >         Hi Kenn,
> >
> >         Thanks for your reply, and for explaining the design of
> >         WindowedValue so clearly!
> >
> >         At present, the definitions of `FnDataService` and
> >         `BeamFnDataClient` in the Data Plane are very clear and
> >         general, e.g. send(...)/receive(...). If they are only used
> >         within the Beam project, they are already very good, because
> >         `WindowedValue` is a very basic data structure in Beam, and
> >         both the Runner and the SDK harness define the WindowedValue
> >         data structure.
> >
> >         The reason I want to change the interface parameter from
> >         `WindowedValue<T>` to T is that I want to turn the
> >         `Data Plane` interfaces into a class library that can be used
> >         by other projects (such as Apache Flink), so that other
> >         projects can have their own `FnDataService` implementations.
> >         However, the definition of `WindowedValue` does not apply to
> >         all projects. For example, Apache Flink has its own definition
> >         similar to WindowedValue: Flink streaming has StreamRecord. If
> >         we change `WindowedValue<T>` to T, other projects'
> >         implementations do not need to wrap WindowedValue, and the
> >         interface becomes more concise. Furthermore, some cases need
> >         only a plain T, such as the Apache Flink DataSet operators.
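A minimal Java sketch of what a plain `T` element type buys, assuming a hypothetical `ElementCoder<T>` interface in place of Beam's actual `Coder` API: a streaming engine plugs in its own record type (the hypothetical `TimestampedRecord`, loosely modeled on the idea behind Flink's StreamRecord), while a batch (DataSet-style) operator encodes just the bare value. Neither case needs a WindowedValue wrapper. None of these types are Beam or Flink APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical coder over a plain element type T (stand-in for a Coder<T> parameter).
interface ElementCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;

  // Round-trip helpers that hide the checked IOException.
  default byte[] toBytes(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      encode(value, out);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  default T fromBytes(byte[] data) {
    try {
      return decode(new DataInputStream(new ByteArrayInputStream(data)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Hypothetical engine-specific streaming record: a value plus a timestamp, no windows.
final class TimestampedRecord {
  final String value;
  final long timestamp;

  TimestampedRecord(String value, long timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }
}

// Streaming-style coder: encodes the engine's own record type directly.
final class TimestampedRecordCoder implements ElementCoder<TimestampedRecord> {
  public void encode(TimestampedRecord r, DataOutputStream out) throws IOException {
    out.writeLong(r.timestamp);
    out.writeUTF(r.value);
  }

  public TimestampedRecord decode(DataInputStream in) throws IOException {
    long timestamp = in.readLong();
    return new TimestampedRecord(in.readUTF(), timestamp);
  }
}

// Batch-style coder (DataSet-like case): the element is only the value.
final class PlainStringCoder implements ElementCoder<String> {
  public void encode(String value, DataOutputStream out) throws IOException {
    out.writeUTF(value);
  }

  public String decode(DataInputStream in) throws IOException {
    return in.readUTF();
  }
}
```

Here `new PlainStringCoder().toBytes("x")` is three bytes (the two-byte length prefix that `DataOutputStream.writeUTF` writes, plus one byte of data), and the timestamped record adds eight bytes for the timestamp: only what the engine's element type actually carries goes over the wire.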
> >
> >         So, I agree with your understanding: I don't want
> >         `WindowedValueXXX<T>` in the FnDataService interface; I hope to
> >         just use a `T`.
> >
> >         Do you see any problem if we change the interface parameter
> >         from `WindowedValue<T>` to T?
> >
> >         Thanks,
> >         Jincheng
> >
> >         On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles
> >         <k...@apache.org <mailto:k...@apache.org>> wrote:
> >
> >             WindowedValue has always been an interface, not a concrete
> >             representation:
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
> >             It is an abstract class because we started on Java 7, which
> >             did not have default methods, and partly due to legacy
> >             style concerns. WindowedValue implementations with fewer
> >             allocations have not just been discussed but actually
> >             implemented.
> >             At the coder level, it was also always intended to have
> >             multiple encodings. We already do have separate encodings
> >             based on whether there is 1 window or multiple windows. The
> >             coder for a particular kind of WindowedValue should decide
> >             this. Before the Fn API none of this had to be standardized,
> >             because the runner could just choose whatever it wants. Now
> >             we have to standardize any encodings that runners and
> >             harnesses both need to know. There should be many such
> >             encodings, and adding more should be just a matter of
> >             standardization, not new design.
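A minimal sketch of the idea that one abstract element context can have several concrete layouts and several encodings, with the coder choosing per element. All names (`ElementContext`, `SingleWindowContext`, `MultiWindowContext`, `ContextCoder`) and the tag-byte wire format are hypothetical illustrations, not Beam's WindowedValue classes or its standardized encodings; windows are modeled as strings for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical element context: callers see value/timestamp/windows,
// implementations choose their own layout.
abstract class ElementContext {
  abstract String value();
  abstract long timestamp();
  abstract List<String> windows();
}

// Common case: exactly one window, stored as a single field rather than a list.
final class SingleWindowContext extends ElementContext {
  private final String value;
  private final long timestamp;
  private final String window;

  SingleWindowContext(String value, long timestamp, String window) {
    this.value = value;
    this.timestamp = timestamp;
    this.window = window;
  }

  String value() { return value; }
  long timestamp() { return timestamp; }
  List<String> windows() { return Collections.singletonList(window); }
}

// General case: any number of windows.
final class MultiWindowContext extends ElementContext {
  private final String value;
  private final long timestamp;
  private final List<String> windows;

  MultiWindowContext(String value, long timestamp, List<String> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = new ArrayList<>(windows);
  }

  String value() { return value; }
  long timestamp() { return timestamp; }
  List<String> windows() { return Collections.unmodifiableList(windows); }
}

// Hypothetical coder that picks the encoding per element:
// tag 0 = one window, tag 1 = length-prefixed list of windows.
final class ContextCoder {
  static byte[] encode(ElementContext ctx) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      List<String> windows = ctx.windows();
      if (windows.size() == 1) {
        out.writeByte(0);
        out.writeUTF(windows.get(0));
      } else {
        out.writeByte(1);
        out.writeInt(windows.size());
        for (String w : windows) {
          out.writeUTF(w);
        }
      }
      out.writeLong(ctx.timestamp());
      out.writeUTF(ctx.value());
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static ElementContext decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      if (in.readByte() == 0) {
        String window = in.readUTF();
        long timestamp = in.readLong();
        return new SingleWindowContext(in.readUTF(), timestamp, window);
      }
      int n = in.readInt();
      List<String> windows = new ArrayList<>(n);
      for (int i = 0; i < n; i++) {
        windows.add(in.readUTF());
      }
      long timestamp = in.readLong();
      return new MultiWindowContext(in.readUTF(), timestamp, windows);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Code that reads elements only sees `value()`, `timestamp()` and `windows()`; which variant it gets back depends on the tag byte the coder wrote, so adding another layout means adding one more tag and class rather than changing callers.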
> >
> >             None of this should be user-facing or in the runner API /
> >             pipeline graph - that is critical to making it flexible on
> >             the backend between the runner & SDK harness.
> >
> >             If I understand our offline discussion correctly, you are
> >             interested in the case where you issue a
> >             ProcessBundleRequest to the SDK harness and none of the
> >             primitives in the subgraph will ever observe the metadata.
> >             So you want to avoid even a tiny
> >             "WindowedValueWithNoMetadata". Is that accurate?
> >
> >             Kenn
> >
> >             On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
> >             <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>>
> >             wrote:
> >
> >                 Thank you! And have a nice weekend!
> >
> >
> >                 On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik
> >                 <lc...@google.com <mailto:lc...@google.com>> wrote:
> >
> >                     I have added you as a contributor.
> >
> >                     On Fri, Apr 19, 2019 at 9:56 AM jincheng sun
> >                     <sunjincheng...@gmail.com
> >                     <mailto:sunjincheng...@gmail.com>> wrote:
> >
> >                         Hi Lukasz,
> >
> >                         Thanks for the confirmation and for providing
> >                         more context. :)
> >
> >                         Could you please grant me contributor
> >                         permissions? My JIRA ID is sunjincheng121.
> >
> >                         I would like to create/assign tickets for this
> >                         work.
> >
> >                         Thanks,
> >                         Jincheng
> >
> >                         On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik
> >                         <lc...@google.com <mailto:lc...@google.com>> wrote:
> >
> >                             I say this because I don't think this is
> >                             a contentious change.
> >
> >                             On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik
> >                             <lc...@google.com <mailto:lc...@google.com>>
> >                             wrote:
> >
> >                                 Yes, using T makes sense.
> >
> >                                 The WindowedValue was meant to be a
> >                                 context object in the SDK harness that
> >                                 propagates various information about the
> >                                 current element. In the past we have
> >                                 discussed:
> >                                 * making optimizations which would pass
> >                                 around less of the context information
> >                                 if we know that the DoFns don't need it
> >                                 (for example, all the values share the
> >                                 same window).
> >                                 * versioning the encoding separately
> >                                 from the WindowedValue context object
> >                                 (see recent discussion about element
> >                                 timestamp precision [1])
> >                                 * the runner may want its own
> >                                 representation of a context object that
> >                                 makes sense for it, which isn't
> >                                 necessarily a WindowedValue.
> >
> >                                 Feel free to cut a JIRA about this and
> >                                 start working on a change towards this.
> >
> >                                 1:
> >
> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
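A minimal sketch of the second point in the list above: keeping the context object fixed while the wire encoding is versioned. It assumes, purely for illustration, an element that stores its timestamp in microseconds and two hypothetical encodings that differ only in timestamp precision. The names `Element` and `TimestampEncoding` are not Beam APIs, and no version is written in-band because the sketch assumes both sides agree on the encoding up front.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical context object: an element plus its timestamp in microseconds.
// Its shape is independent of how it is encoded on the wire.
final class Element {
  final String value;
  final long timestampMicros;

  Element(String value, long timestampMicros) {
    this.value = value;
    this.timestampMicros = timestampMicros;
  }
}

// Hypothetical versioned encodings of the same context object. The version is a
// property of the coder both sides agreed on, not of Element itself.
enum TimestampEncoding {
  V1_MILLIS, // writes milliseconds: sub-millisecond precision is dropped
  V2_MICROS; // writes microseconds: lossless for this Element

  byte[] encode(Element e) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      long wireTimestamp =
          this == V1_MILLIS ? Math.floorDiv(e.timestampMicros, 1000L) : e.timestampMicros;
      out.writeLong(wireTimestamp);
      out.writeUTF(e.value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }

  Element decode(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      long wireTimestamp = in.readLong();
      long micros = this == V1_MILLIS ? wireTimestamp * 1000L : wireTimestamp;
      return new Element(in.readUTF(), micros);
    } catch (IOException ex) {
      throw new UncheckedIOException(ex);
    }
  }
}
```

Round-tripping an element whose timestamp is 1234567 microseconds returns 1234567 through `V2_MICROS` but 1234000 through `V1_MILLIS`, while the `Element` class itself never changes.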
> >
> >                                 On Fri, Apr 19, 2019 at 3:18 AM jincheng
> >                                 sun <sunjincheng...@gmail.com
> >                                 <mailto:sunjincheng...@gmail.com>> wrote:
> >
> >                                     Hi Beam devs,
> >
> >                                     I read some of the docs about
> >                                     `Communicating over the Fn API` in
> >                                     Beam. I feel that Beam has a very
> >                                     good design for the Control
> >                                     Plane/Data Plane/State Plane/Logging
> >                                     services, as described in the <How
> >                                     to send and receive data> document.
> >                                     When the Runner and the SDK Harness
> >                                     communicate, the Data Plane API uses
> >                                     WindowedValue (an immutable triple
> >                                     of value, timestamp, and windows) as
> >                                     the contract object between them.
> >                                     The interface definitions for
> >                                     sending and receiving data in the
> >                                     code are as follows:
> >
> >                                     - org.apache.beam.runners.fnexecution.data.FnDataService
> >
> >                                         public interface FnDataService {
> >                                           <T> InboundDataClient receive(
> >                                               LogicalEndpoint inputLocation,
> >                                               Coder<WindowedValue<T>> coder,
> >                                               FnDataReceiver<WindowedValue<T>> listener);
> >
> >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> >                                               LogicalEndpoint outputLocation,
> >                                               Coder<WindowedValue<T>> coder);
> >                                         }
> >
> >
> >
> >                                     - org.apache.beam.fn.harness.data.BeamFnDataClient
> >
> >                                         public interface BeamFnDataClient {
> >                                           <T> InboundDataClient receive(
> >                                               ApiServiceDescriptor apiServiceDescriptor,
> >                                               LogicalEndpoint inputLocation,
> >                                               Coder<WindowedValue<T>> coder,
> >                                               FnDataReceiver<WindowedValue<T>> receiver);
> >
> >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> >                                               Endpoints.ApiServiceDescriptor apiServiceDescriptor,
> >                                               LogicalEndpoint outputLocation,
> >                                               Coder<WindowedValue<T>> coder);
> >                                         }
> >
> >
> >                                     Both `Coder<WindowedValue<T>>` and
> >                                     `FnDataReceiver<WindowedValue<T>>`
> >                                     use `WindowedValue` as the data
> >                                     structure that both the Runner and
> >                                     the SDK Harness understand. The
> >                                     Control Plane/Data Plane/State
> >                                     Plane/Logging services are highly
> >                                     abstract, and some of them, such as
> >                                     the Control Plane and Logging, are
> >                                     common requirements for all
> >                                     multi-language platforms. For
> >                                     example, the Flink community is also
> >                                     discussing how to support Python
> >                                     UDFs, as well as how to deal with
> >                                     the Docker environment, data
> >                                     transfer, state access, logging,
> >                                     etc. If Beam could further abstract
> >                                     these service interfaces, i.e., make
> >                                     the interface definitions compatible
> >                                     with multiple engines and ultimately
> >                                     provide them to other projects as
> >                                     class libraries, it would definitely
> >                                     help other platforms that want to
> >                                     support multiple languages. So could
> >                                     Beam further abstract the interface
> >                                     definitions of FnDataService and
> >                                     BeamFnDataClient? To throw out a
> >                                     minnow to catch a whale (that is, to
> >                                     offer a small idea in the hope of
> >                                     attracting better ones), take the
> >                                     FnDataService#receive interface as
> >                                     an example and turn
> >                                     `WindowedValue<T>` into `T` so that
> >                                     other platforms can extend it
> >                                     freely, as follows:
> >
> >                                     <T> InboundDataClient receive(
> >                                         LogicalEndpoint inputLocation,
> >                                         Coder<T> coder,
> >                                         FnDataReceiver<T> listener);
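A minimal in-memory sketch of the proposed shape, illustrating that generalizing to `T` loses nothing: a plain `String` element and a Beam-style wrapped element both flow through the same `receive`/`send` pair, the latter simply by instantiating `T` as the wrapper type. `SimpleCoder`, `SimpleReceiver`, `Windowed` and `InMemoryDataService` are hypothetical, simplified stand-ins (endpoints are plain strings and there is no gRPC transport); they are not Beam's actual classes or signatures.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical "coder": just a pair of functions to and from bytes.
final class SimpleCoder<T> {
  final Function<T, byte[]> encode;
  final Function<byte[], T> decode;

  SimpleCoder(Function<T, byte[]> encode, Function<byte[], T> decode) {
    this.encode = encode;
    this.decode = decode;
  }

  // Coder for a bare String value: the element carries no metadata at all.
  static SimpleCoder<String> utf8() {
    return new SimpleCoder<String>(
        s -> s.getBytes(StandardCharsets.UTF_8),
        b -> new String(b, StandardCharsets.UTF_8));
  }
}

// Plays the role of FnDataReceiver<T> in this sketch.
interface SimpleReceiver<T> {
  void accept(T element);
}

// Hypothetical Beam-like wrapper (value + timestamp; windows omitted for brevity).
final class Windowed<X> {
  final X value;
  final long timestamp;

  Windowed(X value, long timestamp) {
    this.value = value;
    this.timestamp = timestamp;
  }

  // Coder for Windowed<String>: 8-byte timestamp followed by the UTF-8 value.
  static SimpleCoder<Windowed<String>> stringCoder() {
    return new SimpleCoder<Windowed<String>>(
        w -> {
          byte[] v = w.value.getBytes(StandardCharsets.UTF_8);
          return ByteBuffer.allocate(8 + v.length).putLong(w.timestamp).put(v).array();
        },
        b -> {
          ByteBuffer buf = ByteBuffer.wrap(b);
          long ts = buf.getLong();
          byte[] v = new byte[buf.remaining()];
          buf.get(v);
          return new Windowed<String>(new String(v, StandardCharsets.UTF_8), ts);
        });
  }
}

// In-memory data service with the proposed generic shape:
// receive(endpoint, coder of T, receiver of T) and send(endpoint, coder of T).
final class InMemoryDataService {
  // Endpoint id -> byte-level consumer registered by receive(...).
  private final Map<String, SimpleReceiver<byte[]>> inbound = new HashMap<>();

  <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener) {
    inbound.put(endpoint, bytes -> listener.accept(coder.decode.apply(bytes)));
  }

  <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder) {
    return element -> {
      SimpleReceiver<byte[]> target = inbound.get(endpoint);
      if (target == null) {
        throw new IllegalStateException("no receiver registered for " + endpoint);
      }
      // Encode on the sending side; the registered receiver decodes.
      target.accept(coder.encode.apply(element));
    };
  }
}
```

The design choice this illustrates: the service only moves bytes and delegates all interpretation to the coder, so whether an element carries windows, a timestamp, or nothing at all is decided entirely by the caller's choice of `T`.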
> >
> >                                     What do you think?
> >
> >                                     Feel free to correct me if I have
> >                                     misunderstood anything. Any feedback
> >                                     is welcome!
> >
> >
> >                                     Regards,
> >                                     Jincheng
> >
>
