Hi Jincheng,

Copying code is a solution for the short term. In the long run I'd like the Fn services to be a library not only for the Beam portability layer but also for other projects which want to leverage it. We should thus make an effort to make it more generic/extensible where necessary and feasible.

Since you are investigating reuse of Beam portability in the context of Flink, do you think it would make sense to set up a document where we collect ideas and challenges?

Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:
Hi Reuven,

I think you have offered a viable option for other communities that want to build on Beam's existing work. Thank you very much!

I think the Flink community can either copy Beam's code or depend directly on Beam's libraries. The Flink community has also started a discussion on this; more information can be found here: <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>

The purpose of turning `WindowedValue<T>` into `T` is to make Beam's interface design more versatile, so that other open source projects have the opportunity to build on Beam's existing work. Of course, changing `WindowedValue<T>` into `T` alone is not enough for these interfaces to be shared with other projects as a library; more effort is needed. If Beam provides such a library in the future, contributors from other communities will also be willing to contribute to the Beam community. This will benefit both the communities that want to build on Beam's work and the Beam community itself. Thanks also to Thomas, who has already put a lot of effort into this.

Thanks again for your valuable suggestion; any feedback is welcome!

Best,
Jincheng

On Tue, Apr 23, 2019 at 1:00 AM Reuven Lax <[email protected] <mailto:[email protected]>> wrote:

    One concern here: these interfaces are intended for use within the
    Beam project. Beam may decide to make specific changes to them to
    support needed functionality in Beam. If they are being reused by
    other projects, then those changes risk breaking those other
    projects in unexpected ways. I don't think we can guarantee that we
    don't do that. If this is useful in Flink, it would be safer to copy
    the code IMO rather than to directly depend on it.

    On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
    <[email protected] <mailto:[email protected]>> wrote:

        Hi Kenn,

        Thanks for your reply and for explaining the design of
        WindowedValue so clearly!

        At present, the definitions of `FnDataService` and
        `BeamFnDataClient` in the Data Plane, such as
        send(...)/receive(...), are very clear and general. If they are
        only used within Beam, they are already very good, because
        `WindowedValue` is a fundamental data structure in Beam and
        both the Runner and the SDK harness define it.

        The reason I want to change the interface parameter from
        `WindowedValue<T>` to `T` is that I want to turn the `Data
        Plane` interface into a library that other projects (such as
        Apache Flink) can use, so that they can provide their own
        `FnDataService` implementations. However, `WindowedValue` does
        not fit every project. For example, Apache Flink has its own
        structure similar to WindowedValue: StreamRecord in Flink's
        streaming API. If we change `WindowedValue<T>` to `T`, other
        projects' implementations no longer need to wrap their elements
        in WindowedValue, and the interface becomes more concise.
        Furthermore, some cases, such as Apache Flink's DataSet
        operators, need only a plain `T`.

        So I agree with your understanding: I don't want
        `WindowedValueXXX<T>` in the FnDataService interface; I would
        like to use just a `T`.
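To illustrate the point above, here is a minimal sketch, assuming a hypothetical `ElementCoder<T>` interface as the only thing a `T`-based data plane would ask of an engine. `TimestampedRecord` is a made-up stand-in for an engine-native type such as Flink's StreamRecord (it is not Flink's class), and the length-prefixed string encoding is an assumption. The same coder interface serves a streaming record and a plain batch value without any WindowedValue wrapper.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class EngineRecordSketch {

  // Hypothetical generic coder: the only thing a T-based data plane would need from an engine.
  interface ElementCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
  }

  // Hypothetical engine-native record (value + timestamp). It stands in for something like
  // a streaming record type; it is NOT Flink's StreamRecord class.
  static final class TimestampedRecord<V> {
    final V value;
    final long timestamp;

    TimestampedRecord(V value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Batch (DataSet-like) case: only the value itself is written.
  static final ElementCoder<String> STRING_CODER = (value, out) -> out.writeUTF(value);

  // Streaming case: the engine's own record is T; no WindowedValue wrapper is involved.
  static <V> ElementCoder<TimestampedRecord<V>> recordCoder(ElementCoder<V> valueCoder) {
    return (record, out) -> {
      out.writeLong(record.timestamp);
      valueCoder.encode(record.value, out);
    };
  }

  // Encodes one element and reports how many bytes it took on the wire.
  static <T> int encodedSize(T value, ElementCoder<T> coder) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    coder.encode(value, out);
    out.flush();
    return bytes.size();
  }

  public static void main(String[] args) throws IOException {
    // 2-byte length prefix + 3 bytes of "abc" = 5
    System.out.println(encodedSize("abc", STRING_CODER));
    // 8-byte timestamp + 5 = 13
    System.out.println(
        encodedSize(new TimestampedRecord<String>("abc", 42L), recordCoder(STRING_CODER)));
  }
}
```

In this sketch, metadata becomes the coder's concern: an engine that carries a timestamp writes it, and a DataSet-style operator writes nothing extra.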

        Do you see any problem if we change the interface parameter
        from `WindowedValue<T>` to `T`?

        Thanks,
        Jincheng

        On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <[email protected]
        <mailto:[email protected]>> wrote:

            WindowedValue has always been an interface, not a concrete
            representation:
            https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
            It is an abstract class because we started in Java 7, where
            you could not have default methods, and also just due to
            legacy style concerns. It is not just discussed but already
            implemented: there are WindowedValue implementations with
            fewer allocations.
            At the coder level, it was also always intended to have
            multiple encodings. We already do have separate encodings
            based on whether there is 1 window or multiple windows. The
            coder for a particular kind of WindowedValue should decide
            this. Before the Fn API none of this had to be standardized,
            because the runner could just choose whatever it wants. Now
            we have to standardize any encodings that runners and
            harnesses both need to know. There should be many, and
            adding more should be just a matter of standardization, no
            new design.
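The pattern described above, one abstract element type with several cheaper implementations and a coder that picks the wire layout, can be sketched roughly as follows. All names are hypothetical, windows are modeled as opaque `long` IDs, and the tag-byte layout is an assumption for illustration, not Beam's standardized encoding.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class WindowEncodingSketch {

  // Hypothetical stand-in for WindowedValue: callers only use the accessors,
  // so each subclass is free to pick a cheaper in-memory layout.
  abstract static class ElementContext<T> {
    abstract T value();
    abstract long timestampMillis();
    abstract int windowCount();
    abstract long windowId(int index); // windows modeled as opaque long IDs (assumption)
  }

  // Common case: exactly one window kept in a field, so no array per element.
  static final class SingleWindowContext<T> extends ElementContext<T> {
    private final T value;
    private final long timestampMillis;
    private final long windowId;

    SingleWindowContext(T value, long timestampMillis, long windowId) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.windowId = windowId;
    }

    @Override T value() { return value; }
    @Override long timestampMillis() { return timestampMillis; }
    @Override int windowCount() { return 1; }
    @Override long windowId(int index) {
      if (index != 0) throw new IndexOutOfBoundsException("index: " + index);
      return windowId;
    }
  }

  // General case: any number of windows.
  static final class MultiWindowContext<T> extends ElementContext<T> {
    private final T value;
    private final long timestampMillis;
    private final long[] windowIds;

    MultiWindowContext(T value, long timestampMillis, long... windowIds) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.windowIds = windowIds.clone();
    }

    @Override T value() { return value; }
    @Override long timestampMillis() { return timestampMillis; }
    @Override int windowCount() { return windowIds.length; }
    @Override long windowId(int index) { return windowIds[index]; }
  }

  // The coder, not the context object, decides the wire layout. A leading tag byte
  // tells the reader which layout follows (hypothetical format, not Beam's).
  static byte[] encode(ElementContext<String> element) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    if (element.windowCount() == 1) {
      out.writeByte(0);                        // tag: single window
      out.writeLong(element.timestampMillis());
      out.writeLong(element.windowId(0));
    } else {
      out.writeByte(1);                        // tag: explicit window list
      out.writeLong(element.timestampMillis());
      out.writeInt(element.windowCount());
      for (int i = 0; i < element.windowCount(); i++) {
        out.writeLong(element.windowId(i));
      }
    }
    out.writeUTF(element.value());             // 2-byte length + modified UTF-8 bytes
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // 1 + 8 + 8 + (2 + 1) = 20 bytes
    System.out.println(encode(new SingleWindowContext<>("a", 0L, 7L)).length);
    // 1 + 8 + 4 + 3 * 8 + (2 + 1) = 40 bytes
    System.out.println(encode(new MultiWindowContext<>("a", 0L, 1L, 2L, 3L)).length);
  }
}
```

Because the coder looks only at `windowCount()`, a multi-window object that happens to hold a single window still gets the compact layout.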

            None of this should be user-facing or in the runner API /
            pipeline graph - that is critical to making it flexible on
            the backend between the runner & SDK harness.

            If I understand it, from our offline discussion, you are
            interested in the case where you issue a
            ProcessBundleRequest to the SDK harness and none of the
            primitives in the subgraph will ever observe the metadata.
            So you want to not even have a tiny
            "WindowedValueWithNoMetadata". Is that accurate?

            Kenn

            On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
            <[email protected] <mailto:[email protected]>>
            wrote:

                Thank you! And have a nice weekend!


                On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <[email protected] <mailto:[email protected]>>
                wrote:

                    I have added you as a contributor.

                    On Fri, Apr 19, 2019 at 9:56 AM jincheng sun
                    <[email protected]
                    <mailto:[email protected]>> wrote:

                        Hi Lukasz,

                        Thanks for your confirmation and for providing
                        more context. :)

                        Could you please give me contributor
                        permissions? My JIRA ID is sunjincheng121.

                        I would like to create/assign tickets for this work.

                        Thanks,
                        Jincheng

                        On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik
                        <[email protected] <mailto:[email protected]>>
                        wrote:

                            I suggest going ahead, since I don't think
                            this is a contentious change.

                            On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik
                            <[email protected] <mailto:[email protected]>>
                            wrote:

                                Yes, using T makes sense.

                                The WindowedValue was meant to be a
                                context object in the SDK harness that
                                propagates various information about the
                                current element. In the past we have
                                discussed:
                                * making optimizations which would pass
                                around less of the context information
                                if we know that the DoFns don't need it
                                (for example, all the values share the
                                same window).
                                * versioning the encoding separately
                                from the WindowedValue context object
                                (see recent discussion about element
                                timestamp precision [1])
                                * the runner may want its own
                                representation of a context object that
                                makes sense for it which isn't a
                                WindowedValue necessarily.
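The second point, versioning the encoding separately from the context object, might look like the sketch below. It assumes a hypothetical in-memory context that always stores microseconds and two made-up wire versions (milliseconds and microseconds); it is not Beam's actual coder or the outcome of the timestamp-precision discussion.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public final class VersionedTimestampSketch {

  // In-memory context object: always microsecond precision, independent of any wire format.
  static final class Context {
    final String value;
    final long timestampMicros;

    Context(String value, long timestampMicros) {
      this.value = value;
      this.timestampMicros = timestampMicros;
    }
  }

  // Hypothetical wire versions: V1 carries milliseconds, V2 carries microseconds.
  static final int V1_MILLIS = 1;
  static final int V2_MICROS = 2;

  static byte[] encode(Context element, int version) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeByte(version); // the version travels with the data, not with the context object
    switch (version) {
      case V1_MILLIS:
        out.writeLong(Math.floorDiv(element.timestampMicros, 1000L)); // sub-millisecond part lost
        break;
      case V2_MICROS:
        out.writeLong(element.timestampMicros);
        break;
      default:
        throw new IllegalArgumentException("unknown version " + version);
    }
    out.writeUTF(element.value);
    out.flush();
    return bytes.toByteArray();
  }

  static Context decode(byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    int version = in.readUnsignedByte();
    long raw = in.readLong();
    long micros;
    switch (version) {
      case V1_MILLIS: micros = raw * 1000L; break;
      case V2_MICROS: micros = raw; break;
      default: throw new IOException("unknown version " + version);
    }
    return new Context(in.readUTF(), micros);
  }

  public static void main(String[] args) throws IOException {
    Context original = new Context("x", 1_500_123L);
    System.out.println(decode(encode(original, V1_MILLIS)).timestampMicros); // 1500000
    System.out.println(decode(encode(original, V2_MICROS)).timestampMicros); // 1500123
  }
}
```

Encoding `1_500_123` microseconds with the millisecond version decodes back to `1_500_000`, while the microsecond version round-trips exactly; the in-memory object is the same in both cases.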

                                Feel free to cut a JIRA about this and
                                start working on a change towards this.

                                1:
                                
https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E

                                On Fri, Apr 19, 2019 at 3:18 AM jincheng
                                sun <[email protected]
                                <mailto:[email protected]>> wrote:

                                    Hi Beam devs,

                                    I have read some of the docs about
                                    `Communicating over the Fn API` in
                                    Beam. I feel that Beam has a very
                                    good design for the Control
                                    Plane/Data Plane/State Plane/Logging
                                    services, as described in the <How
                                    to send and receive data> document.
                                    When the Runner and the SDK Harness
                                    communicate, the Data Plane API uses
                                    WindowedValue (an immutable triple
                                    of value, timestamp, and windows) as
                                    the contract object between them.
                                    The interface definitions for
                                    sending and receiving data in the
                                    code are as follows:

                                    - org.apache.beam.runners.fnexecution.data.FnDataService

                                        public interface FnDataService {
                                          <T> InboundDataClient receive(
                                              LogicalEndpoint inputLocation,
                                              Coder<WindowedValue<T>> coder,
                                              FnDataReceiver<WindowedValue<T>> listener);

                                          <T> CloseableFnDataReceiver<WindowedValue<T>> send(
                                              LogicalEndpoint outputLocation,
                                              Coder<WindowedValue<T>> coder);
                                        }

                                    - org.apache.beam.fn.harness.data.BeamFnDataClient

                                        public interface BeamFnDataClient {
                                          <T> InboundDataClient receive(
                                              ApiServiceDescriptor apiServiceDescriptor,
                                              LogicalEndpoint inputLocation,
                                              Coder<WindowedValue<T>> coder,
                                              FnDataReceiver<WindowedValue<T>> receiver);

                                          <T> CloseableFnDataReceiver<WindowedValue<T>> send(
                                              Endpoints.ApiServiceDescriptor apiServiceDescriptor,
                                              LogicalEndpoint outputLocation,
                                              Coder<WindowedValue<T>> coder);
                                        }

                                    Both `Coder<WindowedValue<T>>` and
                                    `FnDataReceiver<WindowedValue<T>>`
                                    use `WindowedValue` as the data
                                    structure that both the Runner and
                                    the SDK Harness understand. Control
                                    Plane/Data Plane/State Plane/Logging
                                    form a high-level abstraction, and
                                    some of them, such as the Control
                                    Plane and Logging, are common
                                    requirements for all multi-language
                                    platforms. For example, the Flink
                                    community is also discussing how to
                                    support Python UDFs, as well as how
                                    to handle the Docker environment,
                                    data transfer, state access,
                                    logging, etc. If Beam can further
                                    abstract these service interfaces,
                                    i.e., make the interface definitions
                                    compatible with multiple engines and
                                    eventually provide them to other
                                    projects as libraries, it will
                                    definitely help other platforms that
                                    want to support multiple languages.
                                    So could Beam further abstract the
                                    interface definitions of
                                    FnDataService and BeamFnDataClient?
                                    As a modest starting point to invite
                                    better ideas, take the
                                    FnDataService#receive interface as
                                    an example and turn
                                    `WindowedValue<T>` into `T`, so that
                                    other platforms can extend it
                                    freely, as follows:

                                    <T> InboundDataClient receive(
                                        LogicalEndpoint inputLocation,
                                        Coder<T> coder,
                                        FnDataReceiver<T> listener);
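A minimal sketch of how such a `T`-based interface could behave, assuming hypothetical `ElementCoder<T>`, `ElementReceiver<T>`, and `GenericDataService` types in place of Beam's `Coder`, `FnDataReceiver`, and `FnDataService`, string endpoint IDs in place of `LogicalEndpoint`, and an in-memory loopback instead of gRPC. It shows that the service only moves bytes and never needs to know whether `T` is a WindowedValue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class GenericDataPlaneSketch {

  // Hypothetical minimal counterparts of Coder<T> and FnDataReceiver<T>.
  interface ElementCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
  }

  interface ElementReceiver<T> {
    void accept(T value) throws IOException;
  }

  // Hypothetical T-based service: it only moves bytes and never inspects T.
  interface GenericDataService {
    <T> void receive(String endpoint, ElementCoder<T> coder, ElementReceiver<T> listener);
    <T> ElementReceiver<T> send(String endpoint, ElementCoder<T> coder);
  }

  // In-memory loopback: bytes sent to an endpoint are decoded for that endpoint's listener.
  static final class LoopbackDataService implements GenericDataService {
    private final Map<String, ElementReceiver<byte[]>> listeners = new HashMap<>();

    @Override
    public <T> void receive(String endpoint, ElementCoder<T> coder, ElementReceiver<T> listener) {
      listeners.put(endpoint, bytes ->
          listener.accept(coder.decode(new DataInputStream(new ByteArrayInputStream(bytes)))));
    }

    @Override
    public <T> ElementReceiver<T> send(String endpoint, ElementCoder<T> coder) {
      return value -> {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        coder.encode(value, out);
        out.flush();
        ElementReceiver<byte[]> listener = listeners.get(endpoint);
        if (listener == null) {
          throw new IOException("no listener for " + endpoint);
        }
        listener.accept(bytes.toByteArray());
      };
    }
  }

  // A plain value coder: no timestamp or window is written.
  static final ElementCoder<String> STRING_CODER = new ElementCoder<String>() {
    @Override
    public void encode(String value, DataOutputStream out) throws IOException {
      out.writeUTF(value);
    }

    @Override
    public String decode(DataInputStream in) throws IOException {
      return in.readUTF();
    }
  };

  // Sends each input through the loopback service and collects what the listener sees.
  static List<String> roundTrip(List<String> inputs) throws IOException {
    GenericDataService service = new LoopbackDataService();
    List<String> received = new ArrayList<>();
    service.receive("input-1", STRING_CODER, received::add);
    ElementReceiver<String> sender = service.send("input-1", STRING_CODER);
    for (String input : inputs) {
      sender.accept(input);
    }
    return received;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip(Arrays.asList("a", "b"))); // [a, b]
  }
}
```

In this sketch a Beam runner could plug in a coder for its windowed element type, while another engine could plug in a coder for its own record type; the service code stays the same.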

                                    What do you think?

                                    Feel free to correct me if I have
                                    misunderstood anything. Any feedback
                                    is welcome!


                                    Regards,
                                    Jincheng
