Hi Jincheng,

It is very exciting to see this follow-up, to see that you have done your research on the current state, and that there is the intention to join forces on the portability effort!
I have added a few pointers inline. Several of the issues you identified affect our usage of Beam as well; they present an opportunity for collaboration.

Thanks,
Thomas

On Wed, Jul 24, 2019 at 2:53 AM jincheng sun <sunjincheng...@gmail.com> wrote:

> Hi all,
>
> Thanks Max and all for your kind words. :)
>
> Sorry for the late reply; I have been busy working on the Flink 1.9 release. For the next major release of Flink, we plan to add Python user-defined function (UDF, UDTF, UDAF) support, and I have gone over the Beam portability framework and think that it is perfect for our requirements. However, we have also found some improvements that Beam would need:
>
> Must Have:
> ----------------
> 1) Currently only BagState is supported in the gRPC protocol, and I think we should support more kinds of state types, such as MapState, ValueState, ReducingState, CombiningState (AggregatingState in Flink), etc. These kinds of state will be used in both user-defined functions and the Flink Python DataStream API.

There has been discussion about the need for different state types, and to support those efficiently on the runner side there may also be a need to look at the over-the-wire representation:
https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E

> 2) There are warnings that Python 3 is not fully supported in Beam (beam/sdks/python/setup.py). We should support Python 3.x in the Beam portability framework, since Python 2 will no longer be officially supported.

This should be obsolete per the latest comments on:
https://issues.apache.org/jira/browse/BEAM-1251

> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory on the runner side. The reason I consider this a must-have is that when the environment type is "PROCESS", the default value "/tmp" may become a big problem.
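Regarding item 1: richer state types can in principle be emulated on top of a bag-style cell, which also illustrates why first-class protocol support (and possibly a different wire representation) matters for efficiency. A minimal Python sketch, with illustrative names only, not Beam's actual state API:

```python
class BagState:
    """Toy stand-in for the bag-style state cell exposed over the Fn API."""
    def __init__(self):
        self._items = []

    def append(self, value):
        self._items.append(value)

    def get(self):
        return list(self._items)

    def clear(self):
        self._items.clear()


class ValueState:
    """Single-value state emulated on a bag: clear, then append."""
    def __init__(self):
        self._bag = BagState()

    def write(self, value):
        self._bag.clear()
        self._bag.append(value)

    def read(self):
        items = self._bag.get()
        return items[-1] if items else None


class MapState:
    """Map state emulated by appending (key, value) pairs; last write wins."""
    def __init__(self):
        self._bag = BagState()

    def put(self, key, value):
        self._bag.append((key, value))

    def get(self, key):
        result = None
        for k, v in self._bag.get():
            if k == key:  # replay the whole bag; last write wins
                result = v
        return result
```

The emulation is functionally correct but inefficient (a MapState lookup replays the entire bag), which is exactly why dedicated protocol support for these types is worth discussing.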
> 4) The buffer size configuration policy should be improved. For example:
> At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver is size based. We should also support a time-based limit, especially for the streaming case.
> At the Python SDK harness, the buffer size is not configurable in GrpcDataService, and the input queue of the input buffer is not size limited.
> The flush threshold of the output buffer in the Python SDK harness is 10 MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion: make the threshold configurable and also support a time-based threshold.
>
> Nice To Have:
> -------------------
> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T. (We have already discussed this in the previous mails.)
>
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For example, beam-sdks-java-core (11 MB) is a package for Java SDK users, yet it is pulled in because a few of its classes are used in beam-runners-java-fn-execution, such as PipelineOptions used in DefaultJobBundleFactory and FileSystems used in BeamFileSystemArtifactRetrievalService. Maybe we could add a new module, such as beam-sdks-java-common, to hold the classes used by both the runner and the SDK.
>
> 3) The state cache is not shared between bundles, which is performance critical for streaming jobs.

This is rather important to address:
https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E

> 4) The coder of WindowedValue cannot be configured, and most of the time we don't need to serialize and deserialize the timestamp, window, and pane properties in Flink. Currently FullWindowedValueCoder is used by default in WireCoders.addWireCoder; I suggest making the coder configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>
> 5) Currently, if a coder is not defined in StandardCoders, it will be wrapped with LengthPrefixedCoder (WireCoders.addWireCoder -> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders are defined in StandardCoders, which means that for most coders a length is added to the serialized bytes, and in my view this is unnecessary. My suggestion: maybe we can add an interface or tag for a coder which indicates whether the coder needs a length prefix or not.
>
> 6) Set the log level according to PipelineOptions in the Python SDK harness. Currently the log level is set to INFO by default.
> https://issues.apache.org/jira/browse/BEAM-5468
>
> 7) Allow starting the StatusServer according to PipelineOptions in the Python SDK harness. Currently the StatusServer is started by default.
>
> Although I put 3), 4), and 5) into "Nice To Have" because they are performance related, I still think they are very critical for Python UDF execution performance.
>
> Open questions:
> ---------------------
> 1) Which coders should be / can be defined in StandardCoders?
>
> Currently we are preparing the design of how to support Python UDF in Flink based on the Beam portability framework, and we will bring up the discussion in the Flink community. We may propose more changes for Beam during that time and may need more support from the Beam community.
>
> To be honest, I'm not an expert on Beam, so please feel free to correct me if my understanding is wrong. Welcome any feedback!
>
> Best,
> Jincheng

Maximilian Michels <m...@apache.org> wrote on Thu, Apr 25, 2019 at 12:14 AM:

Fully agree that this is an effort that goes beyond changing a type parameter, but I think we have a chance here to cooperate between the two projects. I would be happy to help out where I can.
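On item 4 of the must-have list above: a minimal sketch of an outbound buffer that flushes on either a byte-size or a time threshold. Names and defaults are illustrative, not Beam's actual BeamFnDataBufferingOutboundObserver:

```python
import time


class FlushingBuffer:
    """Buffers outbound elements and flushes when either a byte-size
    threshold or a time threshold is exceeded (both configurable)."""

    def __init__(self, flush_callback, size_threshold=10 * 1024 * 1024,
                 time_threshold_s=1.0, clock=time.monotonic):
        self._flush_callback = flush_callback
        self._size_threshold = size_threshold
        self._time_threshold_s = time_threshold_s
        self._clock = clock
        self._buffer = []
        self._buffered_bytes = 0
        self._last_flush = clock()

    def emit(self, encoded_element: bytes):
        self._buffer.append(encoded_element)
        self._buffered_bytes += len(encoded_element)
        size_due = self._buffered_bytes >= self._size_threshold
        time_due = self._clock() - self._last_flush >= self._time_threshold_s
        if size_due or time_due:
            self.flush()

    def flush(self):
        if self._buffer:
            self._flush_callback(b"".join(self._buffer))
        self._buffer.clear()
        self._buffered_bytes = 0
        self._last_flush = self._clock()
```

The time-based condition is what a size-only buffer lacks: in a quiet streaming pipeline, a small batch can otherwise sit unflushed indefinitely.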
I'm not sure at this point what exactly is feasible for reuse, but I would imagine the Runner-related code to be useful as well for the interaction with the SDK Harness. There are some fundamental differences in the model, e.g. how windowing works, which might be challenging to work around.

Thanks,
Max

On 24.04.19 12:03, jincheng sun wrote:

Hi Kenn, I think you are right: the Python SDK harness can be shared with Flink, and some new primitive operations also need to be added. Regarding the runner side, I think most of the code in runners/java-fn-execution can be shared (though it needs some improvement, such as FnDataService), while some of it cannot be shared, such as the job submission code. So we may need to set up a document that clearly analyzes which parts can be shared, which can be shared but need some changes, and which definitely cannot be shared.

Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn services as a library, and I am willing to put in more effort toward this. Looking at the current code, abstracting the Fn services into a class library that other projects can rely on requires a lot of effort from the Beam community; turning `WindowedValue<T>` into `T` is just the beginning of this effort. If the Beam community is willing to abstract the Fn services into such a class library, I can try to draft a document, although during this period I may need a lot of help from you, Kenn, Lukasz, and the Beam community. (I am a recruit in the Beam community :-))

What do you think?
Regards,
Jincheng

Kenneth Knowles <k...@apache.org> wrote on Wed, Apr 24, 2019 at 3:32 AM:

It seems to me that the most valuable code to share and keep up with is the Python/Go/etc. SDK harness; they would need to be enhanced with new primitive operations. So you would want to depend directly on, and share, the original proto-generated classes too, which Beam publishes as separate artifacts for Java. Is the runner-side support code that valuable for direct integration into Flink? I would expect that once you get past trivial wrappers (which you can copy/paste with no loss) you would hit differences in architecture, so you would diverge anyhow.

Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jincheng,

Copying code is a solution for the short term. In the long run I'd like the Fn services to be a library, not only for the Beam portability layer but also for other projects which want to leverage it. We should thus make an effort to make it more generic/extensible where necessary and feasible.

Since you are investigating reuse of Beam portability in the context of Flink, do you think it would make sense to set up a document where we collect ideas and challenges?

Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:

Hi Reuven,

I think you have provided an optional solution for other communities which want to take advantage of Beam's existing achievements. Thank you very much!

I think the Flink community can choose to copy Beam's code or choose to rely directly on Beam's class library. The Flink community has also initiated a discussion; more info can be found here:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096

The purpose of turning `WindowedValue<T>` into `T` is to make the interface design of Beam more versatile, so that other open source projects have the opportunity to take advantage of Beam's existing achievements. Of course, just changing `WindowedValue<T>` into `T` is not enough for the code to be shared by other projects in the form of a class library; we need to do more. If Beam can provide such a class library in the future, contributors from other communities will also be willing to contribute to the Beam community. This will benefit both the communities that want to take advantage of Beam's existing achievements and the Beam community itself. And thanks to Thomas, who has also made a lot of effort in this regard.

Thanks again for your valuable suggestion, and welcome any feedback!

Best,
Jincheng

Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:

One concern here: these interfaces are intended for use within the Beam project. Beam may decide to make specific changes to them to support needed functionality in Beam. If they are being reused by other projects, those changes risk breaking the other projects in unexpected ways, and I don't think we can guarantee that we won't make such changes. If this is useful in Flink, it would be safer IMO to copy the code rather than to depend on it directly.
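On item 5 of the nice-to-have list earlier in the thread (length-prefixing of coders not in StandardCoders): a sketch of the suggested tagging idea, where a coder declares whether its encoding is self-delimiting and the wire layer adds a length prefix only when needed. All names here are hypothetical, not Beam's actual coder API:

```python
import pickle


class Coder:
    """Toy coder base class; real Beam coders differ."""
    # Coders whose encoding is self-delimiting (fixed width, or carrying
    # its own length) don't need an outer length prefix on the wire.
    is_self_delimiting = False

    def encode(self, value) -> bytes:
        raise NotImplementedError


class BigEndianIntCoder(Coder):
    is_self_delimiting = True  # always exactly 4 bytes

    def encode(self, value) -> bytes:
        return value.to_bytes(4, "big", signed=True)


class PickleCoder(Coder):
    is_self_delimiting = False  # variable-size opaque payload

    def encode(self, value) -> bytes:
        return pickle.dumps(value)


def wire_encode(coder: Coder, value) -> bytes:
    """Add a length prefix only when the coder actually needs one."""
    payload = coder.encode(value)
    if coder.is_self_delimiting:
        return payload
    return len(payload).to_bytes(4, "big") + payload
```

With such a tag, a fixed-width coder avoids the per-element prefix overhead that blanket length-prefixing imposes today.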
On Mon, Apr 22, 2019 at 12:08 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Kenn,

Thanks for your reply and for explaining the design of WindowedValue so clearly!

At present, the definitions of `FnDataService` and `BeamFnDataClient` in the Data Plane are very clear and universal, e.g. send(...)/receive(...). If they were only applied within the Beam project, they would already be very good, because `WindowedValue` is a very basic data structure in the Beam project: both the Runner and the SDK harness define the WindowedValue data structure.

The reason I want to change the interface parameter from `WindowedValue<T>` to T is that I want to turn the Data Plane interface into a class library that can be used by other projects (such as Apache Flink), so that other projects can have their own `FnDataService` implementations. However, the definition of `WindowedValue` does not apply to all projects. Apache Flink, for example, has a similar concept: the Flink streaming API has StreamRecord. If we change `WindowedValue<T>` to T, then other projects' implementations do not need to wrap WindowedValue, and the interface becomes more concise. Furthermore, we then only need a single T, as in the Apache Flink DataSet operators.

So I agree with your understanding: I don't expect a `WindowedValueXXX<T>` in the FnDataService interface; I hope to just use a `T`.

Do you see any problem with changing the interface parameter from `WindowedValue<T>` to T?
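The shape of the change described here can be sketched with the element type fully generic, so that a runner-native record type (such as a Flink-style StreamRecord) flows through without a WindowedValue wrapper. This is an illustrative Python analogue, not the actual Java FnDataService:

```python
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class DataService(Generic[T]):
    """Data-plane channel parameterized by a plain element type T,
    rather than by a WindowedValue wrapper."""

    def __init__(self):
        self._listeners: List[Callable[[T], None]] = []

    def receive(self, listener: Callable[[T], None]) -> None:
        # The engine registers a callback for inbound elements of type T.
        self._listeners.append(listener)

    def send(self, element: T) -> None:
        for listener in self._listeners:
            listener(element)


# A Flink-style record type can now flow through unchanged; no
# WindowedValue wrapping is needed on the non-Beam side.
class StreamRecord:
    def __init__(self, value, timestamp=None):
        self.value = value
        self.timestamp = timestamp
```

Beam itself could instantiate the same channel with T = WindowedValue, so nothing is lost on the Beam side by generalizing.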
Thanks,
Jincheng

Kenneth Knowles <k...@apache.org> wrote on Sat, Apr 20, 2019 at 2:38 AM:

WindowedValue has always been an interface, not a concrete representation:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52
It is an abstract class because we started in Java 7, where you could not have default methods, and just due to legacy style concerns. It is not just discussed, but implemented, that there are WindowedValue implementations with fewer allocations.

At the coder level, it was also always intended to have multiple encodings. We already do have separate encodings based on whether there is 1 window or multiple windows. The coder for a particular kind of WindowedValue should decide this. Before the Fn API, none of this had to be standardized, because the runner could just choose whatever it wanted. Now we have to standardize any encodings that runners and harnesses both need to know. There should be many, and adding more should be just a matter of standardization, no new design.

None of this should be user-facing or in the runner API / pipeline graph; that is critical to keeping it flexible on the backend between the runner & SDK harness.

If I understand our offline discussion correctly, you are interested in the case where you issue a ProcessBundleRequest to the SDK harness and none of the primitives in the subgraph will ever observe the metadata. So you want to not even have a tiny "WindowedValueWithNoMetadata". Is that accurate?

Kenn

On Fri, Apr 19, 2019 at 10:17 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Thank you! And have a nice weekend!

Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 1:14 AM:

I have added you as a contributor.

On Fri, Apr 19, 2019 at 9:56 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Lukasz,

Thanks for your affirmation and for providing more contextual information. :)

Would you please give me contributor permission? My JIRA ID is sunjincheng121. I would like to create/assign tickets for this work.

Thanks,
Jincheng

Lukasz Cwik <lc...@google.com> wrote on Sat, Apr 20, 2019 at 12:26 AM:

Since I don't think this is a contentious change.

On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik <lc...@google.com> wrote:

Yes, using T makes sense.

The WindowedValue was meant to be a context object in the SDK harness that propagates various information about the current element. We have discussed in the past:
* making optimizations which would pass around less of the context information if we know that the DoFns don't need it (for example, when all the values share the same window);
* versioning the encoding separately from the WindowedValue context object (see the recent discussion about element timestamp precision [1]);
* that the runner may want its own representation of a context object that makes sense for it, which isn't necessarily a WindowedValue.

Feel free to cut a JIRA about this and start working on a change towards it.

1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E

On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <sunjincheng...@gmail.com> wrote:

Hi Beam devs,

I read some of the docs about `Communicating over the Fn API` in Beam. I feel that Beam has a very good design for the Control Plane/Data Plane/State Plane/Logging services, and it is described in the <How to send and receive data> document. When communicating between the Runner and the SDK Harness, the Data Plane API uses WindowedValue (an immutable triple of value, timestamp, and windows) as the contract object between the Runner and the SDK Harness. I see the interface definitions for sending and receiving data in the code as follows:

- org.apache.beam.runners.fnexecution.data.FnDataService

  public interface FnDataService {
    <T> InboundDataClient receive(
        LogicalEndpoint inputLocation,
        Coder<WindowedValue<T>> coder,
        FnDataReceiver<WindowedValue<T>> listener);

    <T> CloseableFnDataReceiver<WindowedValue<T>> send(
        LogicalEndpoint outputLocation,
        Coder<WindowedValue<T>> coder);
  }

- org.apache.beam.fn.harness.data.BeamFnDataClient

  public interface BeamFnDataClient {
    <T> InboundDataClient receive(
        Endpoints.ApiServiceDescriptor apiServiceDescriptor,
        LogicalEndpoint inputLocation,
        Coder<WindowedValue<T>> coder,
        FnDataReceiver<WindowedValue<T>> receiver);

    <T> CloseableFnDataReceiver<WindowedValue<T>> send(
        Endpoints.ApiServiceDescriptor apiServiceDescriptor,
        LogicalEndpoint outputLocation,
        Coder<WindowedValue<T>> coder);
  }

Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use `WindowedValue` as the data structure that both the Runner and the SDK Harness know. Control Plane/Data Plane/State Plane/Logging is a high-level abstraction; Control Plane and Logging, for instance, are common requirements for all multi-language platforms. For example, the Flink community is also discussing how to support Python UDFs, as well as how to deal with the docker environment, how to transfer data, how to access state, how to log, etc. If Beam can further abstract these service interfaces, i.e. make the interface definitions compatible with multiple engines, and finally provide them to other projects in the form of class libraries, it will definitely help other platforms that want to support multiple languages. So could Beam further abstract the interface definitions of FnDataService and BeamFnDataClient? To throw out a minnow to catch a whale, take the FnDataService#receive interface as an example and turn `WindowedValue<T>` into `T`, so that other platforms can extend it arbitrarily, as follows:

  <T> InboundDataClient receive(
      LogicalEndpoint inputLocation,
      Coder<T> coder,
      FnDataReceiver<T> listener);

What do you think? Feel free to correct me if there is any misunderstanding, and welcome any feedback!

Regards,
Jincheng
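As an illustration of the value-only coder discussed in the thread (nice-to-have item 4, and Kenn's point that WindowedValue was always meant to support multiple encodings): a toy byte layout showing the per-element metadata overhead that a value-only encoding avoids. The layout is invented for illustration and is not Beam's actual FullWindowedValueCoder format:

```python
import struct


def encode_full(value: bytes, timestamp_micros: int, window: bytes) -> bytes:
    """Toy 'full' encoding: 8-byte timestamp + length-prefixed window + value."""
    return (struct.pack(">q", timestamp_micros)
            + struct.pack(">i", len(window)) + window
            + value)


def encode_value_only(value: bytes) -> bytes:
    """Toy 'value only' encoding: the payload alone."""
    return value


payload = b"hi"
full = encode_full(payload, 1_234_567, b"global")
# Metadata overhead: 8 (timestamp) + 4 (window length) + 6 (window name)
assert len(full) - len(encode_value_only(payload)) == 8 + 4 + 6
```

For small elements the metadata can dominate the payload, which is why making the wire coder configurable per deployment is attractive when the runner (here, Flink) tracks timestamps and windows itself.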