It seems to me that the most valuable code to share and keep up with is the Python/Go/etc. SDK harnesses; they would need to be enhanced with new primitive operations. So you would want to depend directly on, and share, the original proto-generated classes too, which Beam publishes as separate artifacts for Java. Is the runner-side support code that valuable for direct integration into Flink? I would expect that once you get past trivial wrappers (which you can copy/paste with no loss), you would hit differences in architecture, so you would diverge anyhow.
Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:
> Hi Jincheng,
>
> Copying code is a solution for the short term. In the long run, I'd like
> the Fn services to be a library not only for the Beam portability layer
> but also for other projects that want to leverage it. We should thus
> make an effort to make it more generic/extensible where necessary and
> feasible.
>
> Since you are investigating reuse of Beam portability in the context of
> Flink, do you think it would make sense to set up a document where we
> collect ideas and challenges?
>
> Thanks,
> Max
>
> On 23.04.19 13:00, jincheng sun wrote:
> > Hi Reuven,
> >
> > I think you have provided a viable option for other communities that
> > want to take advantage of Beam's existing work. Thank you very much!
> >
> > I think the Flink community can choose either to copy Beam's code or
> > to depend directly on Beam's class library. The Flink community has
> > also started a discussion; more info can be found here:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
> >
> > The purpose of turning `WindowedValue<T>` into `T` is to make Beam's
> > interface design more versatile, so that other open source projects
> > have the opportunity to take advantage of Beam's existing work. Of
> > course, just changing `WindowedValue<T>` into `T` is not enough for
> > the code to be shared with other projects as a class library; more
> > effort is needed. If Beam can provide such a class library in the
> > future, contributors from other communities will also be willing to
> > contribute to the Beam community. This will benefit both the
> > communities that want to take advantage of Beam's existing work and
> > the Beam community itself. And thanks to Thomas, who has also put a
> > lot of effort into this.
> >
> > Thanks again for your valuable suggestion; any feedback is welcome!
> >
> > Best,
> > Jincheng
> >
> > Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:
> > >
> > > One concern here: these interfaces are intended for use within the
> > > Beam project. Beam may decide to make specific changes to them to
> > > support needed functionality in Beam. If they are being reused by
> > > other projects, then those changes risk breaking those other
> > > projects in unexpected ways. I don't think we can guarantee that we
> > > won't do that. If this is useful in Flink, it would be safer, IMO,
> > > to copy the code rather than to depend on it directly.
> > >
> > > On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > >
> > > > Hi Kenn,
> > > >
> > > > Thanks for your reply and for explaining the design of
> > > > WindowedValue so clearly!
> > > >
> > > > At present, the definitions of `FnDataService` and
> > > > `BeamFnDataClient` in the Data Plane are very clear and universal,
> > > > e.g. send(...)/receive(...). If they are only used within the Beam
> > > > project, they are already very good, because `WindowedValue` is a
> > > > very basic data structure in Beam: both the Runner and the SDK
> > > > harness define the WindowedValue data structure.
> > > >
> > > > The reason I want to change the interface parameter from
> > > > `WindowedValue<T>` to `T` is that I want to turn the Data Plane
> > > > interface into a class library that can be used by other projects
> > > > (such as Apache Flink), so that those projects can have their own
> > > > `FnDataService` implementations. However, the definition of
> > > > `WindowedValue` does not fit all projects. For example, Apache
> > > > Flink has its own definition similar to WindowedValue: Flink
> > > > streaming has StreamRecord. If we change `WindowedValue<T>` to
> > > > `T`, other projects' implementations will not need to wrap
> > > > WindowedValue, and the interface will become more concise.
> > > > Furthermore, in some cases we only need a plain `T`, for example
> > > > in Apache Flink's DataSet operators.
> > > >
> > > > So, I agree with your understanding: I don't expect
> > > > `WindowedValueXXX<T>` in the FnDataService interface; I hope to
> > > > just use a `T`.
> > > >
> > > > Do you see any problem if we change the interface parameter from
> > > > `WindowedValue<T>` to `T`?
> > > >
> > > > Thanks,
> > > > Jincheng
> > > >
> > > > Kenneth Knowles <k...@apache.org> wrote on Sat, Apr 20, 2019 at 2:38 AM:
> > > > >
> > > > > WindowedValue has always been an interface, not a concrete
> > > > > representation:
> > > > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
> > > > > It is an abstract class because we started in Java 7, where you
> > > > > could not have default methods, and simply due to legacy style
> > > > > concerns. There already are WindowedValue implementations with
> > > > > fewer allocations; this is not just discussed but implemented.
> > > > > At the coder level, it was also always intended to have multiple
> > > > > encodings. We already have separate encodings based on whether
> > > > > there is one window or multiple windows. The coder for a
> > > > > particular kind of WindowedValue should decide this. Before the
> > > > > Fn API, none of this had to be standardized, because the runner
> > > > > could just choose whatever it wanted. Now we have to standardize
> > > > > any encodings that runners and harnesses both need to know. There
> > > > > should be many, and adding more should be just a matter of
> > > > > standardization, with no new design.
> > > > >
> > > > > None of this should be user-facing or in the Runner API /
> > > > > pipeline graph; that is critical to keeping it flexible on the
> > > > > backend between the runner and the SDK harness.
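Kenn's point that WindowedValue is an abstract context object with several cheaper concrete representations, and that the coder (not the user) picks the encoding, can be illustrated with a minimal sketch. It is not Beam's code: `SketchWindowedValue`, `ValueInGlobalWindow`, `FullWindowedValue`, and `SketchEncodings` are hypothetical stand-ins, windows are modeled as plain strings, and timestamps as epoch milliseconds, purely for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for Beam's WindowedValue: an abstract
// context object with more than one concrete representation.
abstract class SketchWindowedValue<T> {
  public abstract T getValue();

  public abstract long getTimestampMillis();

  public abstract Collection<String> getWindows();

  // Cheap variant: no per-element timestamp or window list is allocated.
  public static <T> SketchWindowedValue<T> valueInGlobalWindow(T value) {
    return new ValueInGlobalWindow<>(value);
  }

  // General variant: explicit timestamp and window set.
  public static <T> SketchWindowedValue<T> of(
      T value, long timestampMillis, Collection<String> windows) {
    return new FullWindowedValue<>(value, timestampMillis, windows);
  }
}

final class ValueInGlobalWindow<T> extends SketchWindowedValue<T> {
  // Shared by every instance (hypothetical window name).
  private static final List<String> GLOBAL = Collections.singletonList("GlobalWindow");
  private final T value;

  ValueInGlobalWindow(T value) {
    this.value = value;
  }

  @Override public T getValue() { return value; }

  // Long.MIN_VALUE is this sketch's placeholder for "minimum timestamp".
  @Override public long getTimestampMillis() { return Long.MIN_VALUE; }

  @Override public Collection<String> getWindows() { return GLOBAL; }
}

final class FullWindowedValue<T> extends SketchWindowedValue<T> {
  private final T value;
  private final long timestampMillis;
  private final List<String> windows;

  FullWindowedValue(T value, long timestampMillis, Collection<String> windows) {
    this.value = value;
    this.timestampMillis = timestampMillis;
    this.windows = Collections.unmodifiableList(new ArrayList<>(windows));
  }

  @Override public T getValue() { return value; }

  @Override public long getTimestampMillis() { return timestampMillis; }

  @Override public Collection<String> getWindows() { return windows; }
}

// Hypothetical coder-side decision: a compact encoding when there is exactly
// one window, a general one otherwise.
final class SketchEncodings {
  private SketchEncodings() {}

  static String chooseEncoding(SketchWindowedValue<?> wv) {
    return wv.getWindows().size() == 1 ? "single-window" : "multi-window";
  }
}
```

Because callers only see the abstract type in this sketch, the cheaper representation can be swapped in without any user-facing change, which is the flexibility the paragraph above argues for.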
> > > > >
> > > > > If I understand it correctly from our offline discussion, you
> > > > > are interested in the case where you issue a
> > > > > ProcessBundleRequest to the SDK harness and none of the
> > > > > primitives in the subgraph will ever observe the metadata. So
> > > > > you want to avoid even a tiny "WindowedValueWithNoMetadata". Is
> > > > > that accurate?
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > >
> > > > > > Thank you! And have a nice weekend!
> > > > > >
> > > > > > Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 1:14 AM:
> > > > > > >
> > > > > > > I have added you as a contributor.
> > > > > > >
> > > > > > > On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Lukasz,
> > > > > > > >
> > > > > > > > Thanks for your affirmation and for providing more
> > > > > > > > context. :)
> > > > > > > >
> > > > > > > > Would you please give me contributor permission? My
> > > > > > > > JIRA ID is sunjincheng121.
> > > > > > > >
> > > > > > > > I would like to create/assign tickets for this work.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jincheng
> > > > > > > >
> > > > > > > > Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 12:26 AM:
> > > > > > > > >
> > > > > > > > > Since I don't think this is a contentious change.
> > > > > > > > >
> > > > > > > > > On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Yes, using T makes sense.
> > > > > > > > > >
> > > > > > > > > > The WindowedValue was meant to be a context object
> > > > > > > > > > in the SDK harness that propagates various
> > > > > > > > > > information about the current element. We have
> > > > > > > > > > discussed in the past:
> > > > > > > > > > * making optimizations that would pass around less
> > > > > > > > > >   of the context information if we know that the
> > > > > > > > > >   DoFns don't need it (for example, all the values
> > > > > > > > > >   share the same window).
> > > > > > > > > > * versioning the encoding separately from the
> > > > > > > > > >   WindowedValue context object (see the recent
> > > > > > > > > >   discussion about element timestamp precision [1])
> > > > > > > > > > * the runner may want its own representation of a
> > > > > > > > > >   context object that makes sense for it, which is
> > > > > > > > > >   not necessarily a WindowedValue.
> > > > > > > > > >
> > > > > > > > > > Feel free to cut a JIRA about this and start working
> > > > > > > > > > on a change towards this.
> > > > > > > > > >
> > > > > > > > > > 1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
> > > > > > > > > >
> > > > > > > > > > On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Beam devs,
> > > > > > > > > > >
> > > > > > > > > > > I read some of the docs about "Communicating over
> > > > > > > > > > > the Fn API" in Beam. I feel that Beam has a very
> > > > > > > > > > > good design for the Control Plane/Data Plane/State
> > > > > > > > > > > Plane/Logging services, as described in the "How
> > > > > > > > > > > to send and receive data" document. When
> > > > > > > > > > > communicating between the Runner and the SDK
> > > > > > > > > > > Harness, the Data Plane API uses WindowedValue (an
> > > > > > > > > > > immutable triple of value, timestamp, and windows)
> > > > > > > > > > > as the contract object between the Runner and the
> > > > > > > > > > > SDK Harness.
> > > > > > > > > > > I see the interface definitions for sending and
> > > > > > > > > > > receiving data in the code as follows:
> > > > > > > > > > >
> > > > > > > > > > > - org.apache.beam.runners.fnexecution.data.FnDataService
> > > > > > > > > > >
> > > > > > > > > > >   public interface FnDataService {
> > > > > > > > > > >     <T> InboundDataClient receive(
> > > > > > > > > > >         LogicalEndpoint inputLocation,
> > > > > > > > > > >         Coder<WindowedValue<T>> coder,
> > > > > > > > > > >         FnDataReceiver<WindowedValue<T>> listener);
> > > > > > > > > > >
> > > > > > > > > > >     <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> > > > > > > > > > >         LogicalEndpoint outputLocation,
> > > > > > > > > > >         Coder<WindowedValue<T>> coder);
> > > > > > > > > > >   }
> > > > > > > > > > >
> > > > > > > > > > > - org.apache.beam.fn.harness.data.BeamFnDataClient
> > > > > > > > > > >
> > > > > > > > > > >   public interface BeamFnDataClient {
> > > > > > > > > > >     <T> InboundDataClient receive(
> > > > > > > > > > >         ApiServiceDescriptor apiServiceDescriptor,
> > > > > > > > > > >         LogicalEndpoint inputLocation,
> > > > > > > > > > >         Coder<WindowedValue<T>> coder,
> > > > > > > > > > >         FnDataReceiver<WindowedValue<T>> receiver);
> > > > > > > > > > >
> > > > > > > > > > >     <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> > > > > > > > > > >         Endpoints.ApiServiceDescriptor apiServiceDescriptor,
> > > > > > > > > > >         LogicalEndpoint outputLocation,
> > > > > > > > > > >         Coder<WindowedValue<T>> coder);
> > > > > > > > > > >   }
> > > > > > > > > > >
> > > > > > > > > > > Both `Coder<WindowedValue<T>>` and
> > > > > > > > > > > `FnDataReceiver<WindowedValue<T>>` use
> > > > > > > > > > > `WindowedValue` as the data structure that both
> > > > > > > > > > > the Runner and the SDK Harness understand. The
> > > > > > > > > > > Control Plane/Data Plane/State Plane/Logging
> > > > > > > > > > > services are highly abstract; the Control Plane
> > > > > > > > > > > and Logging, for instance, are common requirements
> > > > > > > > > > > for all multi-language platforms. For example, the
> > > > > > > > > > > Flink community is also discussing how to support
> > > > > > > > > > > Python UDFs, as well as how to handle the Docker
> > > > > > > > > > > environment, data transfer, state access, logging,
> > > > > > > > > > > etc. If Beam can further abstract these service
> > > > > > > > > > > interfaces, i.e., make the interface definitions
> > > > > > > > > > > compatible with multiple engines and ultimately
> > > > > > > > > > > provide them to other projects as class libraries,
> > > > > > > > > > > it will definitely help other platforms that want
> > > > > > > > > > > to support multiple languages.
> > > > > > > > > > > So could Beam further abstract the interface
> > > > > > > > > > > definitions of FnDataService and BeamFnDataClient?
> > > > > > > > > > > As a starting point for discussion, take the
> > > > > > > > > > > FnDataService#receive interface as an example and
> > > > > > > > > > > turn `WindowedValue<T>` into `T`, so that other
> > > > > > > > > > > platforms can extend it arbitrarily, as follows:
> > > > > > > > > > >
> > > > > > > > > > >   <T> InboundDataClient receive(
> > > > > > > > > > >       LogicalEndpoint inputLocation,
> > > > > > > > > > >       Coder<T> coder,
> > > > > > > > > > >       FnDataReceiver<T> listener);
> > > > > > > > > > >
> > > > > > > > > > > What do you think?
> > > > > > > > > > >
> > > > > > > > > > > Feel free to correct me if I have misunderstood
> > > > > > > > > > > anything. Any feedback is welcome!
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > > Jincheng
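The proposed signature change can be made concrete with a minimal, self-contained sketch. It is not Beam's API: `SketchCoder`, `SketchReceiver`, `SketchDataService`, `InMemoryDataService`, and `Utf8Coder` are hypothetical stand-ins, endpoints are plain strings rather than `LogicalEndpoint`, and `receive` returns nothing instead of an `InboundDataClient`. It only shows that a data service generic in a plain `T` can move elements without knowing whether they carry windowing metadata.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical minimal coder (not Beam's Coder API).
interface SketchCoder<T> {
  byte[] encode(T value);

  T decode(byte[] bytes);
}

// Hypothetical stand-in for FnDataReceiver.
interface SketchReceiver<T> {
  void accept(T value);
}

// The proposed shape: the element type is a plain T, chosen by the caller.
interface SketchDataService {
  <T> void receive(String inputLocation, SketchCoder<T> coder, SketchReceiver<T> listener);

  <T> SketchReceiver<T> send(String outputLocation, SketchCoder<T> coder);
}

// In-memory implementation: encodes on send and decodes on receive, keyed by
// an endpoint name (a String here instead of LogicalEndpoint).
final class InMemoryDataService implements SketchDataService {
  private final Map<String, List<Consumer<byte[]>>> listeners = new HashMap<>();

  @Override
  public <T> void receive(String inputLocation, SketchCoder<T> coder, SketchReceiver<T> listener) {
    listeners
        .computeIfAbsent(inputLocation, k -> new ArrayList<>())
        .add(bytes -> listener.accept(coder.decode(bytes)));
  }

  @Override
  public <T> SketchReceiver<T> send(String outputLocation, SketchCoder<T> coder) {
    return value -> {
      byte[] bytes = coder.encode(value);
      // Elements sent to an endpoint with no registered listener are dropped.
      for (Consumer<byte[]> l : listeners.getOrDefault(outputLocation, Collections.emptyList())) {
        l.accept(bytes);
      }
    };
  }
}

// A bare-value coder: no windowing metadata on the wire at all.
final class Utf8Coder implements SketchCoder<String> {
  @Override
  public byte[] encode(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String decode(byte[] bytes) {
    return new String(bytes, StandardCharsets.UTF_8);
  }
}
```

In this sketch a Beam runner could keep today's behavior by instantiating `T` as `WindowedValue<X>` with a windowed-value coder, while another engine could plug in its own record type and coder; the transport itself never needs to know which.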