I fully agree that this effort goes beyond changing a type parameter, but I think we have a chance here for the two projects to cooperate. I would be happy to help out where I can.

I'm not sure at this point what exactly is feasible to reuse, but I would expect the runner-related code for interacting with the SDK harness to be useful as well. There are some fundamental differences in the model, e.g. how windowing works, that might be challenging to work around.

Thanks,
Max

On 24.04.19 12:03, jincheng sun wrote:

Hi Kenn, I think you are right: the Python SDK harness can be shared with Flink, though it also needs some new primitive operations. On the runner side, I think most of the code in runners/java-fn-execution can be shared (though some of it, such as FnDataService, needs improvement), while some of it, such as the job submission code, cannot. So we may need to set up a document that clearly analyzes which parts can be shared as-is, which can be shared with some changes, and which definitely cannot be shared.

Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn services as a library, and I'm willing to put more effort into this. Judging from the current code, abstracting the Fn services into a class library that other projects can rely on requires a lot of effort from the Beam community; turning `WindowedValue<T>` into `T` is just the beginning. If the Beam community is willing to abstract the Fn services into a class library that other projects can depend on, I can try to draft a document, although I will probably need a lot of help from you, Kenn, Lukasz, and the rest of the Beam community. (I am a newcomer to the Beam community :-))

What do you think?

Regards,
Jincheng

Kenneth Knowles <[email protected]> wrote on Wed, Apr 24, 2019 at 3:32 AM:

    It seems to me that the most valuable code to share and keep up with
    is the Python/Go/etc. SDK harness; it would need to be enhanced with
    new primitive operations. So you would want to depend directly on,
    and share, the original proto-generated classes too, which Beam
    publishes as separate artifacts for Java. Is the runner-side support
    code that valuable for direct integration into Flink? I would expect
    that once you get past trivial wrappers (which you can copy/paste with
    no loss), you would hit architectural differences and diverge anyhow.

    Kenn

    On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <[email protected]> wrote:

        Hi Jincheng,

        Copying code is a solution for the short term. In the long run I'd
        like the Fn services to be a library not only for the Beam
        portability layer but also for other projects that want to
        leverage it. We should thus make an effort to make it more
        generic/extensible where necessary and feasible.

        Since you are investigating reuse of Beam portability in the
        context of Flink, do you think it would make sense to set up a
        document where we collect ideas and challenges?

        Thanks,
        Max

        On 23.04.19 13:00, jincheng sun wrote:
         > Hi Reuven,
         >
         > I think you have offered an alternative for other communities that want to take advantage of Beam's existing work. Thank you very much!
         >
         > I think the Flink community can choose either to copy Beam's code or to depend directly on Beam's class library. The Flink community has also started a discussion; more information can be found here:
         > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
         >
         > The purpose of turning `WindowedValue<T>` into `T` is to make Beam's interface design more versatile, so that other open source projects have the opportunity to take advantage of Beam's existing work. Of course, just changing `WindowedValue<T>` into `T` is not enough for the code to be shared with other projects as a class library; more effort is needed. If Beam can provide such a class library in the future, contributors from other communities will also be more willing to contribute to the Beam community. This will benefit both the communities that want to build on Beam's existing work and the Beam community itself. Thanks also to Thomas, who has put a lot of effort into this as well.
         >
         > Thanks again for your valuable suggestion; any feedback is welcome!
         >
         > Best,
         > Jincheng
         >
         > Reuven Lax <[email protected]> wrote on Tue, Apr 23, 2019 at 1:00 AM:
         >
         >     One concern here: these interfaces are intended for use within the Beam project. Beam may decide to make specific changes to them to support needed functionality in Beam. If they are being reused by other projects, then those changes risk breaking those other projects in unexpected ways. I don't think we can guarantee that we won't do that. If this is useful in Flink, it would be safer IMO to copy the code rather than to depend on it directly.
         >
         >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <[email protected]> wrote:
         >
         >         Hi Kenn,
         >
         >         Thanks for your reply and for explaining the design of WindowedValue so clearly!
         >
         >         At present, the definitions of `FnDataService` and `BeamFnDataClient` in the Data Plane are very clear and general, e.g. send(...)/receive(...). If they are used only within Beam, they are already very good, because `WindowedValue` is a very basic data structure in Beam and both the runner and the SDK harness define it.
         >
         >         The reason I want to change the interface parameter from `WindowedValue<T>` to `T` is that I want to turn the Data Plane interfaces into a class library that other projects (such as Apache Flink) can use, so that those projects can have their own `FnDataService` implementations. However, the definition of `WindowedValue` does not fit every project. For example, Apache Flink has its own structure similar to WindowedValue: Flink's streaming runtime has StreamRecord. If we change `WindowedValue<T>` to `T`, other projects' implementations do not need to wrap their elements in WindowedValue, and the interface becomes more concise. Furthermore, some cases need only a plain `T`, such as Apache Flink's DataSet operators.
         >
         >         So I agree with your understanding: I don't want any `WindowedValueXXX<T>` in the FnDataService interface; I hope to use just a `T`.
         >
         >         Do you see any problems with changing the interface parameter from `WindowedValue<T>` to `T`?
         >
         >         Thanks,
         >         Jincheng
         >
         >         Kenneth Knowles <[email protected]> wrote on Sat, Apr 20, 2019 at 2:38 AM:
         >
         >             WindowedValue has always been an interface, not a concrete representation:
         >             https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
         >             It is an abstract class only because we started on Java 7, where interfaces could not have default methods, and because of legacy style concerns. WindowedValue implementations with fewer allocations are not just discussed but already implemented.
         >
         >             At the coder level, it was also always intended to have multiple encodings. We already have separate encodings depending on whether there is one window or multiple windows. The coder for a particular kind of WindowedValue should decide this. Before the Fn API, none of this had to be standardized, because the runner could just choose whatever it wanted. Now we have to standardize any encodings that both runners and harnesses need to know. There should be many, and adding more should just be a matter of standardization, not new design.
         >
         >             None of this should be user-facing or part of the Runner API / pipeline graph; that is critical to keeping it flexible on the backend between the runner and the SDK harness.
         >
         >             If I understand our offline discussion correctly, you are interested in the case where you issue a ProcessBundleRequest to the SDK harness and none of the primitives in the subgraph will ever observe the metadata, so you don't want to have even a tiny "WindowedValueWithNoMetadata". Is that accurate?
         >
         >             Kenn
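Kenn's point that WindowedValue is an interface rather than a fixed representation, and that the coder (not the element API) chooses between a single-window and a multi-window encoding, can be illustrated with a small sketch. The types below (`Elem`, `SingleWindowElem`, `MultiWindowElem`, `ElemStringCoder`) and the byte layout are hypothetical stand-ins, not Beam's actual WindowedValue classes or its standardized wire format; windows are reduced to `long` ids for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical element abstraction: callers see value, timestamp and windows,
// but each subclass chooses its own in-memory representation.
abstract class Elem<V> {
  abstract V value();

  abstract long timestamp();

  abstract List<Long> windows();
}

// Stores its single window in a primitive field rather than in a collection.
final class SingleWindowElem<V> extends Elem<V> {
  private final V value;
  private final long timestamp;
  private final long window;

  SingleWindowElem(V value, long timestamp, long window) {
    this.value = value;
    this.timestamp = timestamp;
    this.window = window;
  }

  V value() { return value; }

  long timestamp() { return timestamp; }

  List<Long> windows() { return Collections.singletonList(window); }
}

// General case: an element assigned to any number of windows.
final class MultiWindowElem<V> extends Elem<V> {
  private final V value;
  private final long timestamp;
  private final List<Long> windows;

  MultiWindowElem(V value, long timestamp, List<Long> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = new ArrayList<>(windows);
  }

  V value() { return value; }

  long timestamp() { return timestamp; }

  List<Long> windows() { return Collections.unmodifiableList(windows); }
}

// The coder decides the wire format: a tag byte selects a compact
// single-window layout or a length-prefixed list of windows.
final class ElemStringCoder {
  private static final byte SINGLE_WINDOW = 0;
  private static final byte MULTI_WINDOW = 1;

  byte[] encode(Elem<String> e) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeLong(e.timestamp());
    List<Long> windows = e.windows();
    if (windows.size() == 1) {
      out.writeByte(SINGLE_WINDOW);
      out.writeLong(windows.get(0));
    } else {
      out.writeByte(MULTI_WINDOW);
      out.writeInt(windows.size());
      for (long w : windows) {
        out.writeLong(w);
      }
    }
    out.writeUTF(e.value());
    out.flush();
    return buf.toByteArray();
  }

  Elem<String> decode(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    long timestamp = in.readLong();
    if (in.readByte() == SINGLE_WINDOW) {
      long window = in.readLong();
      return new SingleWindowElem<>(in.readUTF(), timestamp, window);
    }
    int n = in.readInt();
    List<Long> windows = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      windows.add(in.readLong());
    }
    return new MultiWindowElem<>(in.readUTF(), timestamp, windows);
  }
}
```

Under this made-up layout, a single-window element with a one-character value encodes to 20 bytes and the same element in two windows to 32 bytes; the point is only that adding another layout is a coder-level decision that leaves the element API untouched.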
         >
         >             On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <[email protected]> wrote:
         >
         >                 Thank you! And have a nice weekend!
         >
         >
         >                 Lukasz Cwik <[email protected]> wrote on Sat, Apr 20, 2019 at 1:14 AM:
         >
         >                     I have added you as a contributor.
         >
         >                     On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <[email protected]> wrote:
         >
         >                         Hi Lukasz,
         >
         >                         Thanks for the confirmation and for providing more context. :)
         >
         >                         Could you please give me contributor permissions? My JIRA ID is sunjincheng121.
         >
         >                         I would like to create/assign tickets
        for this work.
         >
         >                         Thanks,
         >                         Jincheng
         >
         >                         Lukasz Cwik <[email protected]> wrote on Sat, Apr 20, 2019 at 12:26 AM:
         >
         >                             Since I don't think this is a contentious change.
         >
         >                             On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <[email protected]> wrote:
         >
         >                                 Yes, using T makes sense.
         >
         >                                 The WindowedValue was meant to be a context object in the SDK harness that propagates various information about the current element. In the past we have discussed:
         >                                 * making optimizations that would pass around less of the context information if we know that the DoFns don't need it (for example, when all the values share the same window).
         >                                 * versioning the encoding separately from the WindowedValue context object (see the recent discussion about element timestamp precision [1]).
         >                                 * letting the runner use its own representation of a context object that makes sense for it, which isn't necessarily a WindowedValue.
         >
         >                                 Feel free to cut a JIRA about this and start working on a change towards this.
         >
         >                                 1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
         >
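The first optimization Lukasz mentions, passing around less context when all values share the same window, can be sketched as a batch encoding that writes the window once per batch instead of once per element. `Windowed`, `BatchEncoder`, and the header-flag layout are hypothetical and only illustrate the idea; they are not part of the Fn API data plane protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical element: a string value assigned to a single window id.
final class Windowed {
  final String value;
  final long window;

  Windowed(String value, long window) {
    this.value = value;
    this.window = window;
  }
}

// Hypothetical batch layout: [shared flag][count][window, if shared], then per
// element [window, only if not shared][value]. When every element is in the
// same window, the window is written once instead of once per element.
final class BatchEncoder {
  static byte[] encode(List<Windowed> batch) throws IOException {
    boolean shared = !batch.isEmpty()
        && batch.stream().allMatch(e -> e.window == batch.get(0).window);
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeBoolean(shared);
    out.writeInt(batch.size());
    if (shared) {
      out.writeLong(batch.get(0).window);
    }
    for (Windowed e : batch) {
      if (!shared) {
        out.writeLong(e.window);
      }
      out.writeUTF(e.value);
    }
    out.flush();
    return buf.toByteArray();
  }

  static List<Windowed> decode(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    boolean shared = in.readBoolean();
    int n = in.readInt();
    long sharedWindow = shared ? in.readLong() : 0L;
    List<Windowed> result = new ArrayList<>(n);
    for (int i = 0; i < n; i++) {
      long window = shared ? sharedWindow : in.readLong();
      result.add(new Windowed(in.readUTF(), window));
    }
    return result;
  }
}
```

With three one-character values, this layout comes to 22 bytes when all elements share a window and 38 bytes when they do not, since the 8-byte window id is written once rather than three times. A real design would also need the runner and harness to agree on when such a batch encoding applies, which is the standardization question raised above.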
         >                                 On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <[email protected]> wrote:
         >
         >                                     Hi Beam devs,
         >
         >                                     I read some of the docs about `Communicating over the Fn API` in Beam. I feel that Beam has a very good design for the Control Plane/Data Plane/State Plane/Logging services, as described in the <How to send and receive data> document. When the runner and the SDK harness communicate, the Data Plane API uses WindowedValue (an immutable triple of value, timestamp, and windows) as the contract object between them. I see the following interface definitions for sending and receiving data in the code:
         >
         >                                     - org.apache.beam.runners.fnexecution.data.FnDataService
         >
         >                                         public interface FnDataService {
         >                                           <T> InboundDataClient receive(
         >                                               LogicalEndpoint inputLocation,
         >                                               Coder<WindowedValue<T>> coder,
         >                                               FnDataReceiver<WindowedValue<T>> listener);
         >
         >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
         >                                               LogicalEndpoint outputLocation,
         >                                               Coder<WindowedValue<T>> coder);
         >                                         }
         >
         >
         >
         >                                     - org.apache.beam.fn.harness.data.BeamFnDataClient
         >
         >                                         public interface BeamFnDataClient {
         >                                           <T> InboundDataClient receive(
         >                                               ApiServiceDescriptor apiServiceDescriptor,
         >                                               LogicalEndpoint inputLocation,
         >                                               Coder<WindowedValue<T>> coder,
         >                                               FnDataReceiver<WindowedValue<T>> receiver);
         >
         >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
         >                                               Endpoints.ApiServiceDescriptor apiServiceDescriptor,
         >                                               LogicalEndpoint outputLocation,
         >                                               Coder<WindowedValue<T>> coder);
         >                                         }
         >
         >
         >                                     Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use `WindowedValue` as the data structure that both the runner and the SDK harness understand. Control Plane/Data Plane/State Plane/Logging is a high-level abstraction, and parts of it, such as the Control Plane and Logging, are common requirements for all multi-language platforms. For example, the Flink community is also discussing how to support Python UDFs, as well as how to handle the Docker environment, data transfer, state access, logging, etc. If Beam could further abstract these service interfaces, i.e. make the interface definitions compatible with multiple engines, and eventually provide them to other projects as class libraries, it would definitely help other platforms that want to support multiple languages. So could Beam further abstract the interface definitions of FnDataService and BeamFnDataClient? To get the discussion started, take the FnDataService#receive interface as an example and turn `WindowedValue<T>` into `T`, so that other platforms can extend it freely, as follows:
         >
         >                                     <T> InboundDataClient receive(
         >                                         LogicalEndpoint inputLocation,
         >                                         Coder<T> coder,
         >                                         FnDataReceiver<T> listener);
         >
         >                                     What do you think?
         >
         >                                     Feel free to correct me if I have misunderstood anything. Any feedback is welcome!
         >
         >
         >                                     Regards,
         >                                     Jincheng
         >
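As a sketch of what the proposed generic `receive` signature allows, the following uses hypothetical stand-ins for `Coder`, `FnDataReceiver`, and `LogicalEndpoint` (here `SimpleCoder`, `SimpleReceiver`, and a plain `String` endpoint id) rather than Beam's real classes, plus an in-memory transport in place of gRPC. The same service carries both a Beam-style element with metadata and a bare value with none, the two cases discussed in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a coder: serializes a T to and from a byte stream.
interface SimpleCoder<T> {
  void encode(T value, DataOutputStream out) throws IOException;

  T decode(DataInputStream in) throws IOException;
}

// Hypothetical stand-in for FnDataReceiver<T>.
interface SimpleReceiver<T> {
  void accept(T value) throws Exception;
}

// The shape of the proposal: the element type is a plain T, not WindowedValue<T>.
interface GenericDataService {
  <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener);

  <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder);
}

// In-memory transport: every sent element is encoded to bytes, then decoded
// by the listener registered for the same endpoint id.
final class InMemoryDataService implements GenericDataService {
  private final Map<String, SimpleReceiver<byte[]>> listeners = new HashMap<>();

  @Override
  public <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener) {
    listeners.put(endpoint, bytes ->
        listener.accept(coder.decode(new DataInputStream(new ByteArrayInputStream(bytes)))));
  }

  @Override
  public <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder) {
    return value -> {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      coder.encode(value, out);
      out.flush();
      SimpleReceiver<byte[]> target = listeners.get(endpoint);
      if (target == null) {
        throw new IllegalStateException("no listener registered for " + endpoint);
      }
      target.accept(buf.toByteArray());
    };
  }
}

// A Beam-style element carrying metadata (here only an event timestamp).
final class Timestamped<V> {
  final V value;
  final long timestampMillis;

  Timestamped(V value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

// Coder for bare strings: no metadata on the wire at all.
final class StringCoder implements SimpleCoder<String> {
  public void encode(String value, DataOutputStream out) throws IOException {
    out.writeUTF(value);
  }

  public String decode(DataInputStream in) throws IOException {
    return in.readUTF();
  }
}

// Coder for timestamped strings: the timestamp travels with each value.
final class TimestampedStringCoder implements SimpleCoder<Timestamped<String>> {
  public void encode(Timestamped<String> v, DataOutputStream out) throws IOException {
    out.writeLong(v.timestampMillis);
    out.writeUTF(v.value);
  }

  public Timestamped<String> decode(DataInputStream in) throws IOException {
    long ts = in.readLong();
    return new Timestamped<>(in.readUTF(), ts);
  }
}
```

The current `WindowedValue`-based signatures correspond to choosing `T = WindowedValue<X>`, so this generalization does not by itself take anything away from Beam; it only stops forcing other engines to wrap their elements.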
