Hi Kenn,

Thanks for your reply, and for explaining the design of WindowedValue so clearly!
At present, the definitions of `FnDataService` and `BeamFnDataClient` in the Data Plane are very clear and general, e.g. send(...)/receive(...). If they are only used within Beam, they are already very good. `WindowedValue` is a very basic data structure in Beam, and both the Runner and the SDK harness define it. The reason I want to change the interface parameter from `WindowedValue<T>` to `T` is that I want to turn the Data Plane interfaces into a class library that other projects (such as Apache Flink) can use, so that those projects can provide their own `FnDataService` implementations. However, the definition of `WindowedValue` does not suit every project. For example, Apache Flink has its own counterpart of WindowedValue: Flink's streaming API has `StreamRecord`. If we change `WindowedValue<T>` to `T`, other projects' implementations do not need to wrap their records in a WindowedValue, and the interface becomes more concise. Furthermore, some cases only need a plain `T`, such as Apache Flink DataSet operators.

So I agree with your understanding: I don't expect a `WindowedValueXXX<T>` in the FnDataService interface; I would like to use just a `T`.

Do you see any problem if we change the interface parameter from `WindowedValue<T>` to `T`?

Thanks,
Jincheng

On Sat, Apr 20, 2019 at 2:38 AM Kenneth Knowles <k...@apache.org> wrote:

> WindowedValue has always been an interface, not a concrete representation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
> It is an abstract class because we started in Java 7, where you could not
> have default methods, and just due to legacy style concerns. WindowedValue
> implementations with fewer allocations have not just been discussed; they
> have been implemented.
>
> At the coder level, it was also always intended to have multiple
> encodings. We already have separate encodings based on whether there is
> one window or multiple windows. The coder for a particular kind of
> WindowedValue should decide this. Before the Fn API none of this had to be
> standardized, because the runner could just choose whatever it wanted. Now
> we have to standardize any encodings that both runners and harnesses need
> to know. There should be many, and adding more should be just a matter of
> standardization, not new design.
>
> None of this should be user-facing or in the runner API / pipeline graph;
> that is critical to keeping the backend between the runner and the SDK
> harness flexible.
>
> If I understand it, from our offline discussion, you are interested in the
> case where you issue a ProcessBundleRequest to the SDK harness and none of
> the primitives in the subgraph will ever observe the metadata. So you want
> to avoid even a tiny "WindowedValueWithNoMetadata". Is that accurate?
>
> Kenn
>
> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com>
> wrote:
>
>> Thank you! And have a nice weekend!
>>
>>
>> On Sat, Apr 20, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> I have added you as a contributor.
>>>
>>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com>
>>> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for your affirmation and for providing more context. :)
>>>>
>>>> Would you please give me contributor permission? My JIRA ID is
>>>> sunjincheng121.
>>>>
>>>> I would like to create/assign tickets for this work.
>>>>
>>>> Thanks,
>>>> Jincheng
>>>>
>>>> On Sat, Apr 20, 2019 at 12:26 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Since I don't think this is a contentious change.
>>>>>
>>>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Yes, using T makes sense.
>>>>>>
>>>>>> The WindowedValue was meant to be a context object in the SDK harness
>>>>>> that propagates various information about the current element. In the
>>>>>> past we have discussed:
>>>>>> * making optimizations that would pass around less of the context
>>>>>>   information if we know that the DoFns don't need it (for example, all
>>>>>>   the values share the same window).
>>>>>> * versioning the encoding separately from the WindowedValue context
>>>>>>   object (see the recent discussion about element timestamp precision [1]).
>>>>>> * letting the runner use its own representation of a context object that
>>>>>>   makes sense for it, which isn't necessarily a WindowedValue.
>>>>>>
>>>>>> Feel free to cut a JIRA about this and start working on a change
>>>>>> towards this.
>>>>>>
>>>>>> 1:
>>>>>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <
>>>>>> sunjincheng...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Beam devs,
>>>>>>>
>>>>>>> I read some of the docs about `Communicating over the Fn API` in
>>>>>>> Beam. I feel that Beam has a very good design for the Control Plane,
>>>>>>> Data Plane, State Plane and Logging services, as described in the
>>>>>>> <How to send and receive data> document. When the Runner and the SDK
>>>>>>> Harness communicate, the Data Plane API uses WindowedValue (an
>>>>>>> immutable triple of value, timestamp, and windows) as the contract
>>>>>>> object between them.
>>>>>>> I see the following interface definitions for sending and receiving
>>>>>>> data in the code:
>>>>>>>
>>>>>>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>>>>>>
>>>>>>>     public interface FnDataService {
>>>>>>>       <T> InboundDataClient receive(
>>>>>>>           LogicalEndpoint inputLocation,
>>>>>>>           Coder<WindowedValue<T>> coder,
>>>>>>>           FnDataReceiver<WindowedValue<T>> listener);
>>>>>>>
>>>>>>>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>           LogicalEndpoint outputLocation,
>>>>>>>           Coder<WindowedValue<T>> coder);
>>>>>>>     }
>>>>>>>
>>>>>>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>>>>>>
>>>>>>>     public interface BeamFnDataClient {
>>>>>>>       <T> InboundDataClient receive(
>>>>>>>           ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>           LogicalEndpoint inputLocation,
>>>>>>>           Coder<WindowedValue<T>> coder,
>>>>>>>           FnDataReceiver<WindowedValue<T>> receiver);
>>>>>>>
>>>>>>>       <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>>>>>           Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>>>>>           LogicalEndpoint outputLocation,
>>>>>>>           Coder<WindowedValue<T>> coder);
>>>>>>>     }
>>>>>>>
>>>>>>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
>>>>>>> use `WindowedValue` as the data structure that both the Runner and the
>>>>>>> SDK Harness understand. The Control Plane, Data Plane, State Plane and
>>>>>>> Logging services are highly abstract, and some of them, such as the
>>>>>>> Control Plane and Logging, are common requirements for every
>>>>>>> multi-language platform. For example, the Flink community is also
>>>>>>> discussing how to support Python UDFs, as well as how to handle the
>>>>>>> Docker environment, data transfer, state access, logging, etc.
>>>>>>>
>>>>>>> If Beam could further abstract these service interfaces, i.e., make
>>>>>>> the interface definitions compatible with multiple engines and provide
>>>>>>> them to other projects as class libraries, it would definitely help
>>>>>>> other platforms that want to support multiple languages. So could Beam
>>>>>>> further abstract the interface definitions of FnDataService and
>>>>>>> BeamFnDataClient? To get the discussion started, take the
>>>>>>> FnDataService#receive interface as an example and turn
>>>>>>> `WindowedValue<T>` into `T`, so that other platforms can extend it
>>>>>>> freely, as follows:
>>>>>>>
>>>>>>>     <T> InboundDataClient receive(
>>>>>>>         LogicalEndpoint inputLocation,
>>>>>>>         Coder<T> coder,
>>>>>>>         FnDataReceiver<T> listener);
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Feel free to correct me if I have misunderstood anything. Any feedback
>>>>>>> is welcome!
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Jincheng
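The proposal to make the data-plane element type a plain `T` instead of `WindowedValue<T>` can be illustrated without any Beam dependency. This is a minimal sketch under stated assumptions: `Codec`, `GenericDataService`, `InMemoryDataService`, `Windowed`, `WindowedCodec` and `StringCodec` are hypothetical names and are not part of Beam or Flink. An in-memory byte hand-off stands in for the gRPC data stream. The sketch shows that once the interface is generic in `T`, a Beam-style caller can instantiate `T` as a windowed element, and another engine can send bare values with no metadata at all.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a Coder<T>; this is NOT Beam's Coder API.
interface Codec<T> {
  void encode(T value, DataOutputStream out) throws IOException;
  T decode(DataInputStream in) throws IOException;
}

// Hypothetical engine-neutral data service: the element type is a plain T.
interface GenericDataService {
  <T> void receive(String endpoint, Codec<T> codec, Consumer<T> listener);
  <T> Consumer<T> send(String endpoint, Codec<T> codec);
}

// In-memory implementation: one byte[] per element stands in for the data stream.
final class InMemoryDataService implements GenericDataService {
  private final Map<String, List<Consumer<byte[]>>> listeners = new HashMap<>();

  @Override
  public <T> void receive(String endpoint, Codec<T> codec, Consumer<T> listener) {
    // Decode incoming bytes with the caller's codec before handing them to the listener.
    listeners.computeIfAbsent(endpoint, k -> new ArrayList<>()).add(bytes -> {
      try {
        listener.accept(codec.decode(new DataInputStream(new ByteArrayInputStream(bytes))));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }

  @Override
  public <T> Consumer<T> send(String endpoint, Codec<T> codec) {
    // Encode each element and deliver the bytes to every listener on the endpoint.
    return value -> {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(buf)) {
        codec.encode(value, out);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      byte[] bytes = buf.toByteArray();
      for (Consumer<byte[]> l : listeners.getOrDefault(endpoint, List.of())) {
        l.accept(bytes);
      }
    };
  }
}

// Plain-value codec, e.g. for an engine whose operators carry no per-element metadata.
final class StringCodec implements Codec<String> {
  public void encode(String v, DataOutputStream out) throws IOException { out.writeUTF(v); }
  public String decode(DataInputStream in) throws IOException { return in.readUTF(); }
}

// Hypothetical Beam-like element: value, timestamp, and window ids (strings here).
final class Windowed<V> {
  final V value;
  final long timestamp;
  final List<String> windows;

  Windowed(V value, long timestamp, List<String> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = List.copyOf(windows);
  }
}

// With T = Windowed<V>, the metadata is simply part of T and of its codec.
final class WindowedCodec<V> implements Codec<Windowed<V>> {
  private final Codec<V> valueCodec;

  WindowedCodec(Codec<V> valueCodec) { this.valueCodec = valueCodec; }

  public void encode(Windowed<V> w, DataOutputStream out) throws IOException {
    valueCodec.encode(w.value, out);
    out.writeLong(w.timestamp);
    out.writeInt(w.windows.size());
    for (String win : w.windows) out.writeUTF(win);
  }

  public Windowed<V> decode(DataInputStream in) throws IOException {
    V v = valueCodec.decode(in);
    long ts = in.readLong();
    int n = in.readInt();
    List<String> wins = new ArrayList<>(n);
    for (int i = 0; i < n; i++) wins.add(in.readUTF());
    return new Windowed<>(v, ts, wins);
  }
}
```

In this shape the service itself never inspects windows or timestamps. Whether an element carries them is decided entirely by the `T` and codec the caller picks, which fits Lukasz's remark that a runner may want its own context-object representation.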
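Kenn's point is that the coder for a particular kind of WindowedValue decides its encoding, for example one layout for a single window and another for several. A small, self-contained sketch can show the idea. `Element`, `AdaptiveElementCodec` and the tag byte values are hypothetical: they are not Beam's standardized encodings. The sketch only shows a coder that picks a more compact layout for the common single-window case and can still decode both layouts.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical element: a String value, a timestamp, and window ids (strings here).
final class Element {
  final String value;
  final long timestamp;
  final List<String> windows;

  Element(String value, long timestamp, List<String> windows) {
    this.value = value;
    this.timestamp = timestamp;
    this.windows = List.copyOf(windows);
  }
}

// Hypothetical coder that chooses a layout per element. The tag values are invented
// for this sketch; they are NOT Beam's standardized wire format.
final class AdaptiveElementCodec {
  static final byte SINGLE_WINDOW = 1;
  static final byte MULTI_WINDOW = 2;

  static byte[] encode(Element e) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buf)) {
      out.writeUTF(e.value);
      out.writeLong(e.timestamp);
      if (e.windows.size() == 1) {
        // Common case: exactly one window, so no count prefix is written.
        out.writeByte(SINGLE_WINDOW);
        out.writeUTF(e.windows.get(0));
      } else {
        out.writeByte(MULTI_WINDOW);
        out.writeInt(e.windows.size());
        for (String w : e.windows) {
          out.writeUTF(w);
        }
      }
    }
    return buf.toByteArray();
  }

  static Element decode(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    String value = in.readUTF();
    long timestamp = in.readLong();
    byte tag = in.readByte();
    List<String> windows = new ArrayList<>();
    if (tag == SINGLE_WINDOW) {
      windows.add(in.readUTF());
    } else if (tag == MULTI_WINDOW) {
      int n = in.readInt();
      for (int i = 0; i < n; i++) {
        windows.add(in.readUTF());
      }
    } else {
      throw new IOException("unknown layout tag: " + tag);
    }
    return new Element(value, timestamp, windows);
  }
}
```

Take an element with value "v" and the single window "w". Its single-window layout is 15 bytes: 2+1 for the value, 8 for the timestamp, 1 for the tag and 2+1 for the window. That is 4 bytes fewer than writing a count prefix. In the case Kenn describes, where no primitive ever observes the metadata, the same reasoning would let a coder write the value alone.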