Hi all,

Thanks a lot for sharing your thoughts!

It seems that we have already reached consensus on the following items.
Could you please read through them again and double-check that you all agree
with them? If so, I will start creating JIRA issues for those that don’t yet
have one.

1. Items that require improvements of Beam:

1) The "semi_persist_dir" setting should be configurable. (TODO)

https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48

2) Time-based cache threshold should be supported. (TODO)

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259

https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java

3) Cross-bundle cache should be supported. (
https://issues.apache.org/jira/browse/BEAM-5428)

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349

4) The log level should be configurable. (TODO)

https://issues.apache.org/jira/browse/BEAM-5468

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102

5) Improve the interfaces of classes such as FnDataService,
BundleProcessor, ActiveBundle, etc. to change the parameter type from
WindowedValue<T> to T. (TODO)

6) Python 3 is already supported in Beam. The warning should be removed.
(TODO)

https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179

7) The coder of WindowedValue should be configurable, which makes it
possible to use a custom coder such as ValueOnlyWindowedValueCoder.
(TODO)

https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91

8) The schema work can be used to solve the performance issue caused by the
extra length prefix in the encoding. However, it should also be supported in
Python. (https://github.com/apache/beam/pull/9188)

https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto

9) MapState should be supported in the gRPC protocol. (TODO)

https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
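
To make item 2) above more concrete, here is a minimal sketch of a buffer that flushes when either a size threshold or a time threshold is reached. It is not Beam code: the class name TimeOrSizeFlushBuffer, its parameters, and the injectable clock are hypothetical and only illustrate the idea.

```python
import time


class TimeOrSizeFlushBuffer:
    """Hypothetical output buffer: flushes on a size OR an age threshold."""

    def __init__(self, flush_fn, size_threshold_bytes=10 << 20,
                 time_threshold_secs=1.0, clock=time.monotonic):
        self._flush_fn = flush_fn                # called with the joined bytes
        self._size_threshold = size_threshold_bytes
        self._time_threshold = time_threshold_secs
        self._clock = clock                      # injectable for testing
        self._chunks = []
        self._size = 0
        self._first_write_at = None              # time of oldest buffered chunk

    def write(self, data):
        if self._first_write_at is None:
            self._first_write_at = self._clock()
        self._chunks.append(data)
        self._size += len(data)
        self.maybe_flush()

    def maybe_flush(self):
        """Flush if the buffer is too big or too old. Returns True if flushed."""
        if not self._chunks:
            return False
        too_big = self._size >= self._size_threshold
        too_old = self._clock() - self._first_write_at >= self._time_threshold
        if too_big or too_old:
            self.flush()
            return True
        return False

    def flush(self):
        if self._chunks:
            self._flush_fn(b"".join(self._chunks))
        self._chunks = []
        self._size = 0
        self._first_write_at = None
```

Note that in this sketch the age check only runs on write() or on an explicit maybe_flush() call, so a real implementation would probably also need a periodic timer to call maybe_flush() when no new data arrives.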



2. Items where we don’t need to do anything for now:

1) The default buffer size is enough for most cases and there is no need to
make it configurable for now.

2) Do not support ValueState in the gRPC protocol for now unless we have
evidence it matters.


If I have misunderstood anything, please feel free to correct me :)

--------------------

There are also some items that I didn’t bring up earlier which require
further discussion:

1) The input queue of the input buffer in the Python SDK Harness is not
size limited. We should give it a reasonable default size.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175

2) Refactor the code to avoid pulling in unnecessary dependencies. For
example, beam-sdks-java-core (11MB) is a package for Java SDK users, and it
is pulled in because a few classes in beam-sdks-java-core are used
in beam-runners-java-fn-execution, such as:

   PipelineOptions used in DefaultJobBundleFactory
   FileSystems used in BeamFileSystemArtifactRetrievalService

Maybe we can add a new module such as beam-sdks-java-common to
hold the classes used by both the runner and the SDK.

3) Allow the StatusServer in the Python SDK Harness to be started according
to configuration. Currently the StatusServer is started by default.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
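
For item 1) above, a size-limited input queue could look roughly like the following sketch, which uses only the Python standard library. The default of 1000 elements and the helper names make_input_queue and try_enqueue are hypothetical; the right default is exactly what needs discussion.

```python
import queue

# Hypothetical default; no concrete number has been proposed in this thread.
DEFAULT_MAX_QUEUED_ELEMENTS = 1000


def make_input_queue(maxsize=DEFAULT_MAX_QUEUED_ELEMENTS):
    """Create a bounded queue; maxsize <= 0 would mean unbounded."""
    return queue.Queue(maxsize=maxsize)


def try_enqueue(q, element, timeout_secs=0.0):
    """Try to enqueue an element, returning False if the queue stays full.

    With timeout_secs > 0 the caller blocks for at most that long, which
    gives the producer back-pressure instead of unbounded memory growth.
    """
    try:
        if timeout_secs > 0:
            q.put(element, timeout=timeout_secs)
        else:
            q.put_nowait(element)
        return True
    except queue.Full:
        return False
```

With a bounded queue the producer either waits or is told the queue is full, so a slow consumer can no longer cause the buffered input to grow without limit.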


Best,

Jincheng

jincheng sun <[email protected]> wrote on Fri, Aug 2, 2019 at 4:14 PM:

> Thanks for sharing the details of the current StandardCoders, Max!
> That's true, Flink may need to define some coders of its own, and I will share
> the POC in the Flink Python UDFs DISCUSS thread later :)
>
> Best,
> Jincheng
>
> Maximilian Michels <[email protected]> wrote on Wed, Jul 31, 2019 at 2:53 PM:
>
>> Hi Jincheng,
>>
>> Thanks for getting back to us.
>>
>> > For the next major release of Flink, we plan to add Python user defined
>> functions(UDF, UDTF, UDAF) support in Flink and I have gone over the Beam
>> portability framework and think that it is perfect for our requirements.
>> However we also find some improvements needed for Beam:
>>
>> That sounds great! The improvement list contains very reasonable
>> suggestions, some of which are already on our TODO list. I think
>> Thomas and Robert already provided the answers you were looking for.
>>
>> > Open questions:
>> > ---------------------
>> > 1) Which coders should be / can be defined in StandardCoders?
>>
>> The ones which are present now are:
>>
>>           BYTES_CODER
>>           INT64_CODER
>>           STRING_UTF8
>>           ITERABLE_CODER
>>           TIMER_CODER
>>           KV_CODER
>>           LENGTH_PREFIX_CODER
>>           GLOBAL_WINDOW_CODER
>>           INTERVAL_WINDOW_CODER
>>           WINDOWED_VALUE_CODER
>>           DOUBLE_CODER
>>
>> Note that this is just across SDK borders. If you stay within one SDK,
>> you can use any coder. If a Runner wants to replace a particular coder
>> with its own coder implementation, it could do that. Flink may want to
>> use its own set of coders for the sake of coder migration. Another
>> option Robert alluded to would be to make use of Schema where possible,
>> which has been built with migration in mind.
>>
>> Thanks,
>> Max
>>
>> >
>> > Must Have:
>> > ----------------
>> > 1) Currently only BagState is supported in the gRPC protocol and I think we
>> should support more kinds of state types, such as MapState, ValueState,
>> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
>> because these kinds of state will be used in both user-defined functions and
>> the Flink Python DataStream API.
>> >
>> > 2) There are warnings that Python 3 is not fully supported in Beam
>> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
>> portability framework because Python 2 will no longer be officially supported.
>> >
>> > 3) The configuration "semi_persist_dir" is not set in
>> EnvironmentFactory at the runner side. I think it is a must-have
>> because, when the environment type is "PROCESS", the default value "/tmp"
>> may become a big problem.
>> >
>> > 4) The buffer size configuration policy should be improved, such as:
>> >    At the runner side, the buffer limit in
>> BeamFnDataBufferingOutboundObserver is size based. We should also support
>> a time-based limit, especially for the streaming case.
>> >    In the Python SDK Harness, the buffer size is not configurable in
>> GrpcDataService. The input queue of the input buffer in the Python SDK
>> Harness is not size limited.
>> >    The flush threshold of the output buffer in the Python SDK Harness is 10
>> MB by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
>> threshold configurable and support a time-based threshold.
>> >
>> > Nice To Have:
>> > -------------------
>> > 1) Improve the interfaces of FnDataService, BundleProcessor,
>> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T.
>> (We have already discussed this in the previous mails)
>> >
>> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
>> example, beam-sdks-java-core (11MB) is a package for Java SDK users, and it
>> is pulled in because a few classes in beam-sdks-java-core are used
>> in beam-runners-java-fn-execution, such as:
>> >    PipelineOptions used in DefaultJobBundleFactory
>> >    FileSystems used in BeamFileSystemArtifactRetrievalService
>> > Maybe we can add a new module such as beam-sdks-java-common to
>> hold the classes used by both the runner and the SDK.
>> >
>> > 3) The state cache is not shared between bundles, which is performance
>> critical for streaming jobs.
>> >
>> > 4) The coder of WindowedValue cannot be configured, and most of the time we
>> don't need to serialize and deserialize the timestamp, window and pane
>> properties in Flink. But currently FullWindowedValueCoder is used by
>> default in WireCoders.addWireCoder. I suggest making the coder
>> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>> >
>> > 5) Currently if a coder is not defined in StandardCoders, it will be
>> wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
>> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
>> coders are defined in StandardCoders. It means that for most coders, a
>> length will be added to the serialized bytes, which I think is unnecessary.
>> My suggestion is that maybe we can add some interfaces or tags for the
>> coder which indicate whether the coder needs a length prefix or not.
>> >
>> > 6) Set log level according to PipelineOption in Python SDK Harness.
>> Currently the log level is set to INFO by default.
>> >
>> > 7) Allow the StatusServer in the Python SDK Harness to be started according
>> to PipelineOption. Currently the StatusServer is started by default.
>> >
>> > Although I put 3) 4) 5) into the "Nice to Have" as they are performance
>> related, I still think they are very critical for Python UDF execution
>> performance.
>> >
>> > Open questions:
>> > ---------------------
>> > 1) Which coders should be / can be defined in StandardCoders?
>> >
>> > Currently we are preparing the design of how to support Python UDF in
>> Flink based on the Beam portability framework and we will bring up the
>> discussion in Flink community. We may propose more changes for Beam during
>> that time and may need more support from Beam community.
>> >
>> > To be honest, I'm not an expert on Beam, so please feel free to
>> correct me if my understanding is wrong. Any feedback is welcome.
>> >
>> > Best,
>> > Jincheng
>>
>>
>>
>>
>>
>>
>> Cheers,
>> Max
>>
>> On 31.07.19 12:16, Robert Bradshaw wrote:
>> > Yep, Python support is under active development,
>> > e.g. https://github.com/apache/beam/pull/9188
>> >
>> > On Wed, Jul 31, 2019 at 9:24 AM jincheng sun <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> >     Thanks a lot for sharing the link. I took a quick look at the design
>> >     and the implementation in Java and think it could address my
>> >     concern. It seems that it's still not supported in the Python SDK
>> >     Harness. Is there any plan on that?
>> >
>> >     Robert Bradshaw <[email protected] <mailto:[email protected]>>
>> >     wrote on Tue, Jul 30, 2019 at 12:33 PM:
>> >
>> >         On Tue, Jul 30, 2019 at 11:52 AM jincheng sun
>> >         <[email protected] <mailto:[email protected]>> wrote:
>> >
>> >
>> >                     Is it possible to add an interface such as
>> >                     `isSelfContained()` to the `Coder`? This interface
>> >                     indicates
>> >                     whether the serialized bytes are self contained. If
>> >                     it returns true, then there is no need to add a
>> >                     prefixing length.
>> >                     In this way, there is no need to introduce an extra
>> >                     protocol. Please correct me if I missed something :)
>> >
>> >
>> >                 The question is how it is self contained. E.g.
>> >                 DoubleCoder is self contained because it always uses
>> >                 exactly 8 bytes, but one needs to know the double coder
>> >                 to leverage this. VarInt coder is self-contained in a
>> >                 different way, as is StringCoder (which does just do
>> >                 prefixing).
>> >
>> >
>> >             Yes, you are right! On second thought, we cannot add
>> >             such an interface to the coder, because the runner cannot call it.
>> >             And just one more thought: does it make sense to add a
>> >             method such as "registerSelfContainedCoder(xxx)" or so to
>> >             let users register the coders which can be processed in the
>> >             SDK Harness?  It's the responsibility of the SDK harness to
>> >             ensure that the coder is supported.
>> >
>> >
>> >         Basically, a "please don't add length prefixing to this coder,
>> >         assume everyone else can understand it (and errors will ensue if
>> >         anyone doesn't)" at the user level? Seems a bit dangerous. Also,
>> >         there is not "the SDK"--there may be multiple other SDKs in
>> >         general, and of course runner components, some of which may
>> >         understand the coder in question and some of which may not.
>> >
>> >         I would say that if this becomes a problem, we could look at the
>> >         pros and cons of various remedies, this being one alternative.
>> >
>> >
>> >
>> >
>> >                 I am hopeful that schemas give us a rich enough way to
>> >                 encode the vast majority of types that we will want to
>> >                 transmit across language barriers (possibly with some
>> >                 widening promotions). For high performance one will want
>> >                 to use formats like arrow rather than one-off coders as
>> >                 well, which also biases us towards the schema work. The
>> >                 set of StandardCoders is not closed, and nor is the
>> >                 possibility of figuring out a way to communicate outside
>> >                 this set for a particular pair of languages, but I think
>> >                 it makes sense to avoid going that direction unless we
>> >                 have to due to the increased API surface area and
>> >                 complexity it imposes on all runners and SDKs.
>> >
>> >
>> >             Great! Could you share some links about the schema work? It
>> >             seems very interesting and promising.
>> >
>> >
>> >
>> >         https://beam.apache.org/contribute/design-documents/#sql--schema and
>> >         of particular relevance https://s.apache.org/beam-schemas
>> >
>> >
>> >
>>
>
