Hi all,

Thanks, Max and all, for your kind words. :)

Sorry for the late reply; I have been busy working on the Flink 1.9
release. For the next major release of Flink, we plan to add support for
Python user-defined functions (UDF, UDTF, UDAF). I have gone over the Beam
portability framework and think it is a perfect fit for our requirements.
However, we have also found some improvements that Beam would need:

Must Have:
----------------
1) Currently only BagState is supported in the gRPC protocol. I think we
should support more state types, such as MapState, ValueState,
ReducingState, CombiningState (AggregatingState in Flink), etc., because
these kinds of state will be used both in user-defined functions and in
the Flink Python DataStream API.

2) There are warnings that Python 3 is not fully supported in Beam
(beam/sdks/python/setup.py). We should support Python 3.x in the Beam
portability framework, because Python 2 will no longer be officially
supported.

3) The configuration "semi_persist_dir" is not set in EnvironmentFactory on
the runner side. I think it is a must-have because, when the environment
type is "PROCESS", the default value "/tmp" may become a big problem: the
SDK harness processes run directly on the host, so they all share its
/tmp directory.

4) The buffer size configuration policy should be improved, for example:
   - On the runner side, the buffer limit in
     BeamFnDataBufferingOutboundObserver is size-based. We should also
     support a time-based limit, especially for the streaming case.
   - In the Python SDK harness, the buffer size is not configurable in
     GrpcDataService, and the input queue of the input buffer is not
     size-limited.
   - The flush threshold of the output buffer in the Python SDK harness is
     10 MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is to
     make the threshold configurable and to support a time-based threshold
     as well.
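
To illustrate Must Have item 1): until the protocol supports more state
types, one option is to emulate them on top of bag state. The Java sketch
below is a minimal illustration under that assumption; BagState,
InMemoryBagState, BagBackedValueState and BagBackedCombiningState are
hypothetical stand-ins, not Beam's actual classes. ValueState maps
naturally to clear + append, but combining state has to read back and fold
every appended input on each read, which is one reason native support
would help.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for bag state served by the runner over gRPC.
interface BagState<T> {
  Iterable<T> read();
  void append(T value);
  void clear();
}

// In-memory bag used only to make the sketch runnable.
class InMemoryBagState<T> implements BagState<T> {
  private final List<T> values = new ArrayList<>();

  public Iterable<T> read() { return new ArrayList<>(values); }
  public void append(T value) { values.add(value); }
  public void clear() { values.clear(); }
}

// ValueState emulated on a bag: write = clear + append, read = first element.
class BagBackedValueState<T> {
  private final BagState<T> bag;

  BagBackedValueState(BagState<T> bag) { this.bag = bag; }

  T read() {
    for (T value : bag.read()) {
      return value;
    }
    return null; // nothing written yet
  }

  void write(T value) {
    bag.clear();
    bag.append(value);
  }
}

// Combining state emulated on a bag: every read folds all appended inputs,
// so the cost of a read grows with the number of add() calls.
class BagBackedCombiningState<T> {
  private final BagState<T> bag;
  private final T identity;
  private final BinaryOperator<T> combine;

  BagBackedCombiningState(BagState<T> bag, T identity, BinaryOperator<T> combine) {
    this.bag = bag;
    this.identity = identity;
    this.combine = combine;
  }

  void add(T input) { bag.append(input); }

  T read() {
    T accumulator = identity;
    for (T value : bag.read()) {
      accumulator = combine.apply(accumulator, value);
    }
    return accumulator;
  }
}
```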
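
For Must Have item 3), one possible fix is for the runner to create a
dedicated directory per environment and pass it to the SDK harness boot
program. The Java sketch below assumes the boot program accepts a
`--semi_persist_dir` flag; the helper SemiPersistDirs.buildBootArgs is
hypothetical and is not part of Beam's ProcessEnvironmentFactory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class SemiPersistDirs {
  // Hypothetical helper: creates a fresh directory under baseDir for one SDK
  // harness process and passes it to the boot program, instead of letting
  // every PROCESS environment fall back to the shared default "/tmp".
  // Assumes the boot program accepts a --semi_persist_dir flag.
  static List<String> buildBootArgs(Path baseDir, String workerId, List<String> otherArgs) {
    Path dir;
    try {
      dir = Files.createTempDirectory(baseDir, "sdk-harness-" + workerId + "-");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> args = new ArrayList<>(otherArgs);
    args.add("--semi_persist_dir=" + dir.toAbsolutePath());
    return args;
  }
}
```

The sketch never deletes the directories it creates; a real implementation
would remove them when the environment is closed.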
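
For Must Have item 4), a combined size- and time-based flush policy could
look roughly like the Java sketch below. SizeOrTimeFlushingBuffer is
hypothetical (it is not BeamFnDataBufferingOutboundObserver); the clock is
injected so the policy is easy to test.

```java
import java.io.ByteArrayOutputStream;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical outbound buffer that flushes when EITHER the buffered byte
// count reaches sizeLimitBytes OR timeLimitMillis has passed since the
// last flush.
class SizeOrTimeFlushingBuffer {
  private final long sizeLimitBytes;
  private final long timeLimitMillis;
  private final LongSupplier clockMillis; // injected clock, e.g. System::currentTimeMillis
  private final Consumer<byte[]> sink;    // receives each flushed chunk
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  private long lastFlushMillis;

  SizeOrTimeFlushingBuffer(
      long sizeLimitBytes, long timeLimitMillis, LongSupplier clockMillis, Consumer<byte[]> sink) {
    this.sizeLimitBytes = sizeLimitBytes;
    this.timeLimitMillis = timeLimitMillis;
    this.clockMillis = clockMillis;
    this.sink = sink;
    this.lastFlushMillis = clockMillis.getAsLong();
  }

  // Appends one encoded element and flushes if either threshold is reached.
  void write(byte[] encodedElement) {
    buffer.write(encodedElement, 0, encodedElement.length);
    boolean sizeReached = buffer.size() >= sizeLimitBytes;
    boolean timeReached = clockMillis.getAsLong() - lastFlushMillis >= timeLimitMillis;
    if (sizeReached || timeReached) {
      flush();
    }
  }

  // Sends any buffered bytes downstream and restarts the time window.
  void flush() {
    if (buffer.size() > 0) {
      sink.accept(buffer.toByteArray());
      buffer.reset();
    }
    lastFlushMillis = clockMillis.getAsLong();
  }
}
```

This sketch only checks the time limit when a new element arrives; for a
streaming job with sparse input, a real implementation would also need a
timer that flushes an idle buffer.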

Nice To Have:
-------------------
1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
etc., by changing the parameter type from WindowedValue<T> to T. (We have
already discussed this in the previous mails.)

2) Refactor the code to avoid pulling in unnecessary dependencies. For
example, beam-sdks-java-core (11MB) is a package for Java SDK users, but it
is pulled in because a few of its classes are used in
beam-runners-java-fn-execution, such as PipelineOptions (used in
DefaultJobBundleFactory) and FileSystems (used in
BeamFileSystemArtifactRetrievalService). Maybe we could add a new module,
such as beam-sdks-java-common, to hold the classes used by both the runner
and the SDK.

3) The state cache is not shared between bundles, although sharing it is
performance-critical for streaming jobs.

4) The coder of WindowedValue cannot be configured, and most of the time
Flink doesn't need to serialize and deserialize the timestamp, window, and
pane properties. But currently FullWindowedValueCoder is used by default in
WireCoders.addWireCoder. I suggest making the coder configurable (i.e.,
allowing ValueOnlyWindowedValueCoder to be used).

5) Currently, if a coder is not defined in StandardCoders, it is wrapped
with LengthPrefixCoder (WireCoders.addWireCoder ->
LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
coders are defined in StandardCoders, so for most coders a length is added
to the serialized bytes, which in my opinion is not necessary. My
suggestion is to add some interfaces or tags to coders that indicate
whether a coder needs a length prefix or not.

6) Set the log level according to the PipelineOptions in the Python SDK
harness. Currently the log level is set to INFO by default.

7) Allow starting the StatusServer according to the PipelineOptions in the
Python SDK harness. Currently the StatusServer is started by default.

Although I put 3), 4), and 5) under "Nice to Have" because they are
performance-related, I still think they are critical for Python UDF
execution performance.
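
For Nice To Have item 1), making the data-plane interfaces generic in T
does not lose anything for Beam: Beam can still instantiate T as a
WindowedValue-like type, while Flink could use its own record type or a
plain value. The Java sketch below illustrates this with simplified,
hypothetical types (SimpleCoder, SimpleReceiver, SimpleDataService,
Timestamped) rather than Beam's real Coder, FnDataReceiver, and
FnDataService; the loopback service just encodes on send and decodes on
receive in memory.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-ins for Coder and FnDataReceiver.
interface SimpleCoder<T> {
  byte[] encode(T value);
  T decode(byte[] bytes);
}

interface SimpleReceiver<T> {
  void accept(T value);
}

// The data service is generic in T and knows nothing about windows.
interface SimpleDataService {
  <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener);
  <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder);
}

// In-memory loopback: send() encodes, receive() decodes, bytes in between.
class LoopbackDataService implements SimpleDataService {
  private final Map<String, SimpleReceiver<byte[]>> listeners = new HashMap<>();

  public <T> void receive(String endpoint, SimpleCoder<T> coder, SimpleReceiver<T> listener) {
    listeners.put(endpoint, bytes -> listener.accept(coder.decode(bytes)));
  }

  public <T> SimpleReceiver<T> send(String endpoint, SimpleCoder<T> coder) {
    return value -> listeners.get(endpoint).accept(coder.encode(value));
  }
}

// Coder for a plain value with no window metadata at all.
class Utf8Coder implements SimpleCoder<String> {
  public byte[] encode(String value) { return value.getBytes(StandardCharsets.UTF_8); }
  public String decode(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
}

// A WindowedValue-like wrapper (value + timestamp) is just another T.
class Timestamped<V> {
  final V value;
  final long timestampMillis;

  Timestamped(V value, long timestampMillis) {
    this.value = value;
    this.timestampMillis = timestampMillis;
  }
}

class TimestampedCoder<V> implements SimpleCoder<Timestamped<V>> {
  private final SimpleCoder<V> valueCoder;

  TimestampedCoder(SimpleCoder<V> valueCoder) { this.valueCoder = valueCoder; }

  public byte[] encode(Timestamped<V> element) {
    byte[] valueBytes = valueCoder.encode(element.value);
    return ByteBuffer.allocate(Long.BYTES + valueBytes.length)
        .putLong(element.timestampMillis)
        .put(valueBytes)
        .array();
  }

  public Timestamped<V> decode(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    long timestampMillis = buffer.getLong();
    byte[] valueBytes = new byte[buffer.remaining()];
    buffer.get(valueBytes);
    return new Timestamped<>(valueCoder.decode(valueBytes), timestampMillis);
  }
}
```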
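
For Nice To Have item 3), a state cache that outlives a single bundle could
be a bounded LRU map keyed by the state key plus a token from the runner
that changes whenever cached state may be stale. The Java sketch below
assumes such a token exists; CrossBundleStateCache and the token are
hypothetical here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical cache that is NOT cleared between bundles. Entries are keyed
// by (state key, cache token); the runner is assumed to hand out a new token
// whenever previously cached state may be stale.
class CrossBundleStateCache<K, V> {
  private static final class Key<S> {
    final S stateKey;
    final String cacheToken;

    Key(S stateKey, String cacheToken) {
      this.stateKey = stateKey;
      this.cacheToken = cacheToken;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key<?> other = (Key<?>) o;
      return Objects.equals(stateKey, other.stateKey)
          && Objects.equals(cacheToken, other.cacheToken);
    }

    @Override
    public int hashCode() {
      return Objects.hash(stateKey, cacheToken);
    }
  }

  private final Map<Key<K>, V> entries;

  CrossBundleStateCache(int maxEntries) {
    // accessOrder = true makes iteration order least-recently-used first.
    this.entries = new LinkedHashMap<Key<K>, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<Key<K>, V> eldest) {
        return size() > maxEntries;
      }
    };
  }

  // Returns the cached value, or calls loader (e.g. a state request to the
  // runner) on a miss and caches the result.
  V get(K stateKey, String cacheToken, Function<K, V> loader) {
    return entries.computeIfAbsent(
        new Key<>(stateKey, cacheToken), key -> loader.apply(key.stateKey));
  }

  // Records a value written during a bundle so later bundles can read it.
  void put(K stateKey, String cacheToken, V value) {
    entries.put(new Key<>(stateKey, cacheToken), value);
  }
}
```

Entries stored under an old token are never hit again and simply age out of
the LRU order, so the cache does not need to be cleared at bundle
boundaries.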
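
For Nice To Have item 4), the cost of always sending full windowed-value
metadata can be seen with a deliberately simplified layout. The Java sketch
below is not the actual wire format of FullWindowedValueCoder or
ValueOnlyWindowedValueCoder; it assumes a made-up layout of a 4-byte int
value, an 8-byte timestamp, a 4-byte window count, 8 bytes per window, and
1 pane byte. WireSizeSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Deliberately simplified layouts for illustration; this is NOT the real
// wire format of FullWindowedValueCoder or ValueOnlyWindowedValueCoder.
class WireSizeSketch {
  // "Full" layout: int value, long timestamp, int window count,
  // one long per window (e.g. its end timestamp), one pane byte.
  static byte[] encodeFull(int value, long timestampMillis, long[] windowEnds, byte pane) {
    ByteBuffer buffer = ByteBuffer.allocate(
        Integer.BYTES + Long.BYTES + Integer.BYTES + Long.BYTES * windowEnds.length + 1);
    buffer.putInt(value).putLong(timestampMillis).putInt(windowEnds.length);
    for (long windowEnd : windowEnds) {
      buffer.putLong(windowEnd);
    }
    buffer.put(pane);
    return buffer.array();
  }

  // "Value-only" layout: just the value; the receiver attaches default
  // metadata if it needs any.
  static byte[] encodeValueOnly(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  static int decodeValueOnly(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt();
  }
}
```

In this made-up layout, an element in a single window takes 25 bytes, of
which 21 are metadata that a value-only encoding would not send.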
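
For Nice To Have item 5), the idea of tagging coders could be sketched as a
marker interface: a coder whose encoding already delimits itself (e.g.
fixed width) skips the prefix, and any other coder gets a base-128 varint
length in front of its bytes. SelfDelimiting, ByteCoder, FixedLongCoder,
RawUtf8Coder, and WireEncoding are hypothetical names for illustration, not
Beam APIs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical marker: the coder's encoding already tells a reader where
// each element ends, so no length prefix is needed on the wire.
interface SelfDelimiting {}

interface ByteCoder<T> {
  byte[] encode(T value);
}

// Fixed-width 8-byte big-endian encoding: always self-delimiting.
class FixedLongCoder implements ByteCoder<Long>, SelfDelimiting {
  public byte[] encode(Long value) {
    byte[] out = new byte[8];
    long v = value;
    for (int i = 7; i >= 0; i--) {
      out[i] = (byte) v;
      v >>>= 8;
    }
    return out;
  }
}

// Raw UTF-8 bytes have no terminator, so a reader needs an explicit length.
class RawUtf8Coder implements ByteCoder<String> {
  public byte[] encode(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }
}

class WireEncoding {
  // Writes n as an unsigned base-128 varint: 7 bits per byte, high bit set
  // on every byte except the last.
  static void writeVarInt(ByteArrayOutputStream out, int n) {
    while ((n & ~0x7F) != 0) {
      out.write((n & 0x7F) | 0x80);
      n >>>= 7;
    }
    out.write(n);
  }

  // Adds a length prefix only for coders not tagged as self-delimiting.
  static <T> byte[] encodeForWire(ByteCoder<T> coder, T value) {
    byte[] payload = coder.encode(value);
    if (coder instanceof SelfDelimiting) {
      return payload;
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeVarInt(out, payload.length);
    out.write(payload, 0, payload.length);
    return out.toByteArray();
  }
}
```

Here the prefix is skipped for the fixed-width long, while the raw UTF-8
string still needs it, because a reader could not otherwise tell where it
ends.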

Open questions:
---------------------
1) Which coders should be / can be defined in StandardCoders?

We are currently preparing a design for supporting Python UDFs in Flink
based on the Beam portability framework, and we will bring it up for
discussion in the Flink community. We may propose more changes to Beam
during that time and may need more support from the Beam community.

To be honest, I'm not a Beam expert, so please feel free to correct me if
my understanding is wrong. Any feedback is welcome.

Best,
Jincheng

Maximilian Michels <m...@apache.org> wrote on Thursday, April 25, 2019 at 12:14 AM:

> Fully agree that this is an effort that goes beyond changing a type
> parameter, but I think we have a chance here to cooperate between the two
> projects. I would be happy to help out where I can.
>
> I'm not sure at this point what exactly is feasible for reuse but I
> would imagine the Runner-related code to be useful as well for the
> interaction with the SDK Harness. There are some fundamental differences
> in the model, e.g. how windowing works, which might be challenging to
> work around.
>
> Thanks,
> Max
>
> On 24.04.19 12:03, jincheng sun wrote:
> >
> > Hi Kenn, I think you are right: the Python SDK harness can be shared with
> > Flink, although some new primitive operations need to be added. Regarding
> > the runner side, I think most of the code in runners/java-fn-execution
> > can be shared (though some of it needs improvement, such as
> > FnDataService), while some of it cannot be shared, such as the job
> > submission code. So we may need to set up a document that clearly
> > analyzes which parts can be shared as-is, which can be shared with some
> > changes, and which definitely cannot be shared.
> >
> > Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn
> > services as a library, and I am willing to put more effort into this.
> > From the view of the current code, abstracting the Fn services into a
> > class library that other projects can rely on requires a lot of effort
> > from the Beam community. Turning `WindowedValue<T>` into `T` is just the
> > beginning of this effort. If the Beam community is willing to abstract
> > the Fn services into a class library that other projects can rely on, I
> > can try to draft a document; of course, during this period I may need a
> > lot of help from you, Kenn, Lukasz, and the Beam community. (I am a
> > newcomer to the Beam community :-))
> >
> > What do you think?
> >
> > Regards,
> > Jincheng
> >
> > Kenneth Knowles <k...@apache.org> wrote on Wednesday, April 24, 2019 at
> > 3:32 AM:
> >
> >     It seems to me that the most valuable code to share and keep up with
> >     is the Python/Go/etc SDK harness; they would need to be enhanced
> >     with new primitive operations. So you would want to depend directly
> >     and share the original proto-generated classes too, which Beam
> >     publishes as separate artifacts for Java. Is the runner-side support
> >     code that valuable for direct integration into Flink? I would expect
> >     once you get past trivial wrappers (that you can copy/paste with no
> >     loss) you would hit differences in architecture so you would diverge
> >     anyhow.
> >
> >     Kenn
> >
> >     On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org>
> >     wrote:
> >
> >         Hi Jincheng,
> >
> >         Copying code is a solution for the short term. In the long run
> >         I'd like the Fn services to be a library not only for the Beam
> >         portability layer but also for other projects which want to
> >         leverage it. We should thus make an effort to make it more
> >         generic/extensible where necessary and feasible.
> >
> >         Since you are investigating reuse of Beam portability in the
> >         context of Flink, do you think it would make sense to set up a
> >         document where we collect ideas and challenges?
> >
> >         Thanks,
> >         Max
> >
> >         On 23.04.19 13:00, jincheng sun wrote:
> >          > Hi Reuven,
> >          >
> >          > I think you have provided an alternative solution for other
> >          > communities that want to take advantage of Beam's existing
> >          > achievements. Thank you very much!
> >          >
> >          > I think the Flink community can choose either to copy Beam's
> >          > code or to rely directly on Beam's class library. The Flink
> >          > community has also started a discussion; more info can be found
> >          > here:
> >          > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
> >          >
> >          > The purpose of turning `WindowedValue<T>` into `T` is to make
> >          > the interface design of Beam more versatile, so that other open
> >          > source projects have the opportunity to take advantage of Beam's
> >          > existing achievements. Of course, just changing
> >          > `WindowedValue<T>` into `T` is not enough for the code to be
> >          > shared by other projects as a class library; more effort is
> >          > needed. If Beam can provide a class library in the future,
> >          > contributors from other communities will also be willing to
> >          > contribute to the Beam community. This will benefit both the
> >          > communities that want to take advantage of Beam's existing
> >          > achievements and the Beam community itself. And thanks to
> >          > Thomas, who has also put a lot of effort into this.
> >          >
> >          > Thanks again for your valuable suggestion, and any feedback is
> >          > welcome!
> >          >
> >          > Best,
> >          > Jincheng
> >          >
> >          > Reuven Lax <re...@google.com> wrote on Tuesday, April 23, 2019
> >          > at 1:00 AM:
> >          >
> >          >     One concern here: these interfaces are intended for use
> >          >     within the Beam project. Beam may decide to make specific
> >          >     changes to them to support needed functionality in Beam. If
> >          >     they are being reused by other projects, then those changes
> >          >     risk breaking those other projects in unexpected ways. I
> >          >     don't think we can guarantee that we don't do that. If this
> >          >     is useful in Flink, it would be safer to copy the code IMO
> >          >     rather than to directly depend on it.
> >          >
> >          >     On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
> >          >     <sunjincheng...@gmail.com> wrote:
> >          >
> >          >         Hi Kenn,
> >          >
> >          >         Thanks for your reply, and for explaining the design of
> >          >         WindowedValue so clearly!
> >          >
> >          >         At present, the definitions of `FnDataService` and
> >          >         `BeamFnDataClient` in the Data Plane are very clear and
> >          >         universal, such as send(...)/receive(...). If they are
> >          >         only used within the Beam project, they are already very
> >          >         good, because `WindowedValue` is a very basic data
> >          >         structure in Beam: both the Runner and the SDK harness
> >          >         define the WindowedValue data structure.
> >          >
> >          >         The reason I want to change the interface parameter from
> >          >         `WindowedValue<T>` to T is that I want to turn the
> >          >         `Data Plane` interface into a class library that can be
> >          >         used by other projects (such as Apache Flink), so that
> >          >         other projects can have their own `FnDataService`
> >          >         implementation. However, the definition of
> >          >         `WindowedValue` does not apply to all projects. For
> >          >         example, Apache Flink has its own definition similar to
> >          >         WindowedValue: Flink's streaming runtime has
> >          >         StreamRecord. If we change `WindowedValue<T>` to T, then
> >          >         other projects' implementations do not need to wrap
> >          >         WindowedValue, and the interface will become more
> >          >         concise. Furthermore, some operators only need a plain
> >          >         T, such as the Apache Flink DataSet operators.
> >          >
> >          >         So I agree with your understanding: I don't expect
> >          >         `WindowedValueXXX<T>` in the FnDataService interface; I
> >          >         hope to just use a `T`.
> >          >
> >          >         Have you seen any problem with changing the interface
> >          >         parameter from `WindowedValue<T>` to T?
> >          >
> >          >         Thanks,
> >          >         Jincheng
> >          >
> >          >         Kenneth Knowles <k...@apache.org> wrote on Saturday,
> >          >         April 20, 2019 at 2:38 AM:
> >          >
> >          >             WindowedValue has always been an interface, not a
> >          >             concrete representation:
> >          >             <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java#L52>.
> >          >             It is an abstract class because we started in Java 7,
> >          >             where you could not have default methods, and also
> >          >             due to legacy style concerns. WindowedValue
> >          >             implementations with fewer allocations have not just
> >          >             been discussed but already implemented.
> >          >             At the coder level, it was also always intended to
> >          >             have multiple encodings. We already do have separate
> >          >             encodings based on whether there is 1 window or
> >          >             multiple windows. The coder for a particular kind of
> >          >             WindowedValue should decide this. Before the Fn API
> >          >             none of this had to be standardized, because the
> >          >             runner could just choose whatever it wants. Now we
> >          >             have to standardize any encodings that runners and
> >          >             harnesses both need to know. There should be many,
> >          >             and adding more should be just a matter of
> >          >             standardization, no new design.
> >          >
> >          >             None of this should be user-facing or in the runner
> >          >             API / pipeline graph - that is critical to making it
> >          >             flexible on the backend between the runner & SDK
> >          >             harness.
> >          >
> >          >             If I understand it, from our offline discussion, you
> >          >             are interested in the case where you issue a
> >          >             ProcessBundleRequest to the SDK harness and none of
> >          >             the primitives in the subgraph will ever observe the
> >          >             metadata. So you want to not even have a tiny
> >          >             "WindowedValueWithNoMetadata". Is that accurate?
> >          >
> >          >             Kenn
> >          >
> >          >             On Fri, Apr 19, 2019 at 10:17 AM jincheng sun
> >          >             <sunjincheng...@gmail.com> wrote:
> >          >
> >          >                 Thank you! And have a nice weekend!
> >          >
> >          >
> >          >                 Lukasz Cwik <lc...@google.com> wrote on Saturday,
> >          >                 April 20, 2019 at 1:14 AM:
> >          >
> >          >                     I have added you as a contributor.
> >          >
> >          >                     On Fri, Apr 19, 2019 at 9:56 AM jincheng sun
> >          >                     <sunjincheng...@gmail.com> wrote:
> >          >
> >          >                         Hi Lukasz,
> >          >
> >          >                         Thanks for your affirmation and for
> >          >                         providing more context. :)
> >          >
> >          >                         Would you please give me contributor
> >          >                         permission? My JIRA ID is sunjincheng121.
> >          >
> >          >                         I would like to create/assign tickets
> >          >                         for this work.
> >          >
> >          >                         Thanks,
> >          >                         Jincheng
> >          >
> >          >                         Lukasz Cwik <lc...@google.com> wrote on
> >          >                         Saturday, April 20, 2019 at 12:26 AM:
> >          >
> >          >                             Since I don't think this is a
> >         contentious
> >          >                             change.
> >          >
> >          >                             On Fri, Apr 19, 2019 at 9:25 AM
> >          >                             Lukasz Cwik <lc...@google.com> wrote:
> >          >
> >          >                                 Yes, using T makes sense.
> >          >
> >          >                                 The WindowedValue was meant to be a
> >          >                                 context object in the SDK harness
> >          >                                 that propagates various information
> >          >                                 about the current element. We have
> >          >                                 discussed in the past:
> >          >                                 * making optimizations which would
> >          >                                   pass around less of the context
> >          >                                   information if we know that the
> >          >                                   DoFns don't need it (for example,
> >          >                                   all the values share the same
> >          >                                   window).
> >          >                                 * versioning the encoding
> >          >                                   separately from the WindowedValue
> >          >                                   context object (see recent
> >          >                                   discussion about element
> >          >                                   timestamp precision [1])
> >          >                                 * the runner may want its own
> >          >                                   representation of a context
> >          >                                   object that makes sense for it,
> >          >                                   which isn't necessarily a
> >          >                                   WindowedValue.
> >          >
> >          >                                 Feel free to cut a JIRA about this
> >          >                                 and start working on a change
> >          >                                 towards this.
> >          >
> >          >                                 1: https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
> >          >
> >          >                                 On Fri, Apr 19, 2019 at 3:18 AM
> >          >                                 jincheng sun <sunjincheng...@gmail.com> wrote:
> >          >
> >          >                                     Hi Beam devs,
> >          >
> >          >                                     I read some of the docs about
> >          >                                     `Communicating over the Fn API`
> >          >                                     in Beam. I feel that Beam has a
> >          >                                     very good design for the Control
> >          >                                     Plane/Data Plane/State
> >          >                                     Plane/Logging services, as
> >          >                                     described in the <How to send
> >          >                                     and receive data> document. When
> >          >                                     communicating between the Runner
> >          >                                     and the SDK Harness, the Data
> >          >                                     Plane API uses WindowedValue (an
> >          >                                     immutable triple of value,
> >          >                                     timestamp, and windows) as the
> >          >                                     contract object between the
> >          >                                     Runner and the SDK Harness. I
> >          >                                     see the interface definitions
> >          >                                     for sending and receiving data
> >          >                                     in the code as follows:
> >          >
> >          >                                     - org.apache.beam.runners.fnexecution.data.FnDataService
> >          >
> >          >                                         public interface FnDataService {
> >          >                                           <T> InboundDataClient receive(
> >          >                                               LogicalEndpoint inputLocation,
> >          >                                               Coder<WindowedValue<T>> coder,
> >          >                                               FnDataReceiver<WindowedValue<T>> listener);
> >          >
> >          >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> >          >                                               LogicalEndpoint outputLocation,
> >          >                                               Coder<WindowedValue<T>> coder);
> >          >                                         }
> >          >
> >          >
> >          >
> >          >                                     - org.apache.beam.fn.harness.data.BeamFnDataClient
> >          >
> >          >                                         public interface BeamFnDataClient {
> >          >                                           <T> InboundDataClient receive(
> >          >                                               ApiServiceDescriptor apiServiceDescriptor,
> >          >                                               LogicalEndpoint inputLocation,
> >          >                                               Coder<WindowedValue<T>> coder,
> >          >                                               FnDataReceiver<WindowedValue<T>> receiver);
> >          >
> >          >                                           <T> CloseableFnDataReceiver<WindowedValue<T>> send(
> >          >                                               Endpoints.ApiServiceDescriptor apiServiceDescriptor,
> >          >                                               LogicalEndpoint outputLocation,
> >          >                                               Coder<WindowedValue<T>> coder);
> >          >                                         }
> >          >
> >          >
> >          >                                     Both `Coder<WindowedValue<T>>` and
> >          >                                     `FnDataReceiver<WindowedValue<T>>`
> >          >                                     use `WindowedValue` as the data
> >          >                                     structure that both the Runner and
> >          >                                     the SDK Harness understand.
> >          >                                     Control Plane/Data Plane/State
> >          >                                     Plane/Logging is a high-level
> >          >                                     abstraction; Control Plane and
> >          >                                     Logging, for instance, are common
> >          >                                     requirements for all
> >          >                                     multi-language platforms. For
> >          >                                     example, the Flink community is
> >          >                                     also discussing how to support
> >          >                                     Python UDFs, as well as how to
> >          >                                     deal with the Docker environment,
> >          >                                     how to transfer data, how to
> >          >                                     access state, how to do logging,
> >          >                                     etc. If Beam can further abstract
> >          >                                     these service interfaces, i.e.,
> >          >                                     make the interface definitions
> >          >                                     compatible with multiple engines
> >          >                                     and finally provide them to other
> >          >                                     projects in the form of class
> >          >                                     libraries, it will definitely help
> >          >                                     other platforms that want to
> >          >                                     support multiple languages. So
> >          >                                     could Beam further abstract the
> >          >                                     interface definitions of
> >          >                                     FnDataService and
> >          >                                     BeamFnDataClient? Here I am
> >          >                                     throwing out a minnow to catch a
> >          >                                     whale: take the
> >          >                                     FnDataService#receive interface as
> >          >                                     an example, and turn
> >          >                                     `WindowedValue<T>` into `T` so
> >          >                                     that other platforms can extend it
> >          >                                     arbitrarily, as follows:
> >          >
> >          >                                     <T> InboundDataClient receive(
> >          >                                         LogicalEndpoint inputLocation,
> >          >                                         Coder<T> coder,
> >          >                                         FnDataReceiver<T> listener);
> >          >
> >          >                                     What do you think?
> >          >
> >          >                                     Feel free to correct me if I have
> >          >                                     misunderstood anything. Any
> >          >                                     feedback is welcome!
> >          >
> >          >
> >          >                                     Regards,
> >          >                                     Jincheng
> >          >
> >
>
