Hi Jincheng,

Thanks for getting back to us.

> For the next major release of Flink, we plan to add support for Python
> user-defined functions (UDF, UDTF, UDAF), and I have gone over the Beam
> portability framework and think that it is perfect for our requirements.
> However, we also found some improvements needed for Beam:

That sounds great! The improvement list contains very reasonable
suggestions, some of which are already on our TODO list. I think
Thomas and Robert already provided the answers you were looking for.

> Open questions:
> ---------------------
> 1) Which coders should be / can be defined in StandardCoders?

The ones which are present now are:

          BYTES_CODER
          INT64_CODER
          STRING_UTF8
          ITERABLE_CODER
          TIMER_CODER
          KV_CODER
          LENGTH_PREFIX_CODER
          GLOBAL_WINDOW_CODER
          INTERVAL_WINDOW_CODER
          WINDOWED_VALUE_CODER
          DOUBLE_CODER

Note that this list only matters across SDK borders. If you stay within one
SDK, you can use any coder. If a Runner wants to replace a particular coder
with its own coder implementation, it could do that. Flink may want to
use its own set of coders for the sake of coder migration. Another
option, which Robert alluded to, would be to make use of Schema where
possible, which has been built with migration in mind.
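
To illustrate why coders outside this list get wrapped in a length prefix,
here is a minimal sketch. It is not Beam's actual implementation: the helper
names (encode_varint, decode_varint, length_prefix, split_length_prefixed)
are made up for this example, and the big-endian double layout is an
assumption. The point is only that a component which does not know a coder
can still find element boundaries when every element carries its size.

```python
import struct


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (7 bits per byte)."""
    if n < 0:
        raise ValueError("this sketch handles non-negative ints only")
    out = bytearray()
    while True:
        low = n & 0x7F
        n >>= 7
        if n:
            out.append(low | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low)  # high bit clear: last byte
            return bytes(out)


def decode_varint(buf: bytes, pos: int = 0):
    """Return (value, next_pos) for the varint starting at buf[pos]."""
    result = 0
    shift = 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def length_prefix(payload: bytes) -> bytes:
    """Wrap opaque bytes so a reader can skip them without decoding."""
    return encode_varint(len(payload)) + payload


def split_length_prefixed(stream: bytes):
    """Split concatenated length-prefixed elements into raw payloads."""
    pos = 0
    elements = []
    while pos < len(stream):
        size, pos = decode_varint(stream, pos)
        elements.append(stream[pos:pos + size])
        pos += size
    return elements


# A double is self-delimiting only if the reader knows it is 8 bytes wide.
encoded_double = struct.pack(">d", 3.5)

# Two payloads from a hypothetical custom coder the reader does not know.
custom_a = b"opaque-bytes-from-a-custom-coder"
custom_b = b"\x00\x01\x02"
stream = length_prefix(custom_a) + length_prefix(custom_b)
elements = split_length_prefixed(stream)
```

Without the prefix, the reader would have no way to tell where custom_a
ends and custom_b begins unless it understood the custom coder itself.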

Thanks,
Max

> 
> Must Have:
> ----------------
> 1) Currently only BagState is supported in the gRPC protocol, and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState (AggregatingState in Flink), etc. That's because
> these kinds of state will be used in both user-defined functions and the Flink
> Python DataStream API.
> 
> 2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
> portability framework because Python 2 will no longer be officially supported.
> 
> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory on
> the runner side. I think it is a must-have because, when the
> environment type is "PROCESS", the default value "/tmp" may become a big
> problem.
> 
> 4) The buffer size configuration policy should be improved, for example:
>    On the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
> is size based. We should also support a time-based limit, especially for the
> streaming case.
>    In the Python SDK Harness, the buffer size is not configurable in
> GrpcDataService. The input queue of the input buffer in the Python SDK
> Harness is not size limited.
>    The flush threshold of the output buffer in the Python SDK Harness is 10 MB
> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is to make the
> threshold configurable and to support a time-based threshold.
> 
> Nice To Have:
> -------------------
> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
> etc., to change the parameter type from WindowedValue<T> to T. (We have
> already discussed this in the previous mails.)
> 
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11 MB) is a package for Java SDK users, and it
> is pulled in because a few of its classes are used in
> beam-runners-java-fn-execution, such as:
>    PipelineOptions, used in DefaultJobBundleFactory
>    FileSystems, used in BeamFileSystemArtifactRetrievalService
> Maybe we can add a new module such as beam-sdks-java-common to hold
> the classes used by both the runner and the SDK.
> 
> 3) State cache is not shared between bundles, which is performance critical
> for streaming jobs.
> 
> 4) The coder of WindowedValue cannot be configured, and most of the time we
> don't need to serialize and deserialize the timestamp, window and pane
> properties in Flink. But currently FullWindowedValueCoder is used by default
> in WireCoders.addWireCoder. I suggest making the coder configurable (i.e.
> allowing the use of ValueOnlyWindowedValueCoder).
> 
> 5) Currently if a coder is not defined in StandardCoders, it will be wrapped
> with LengthPrefixedCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders
> are defined in StandardCoders. This means that for most coders, a length will
> be added to the serialized bytes, which I think is not necessary. My
> suggestion is that we could add some interfaces or tags to the coder which
> indicate whether the coder needs a length prefix or not.
> 
> 6) Set the log level according to PipelineOption in the Python SDK Harness.
> Currently the log level is set to INFO by default.
>
> 7) Allow starting the StatusServer according to PipelineOption in the Python
> SDK Harness. Currently the StatusServer is started by default.
> 
> Although I put 3) 4) 5) into the "Nice to Have" as they are performance 
> related, I still think they are very critical for Python UDF execution 
> performance.
> 
> Open questions:
> ---------------------
> 1) Which coders should be / can be defined in StandardCoders?
> 
> Currently we are preparing the design of how to support Python UDF in Flink
> based on the Beam portability framework, and we will bring up the discussion
> in the Flink community. We may propose more changes for Beam during that time
> and may need more support from the Beam community.
>
> To be honest, I'm not a Beam expert, so please feel free to correct me
> if my understanding is wrong. Any feedback is welcome.
> 
> Best,
> Jincheng






Cheers,
Max

On 31.07.19 12:16, Robert Bradshaw wrote:
> Yep, Python support is under active development,
> e.g. https://github.com/apache/beam/pull/9188
> 
> On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <sunjincheng...@gmail.com
> <mailto:sunjincheng...@gmail.com>> wrote:
> 
>     Thanks a lot for sharing the link. I took a quick look at the design
>     and the implementation in Java and think it could address my
>     concern. It seems that it's still not supported in the Python SDK
>     Harness. Is there any plan for that?
> 
>     Robert Bradshaw <rober...@google.com <mailto:rober...@google.com>>
>     wrote on Tue, Jul 30, 2019 at 12:33 PM:
> 
>         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>         <sunjincheng...@gmail.com <mailto:sunjincheng...@gmail.com>> wrote:
> 
> 
>                     Is it possible to add an interface such as
>                     `isSelfContained()` to the `Coder`? This interface
>                     indicates whether the serialized bytes are self
>                     contained. If it returns true, then there is no need
>                     to add a prefixing length.
>                     In this way, there is no need to introduce an extra
>                     protocol. Please correct me if I missed something :)
> 
> 
>                 The question is how it is self contained. E.g.
>                 DoubleCoder is self contained because it always uses
>                 exactly 8 bytes, but one needs to know the double coder
>                 to leverage this. VarInt coder is self-contained in a
>                 different way, as is StringCoder (which does just do
>                 prefixing).
> 
> 
>             Yes, you are right! Thinking about it again, we cannot add
>             such an interface to the coder, because the runner cannot call
>             it. And just one more thought: does it make sense to add a
>             method such as "registerSelfContainedCoder(xxx)" or so to
>             let users register the coders which can be processed in the
>             SDK Harness?  It's the responsibility of the SDK harness to
>             ensure that the coder is supported.
> 
> 
>         Basically, a "please don't add length prefixing to this coder,
>         assume everyone else can understand it (and errors will ensue if
>         anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>         there is not "the SDK"--there may be multiple other SDKs in
>         general, and of course runner components, some of which may
>         understand the coder in question and some of which may not. 
> 
>         I would say that if this becomes a problem, we could look at the
>         pros and cons of various remedies, this being one alternative. 
>          
> 
>              
> 
>                 I am hopeful that schemas give us a rich enough way to
>                 encode the vast majority of types that we will want to
>                 transmit across language barriers (possibly with some
>                 widening promotions). For high performance one will want
>                 to use formats like arrow rather than one-off coders as
>                 well, which also biases us towards the schema work. The
>                 set of StandardCoders is not closed, and nor is the
>                 possibility of figuring out a way to communicate outside
>                 this set for a particular pair of languages, but I think
>                 it makes sense to avoid going that direction unless we
>                 have to due to the increased API surface area and
>                 complexity it imposes on all runners and SDKs.
> 
> 
>             Great! Could you share some links about the schema work? It
>             seems very interesting and promising.
> 
> 
>         https://beam.apache.org/contribute/design-documents/#sql--schema and
>         of particular relevance https://s.apache.org/beam-schemas
> 
>          
> 
