Hi Kenn,

I think you are right: the Python SDK harness can be shared with Flink, and it would also need some new primitive operations. On the runner side, I think most of the code in runners/java-fn-execution can be shared (although some of it needs improvement, such as FnDataService), while some of it cannot be shared, such as the job submission code. So we may need to set up a document that clearly analyzes which parts can be shared as-is, which can be shared with some changes, and which definitely cannot be shared.
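To make the FnDataService point a bit more concrete, here is a rough sketch of what a data-plane interface carrying a plain `T` could look like. This is not Beam's actual API: `SimpleCoder`, `SimpleReceiver`, `SimpleDataService`, `LoopbackDataService` and `Stamped` are hypothetical names made up for illustration, and the real `Coder` and `FnDataReceiver` have richer contracts. The only point is that the service never mentions the envelope type, so each caller chooses its own `T`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GenericDataPlaneSketch {

  /** Hypothetical, heavily simplified stand-in for Beam's Coder. */
  interface SimpleCoder<T> {
    String encode(T value);
    T decode(String encoded);
  }

  /** Hypothetical, heavily simplified stand-in for Beam's FnDataReceiver. */
  interface SimpleReceiver<T> {
    void accept(T element);
  }

  /** Assumed shape of the proposal: the data plane only knows T. */
  interface SimpleDataService {
    <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener);
    <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder);
  }

  /** In-memory loopback, just enough to exercise the interface. */
  static final class LoopbackDataService implements SimpleDataService {
    private final Map<String, SimpleReceiver<String>> inbound = new HashMap<>();

    @Override
    public <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener) {
      // Decode on arrival, then hand the element to the caller's listener.
      inbound.put(endpoint, encoded -> listener.accept(coder.decode(encoded)));
    }

    @Override
    public <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder) {
      // Encode on send and deliver to whoever registered for this endpoint.
      return element -> inbound.get(endpoint).accept(coder.encode(element));
    }
  }

  /** Envelope a Beam-like caller might choose as T (windows omitted for brevity). */
  static final class Stamped<V> {
    final V value;
    final long timestampMillis;

    Stamped(V value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Coder for bare strings, e.g. for a caller that needs no metadata at all. */
  static final SimpleCoder<String> PLAIN = new SimpleCoder<String>() {
    @Override public String encode(String value) { return value; }
    @Override public String decode(String encoded) { return encoded; }
  };

  /** Coder for the envelope: "timestamp|value". */
  static final SimpleCoder<Stamped<String>> STAMPED = new SimpleCoder<Stamped<String>>() {
    @Override public String encode(Stamped<String> s) {
      return s.timestampMillis + "|" + s.value;
    }
    @Override public Stamped<String> decode(String encoded) {
      int sep = encoded.indexOf('|');
      return new Stamped<>(encoded.substring(sep + 1), Long.parseLong(encoded.substring(0, sep)));
    }
  };

  /** Runs both kinds of caller through the same service and records what arrives. */
  static List<String> demo() {
    SimpleDataService service = new LoopbackDataService();
    List<String> seen = new ArrayList<>();
    service.receive("with-envelope", STAMPED, e -> seen.add(e.value + "@" + e.timestampMillis));
    service.receive("bare-element", PLAIN, e -> seen.add(e));
    service.send("with-envelope", STAMPED).accept(new Stamped<>("a", 42L));
    service.send("bare-element", PLAIN).accept("b");
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch a Beam-like caller would pick an envelope type as `T`, while something like a Flink DataSet operator could pass the bare element type. How much of the real runner-side code fits this shape is exactly what the document would need to work out.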

Hi Max,

Thanks for sharing your opinion. I also prefer using the Beam Fn Service as a library, and I am willing to put more effort into this.

From the view of the current code, abstracting the Fn Service into a class library that other projects can rely on requires a lot of effort from the Beam community. Turning `WindowedValue<T>` into `T` is just the beginning of this effort. If the Beam community is willing to abstract the Fn Service into a class library that other projects can rely on, I can try to draft a document. Of course, during this period I may need a lot of help from you, Kenn, Lukasz, and the Beam community. (I am a new recruit in the Beam community :-))

What do you think?

Regards,
Jincheng

Kenneth Knowles wrote on Wed, Apr 24, 2019 at 3:32 AM:

> It seems to me that the most valuable code to share and keep up with is
> the Python/Go/etc SDK harness; they would need to be enhanced with new
> primitive operations. So you would want to depend directly and share the
> original proto-generated classes too, which Beam publishes as separate
> artifacts for Java. Is the runner-side support code that valuable for
> direct integration into Flink? I would expect once you get past trivial
> wrappers (that you can copy/paste with no loss) you would hit differences
> in architecture so you would diverge anyhow.
>
> Kenn
>
> On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels wrote:
>
>> Hi Jincheng,
>>
>> Copying code is a solution for the short term. In the long run I'd like
>> the Fn services to be a library not only for the Beam portability layer
>> but also for other projects which want to leverage it. We should thus
>> make an effort to make it more generic/extensible where necessary and
>> feasible.
>>
>> Since you are investigating reuse of Beam portability in the context of
>> Flink, do you think it would make sense to setup a document where we
>> collect ideas and challenges?
>>
>> Thanks,
>> Max
>>
>> On 23.04.19 13:00, jincheng sun wrote:
>> > Hi Reuven,
>> >
>> > I think you have provided an alternative solution for other
>> > communities that want to take advantage of Beam's existing
>> > achievements. Thank you very much!
>> >
>> > I think the Flink community can choose to copy Beam's code or choose
>> > to rely directly on Beam's class library. The Flink community has
>> > also initiated a discussion; more info can be found here:
>> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
>> >
>> > The purpose of turning `WindowedValue<T>` into `T` is to make Beam's
>> > interface design more versatile, so that other open source projects
>> > have the opportunity to take advantage of Beam's existing
>> > achievements. Of course, just changing `WindowedValue<T>` into `T`
>> > is not enough for the code to be shared by other projects in the form
>> > of a class library; we need to put in more effort. If Beam can
>> > provide a class library in the future, contributors from other
>> > communities will also be willing to contribute to the Beam community.
>> > This will benefit both the communities that want to take advantage of
>> > Beam's existing achievements and the Beam community itself. And
>> > thanks to Thomas, who has also made a lot of effort in this regard.
>> >
>> > Thanks again for your valuable suggestion, and welcome any feedback!
>> >
>> > Best,
>> > Jincheng
>> >
>> > Reuven Lax wrote on Tue, Apr 23, 2019 at 1:00 AM:
>> >
>> > One concern here: these interfaces are intended for use within the
>> > Beam project. Beam may decide to make specific changes to them to
>> > support needed functionality in Beam. If they are being reused by
>> > other projects, then those changes risk breaking those other
>> > projects in unexpected ways.
>> > I don't think we can guarantee that we don't do that. If this is
>> > useful in Flink, it would be safer to copy the code IMO rather than
>> > to directly depend on it.
>> >
>> > On Mon, Apr 22, 2019 at 12:08 AM jincheng sun wrote:
>> >
>> > Hi Kenn,
>> >
>> > Thanks for your reply, and for explaining the design of
>> > WindowedValue so clearly!
>> >
>> > At present, the definitions of `FnDataService` and
>> > `BeamFnDataClient` in the Data Plane are very clear and universal,
>> > such as send(...)/receive(...). If they are only used within the
>> > Beam project, they are already very good, because `WindowedValue`
>> > is a very basic data structure in the Beam project and both the
>> > Runner and the SDK harness define the WindowedValue data structure.
>> >
>> > The reason I want to change the interface parameter from
>> > `WindowedValue<T>` to T is that I want to make the `Data Plane`
>> > interface into a class library that can be used by other projects
>> > (such as Apache Flink), so that other projects can have their own
>> > `FnDataService` implementation. However, the definition of
>> > `WindowedValue` does not apply to all projects. For example, Apache
>> > Flink has its own definition similar to WindowedValue: Flink
>> > streaming has StreamRecord. If we change `WindowedValue<T>` to T,
>> > then other projects' implementations do not need to wrap
>> > WindowedValue, and the interface becomes more concise. Furthermore,
>> > in some cases we only need a bare T, such as in an Apache Flink
>> > DataSet operator.
>> >
>> > So, I agree with your understanding: I don't expect
>> > `WindowedValueXXX<T>` in the FnDataService interface, I hope to
>> > just use a `T`.
>> >
>> > Do you see any problem if we change the interface parameter from
>> > `WindowedValue<T>` to T?
>> >
>> > Thanks,
>> > Jincheng
>> >
>> > Kenneth Knowles wrote on Sat, Apr 20, 2019 at 2:38 AM:
>> >
>> > WindowedValue has always been an interface, not a concrete
>> > representation:
>> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
>> > It is an abstract class because we started in Java 7 where you could
>> > not have default methods, and just due to legacy style concerns. It
>> > is not just discussed, but implemented, that there are WindowedValue
>> > implementations with fewer allocations.
>> >
>> > At the coder level, it was also always intended to have multiple
>> > encodings. We already do have separate encodings based on whether
>> > there is 1 window or multiple windows. The coder for a particular
>> > kind of WindowedValue should decide this. Before the Fn API none of
>> > this had to be standardized, because the runner could just choose
>> > whatever it wants. Now we have to standardize any encodings that
>> > runners and harnesses both need to know. There should be many, and
>> > adding more should be just a matter of standardization, no new
>> > design.
>> >
>> > None of this should be user-facing or in the runner API / pipeline
>> > graph - that is critical to making it flexible on the backend
>> > between the runner & SDK harness.
>> >
>> > If I understand it, from our offline discussion, you are interested
>> > in the case where you issue a ProcessBundleRequest to the SDK
>> > harness and none of the primitives in the subgraph will ever observe
>> > the metadata. So you want to not even have a tiny
>> > "WindowedValueWithNoMetadata". Is that accurate?
>> >
>> > Kenn
>> >
>> > On Fri, Apr 19, 2019 at 10:17 AM jincheng sun wrote:
>> >
>> > Thank you! And have a nice weekend!
>> >
>> > Lukasz Cwik wrote on Sat, Apr 20, 2019 at 1:14 AM:
>> >
>> > I have added you as a contributor.
>> >
>> > On Fri, Apr 19, 2019 at 9:56 AM jincheng sun wrote:
>> >
>> > Hi Lukasz,
>> >
>> > Thanks for your affirmation and for providing more context. :)
>> >
>> > Would you please give me the contributor permission? My JIRA ID is
>> > sunjincheng121.
>> >
>> > I would like to create/assign tickets for this work.
>> >
>> > Thanks,
>> > Jincheng
>> >
>> > Lukasz Cwik wrote on Sat, Apr 20, 2019 at 12:26 AM:
>> >
>> > Since I don't think this is a contentious change.
>> >
>> > On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik wrote:
>> >
>> > Yes, using T makes sense.
>> >
>> > The WindowedValue was meant to be a context object in the SDK
>> > harness that propagates various information about the current
>> > element. We have discussed in the past about:
>> > * making optimizations which would pass around less of the context
>> >   information if we know that the DoFns don't need it (for example,
>> >   all the values share the same window).
>> > * versioning the encoding separately from the WindowedValue context
>> >   object (see recent discussion about element timestamp precision
>> >   [1])
>> > * the runner may want its own representation of a context object
>> >   that makes sense for it which isn't a WindowedValue necessarily.
>> >
>> > Feel free to cut a JIRA about this and start working on a change
>> > towards this.
>> >
>> > 1:
>> > https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>> >
>> > On Fri, Apr 19, 2019 at 3:18 AM jincheng sun wrote:
>> >
>> > Hi Beam devs,
>> >
>> > I read some of the docs about `Communicating over the Fn API` in
>> > Beam. I feel that Beam has a very good design for the Control
>> > Plane/Data Plane/State Plane/Logging services, as described in the
>> > "How to send and receive data" document. When the Runner and the
>> > SDK Harness communicate, the Data Plane API uses WindowedValue (an
>> > immutable triple of value, timestamp, and windows) as the contract
>> > object between them. I see the interface definitions for sending
>> > and receiving data in the code as follows:
>> >
>> > - org.apache.beam.runners.fnexecution.data.FnDataService
>> >
>> >     public interface FnDataService {
>> >       <T> InboundDataClient receive(
>> >           LogicalEndpoint inputLocation,
>> >           Coder<WindowedValue<T>> coder,
>> >           FnDataReceiver<WindowedValue<T>> listener);
>> >
>> >       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>> >           LogicalEndpoint outputLocation,
>> >           Coder<WindowedValue<T>> coder);
>> >     }
>> >
>> > - org.apache.beam.fn.harness.data.BeamFnDataClient
>> >
>> >     public interface BeamFnDataClient {
>> >       <T> InboundDataClient receive(
>> >           ApiServiceDescriptor apiServiceDescriptor,
>> >           LogicalEndpoint inputLocation,
>> >           Coder<WindowedValue<T>> coder,
>> >           FnDataReceiver<WindowedValue<T>> receiver);
>> >
>> >       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>> >           Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>> >           LogicalEndpoint outputLocation,
>> >           Coder<WindowedValue<T>> coder);
>> >     }
>> >
>> > Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
>> > use `WindowedValue` as the data structure that both the Runner and
>> > the SDK Harness understand. Control Plane/Data Plane/State
>> > Plane/Logging are high-level abstractions; Control Plane and
>> > Logging, for example, are common requirements for all
>> > multi-language platforms. For example, the Flink community is also
>> > discussing how to support Python UDFs, as well as how to deal with
>> > the Docker environment, how to transfer data, how to access state,
>> > how to do logging, etc. If Beam can further abstract these service
>> > interfaces, i.e., make the interface definitions compatible with
>> > multiple engines, and finally provide them to other projects in the
>> > form of class libraries, it will definitely help other platforms
>> > that want to support multiple languages. So could Beam further
>> > abstract the interface definitions of FnDataService and
>> > BeamFnDataClient? Here I throw out a minnow to catch a whale: take
>> > the FnDataService#receive interface as an example, and turn
>> > `WindowedValue<T>` into `T` so that other platforms can extend it
>> > arbitrarily, as follows:
>> >
>> >     <T> InboundDataClient receive(
>> >         LogicalEndpoint inputLocation,
>> >         Coder<T> coder,
>> >         FnDataReceiver<T> listener);
>> >
>> > What do you think?
>> >
>> > Feel free to correct me if I have misunderstood anything. And
>> > welcome any feedback!
>> >
>> > Regards,
>> > Jincheng
