Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-12 Thread jincheng sun
Hi all,

Thanks for your confirmation, Robert! :)

Thanks for sharing the details about the state discussion, Luke! :)

MapState is a bit complex; I think it's better to add a detailed
design doc when we work on map state support.

I will create JIRAs and follow up on the subsequent development. If there are
big changes, I will provide detailed design documentation and bring up the
discussion on the mailing list.

Thanks everyone for joining this discussion.

Best,
Jincheng

Lukasz Cwik wrote on Wed, Aug 7, 2019 at 8:19 PM:

> I wanted to add some more details about the state discussion.
>
> BEAM-7000 is about adding support for a gRPC message saying that the SDK
> is now blocked on one of its requests. This would allow for an easy
> optimization on the runner side: it can gather requests and batch them,
> knowing that the SDK is blocked only once it sees one of the
> blocked gRPC messages. This would make it easy for the runner to gather up
> clear + append calls and convert them to sets internally.
>
> Also, most of the reason map state doesn't exist yet is that we haven't
> discussed the changes to the gRPC APIs that we need (things like: can you
> lookup/clear/append to ranges? map or multimap? should we really just get
> rid of bag state in favor of a multimap state? can you enumerate keys?
> know how many keys there are? ...)
>
> On Wed, Aug 7, 2019 at 9:52 AM Robert Bradshaw 
> wrote:
>
>> The list looks good to me. Thanks for summarizing. Feel free to dive
>> into any of these issues yourself :).
>>
>> On Fri, Aug 2, 2019 at 6:24 PM jincheng sun 
>> wrote:
>> >
>> > Hi all,
>> >
>> >
>> > Thanks a lot for sharing your thoughts!
>> >
>> >
>> > It seems that we have already reached consensus on the following
>> items. Could you please read through them again and double-check that you all
>> agree with these? If yes, I will start creating JIRA issues for those
>> that don’t yet have one.
>> >
>> >
>> > 1. Items that require improvements to Beam:
>> >
>> >
>> > 1) The "semi_persist_dir" directory should be configurable. (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>> >
>> >
>> > 2) Time-based cache threshold should be supported. (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>> >
>> >
>> > 3) Cross-bundle cache should be supported. (
>> https://issues.apache.org/jira/browse/BEAM-5428)
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>> >
>> >
>> > 4) Allow configuring the log level. (TODO)
>> >
>> > https://issues.apache.org/jira/browse/BEAM-5468
>> >
>> >
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>> >
>> >
>> > 5) Improve the interfaces of classes such as FnDataService,
>> BundleProcessor, ActiveBundle, etc. to change the parameter type from
>> WindowedValue<T> to T. (TODO)
>> >
>> >
>> > 6) Python 3 is already supported in Beam. The warning should be
>> removed. (TODO)
>> >
>> > https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>> >
>> >
>> > 7) The coder of WindowedValue should be configurable, making it
>> possible to use a custom coder such as ValueOnlyWindowedValueCoder.
>> (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>> >
>> >
>> > 8) The schema work can be used to solve the performance issue of the
>> extra length prefix in the encoding. However, it should also be supported in
>> Python. (https://github.com/apache/beam/pull/9188)
>> >
>> >
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>> >
>> >
>> > 9) MapState should be supported in the gRPC protocol. (TODO)
>> >
>> >
>> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
>> >
>> >
>> >
>> >
>> > 2. Items where we don’t need to do anything for now:
>> >
>> >
>> > 1) The default buffer size is enough for most cases and there is no
>> need to make it configurable for now.
>> >
>> > 2) Do not support ValueState in the gRPC protocol for now unless we
>> have evidence it matters.
>> >
>> >
>> >
>> > If I have misunderstood anything, please feel free to correct
>> me :)
>> >
>> >
>> > 
>> >
>> >
>> > There are also some items that I didn’t bring up earlier which require
>> further discussion:
>> >
>> > 1) The input queue of the input buffer in the Python SDK Harness is
>> not size-limited. We should give it a reasonable default size.
>> >
>> >
>> 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-07 Thread Lukasz Cwik
I wanted to add some more details about the state discussion.

BEAM-7000 is about adding support for a gRPC message saying that the SDK is
now blocked on one of its requests. This would allow for an easy
optimization on the runner side: it can gather requests and batch them,
knowing that the SDK is blocked only once it sees one of the
blocked gRPC messages. This would make it easy for the runner to gather up
clear + append calls and convert them to sets internally.
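To make the batching idea concrete, here is a minimal Python sketch of that
runner-side coalescing. The request shape is a hypothetical stand-in for the
Fn API StateRequest protos, not actual Beam code:

    from collections import namedtuple

    # Hypothetical request shape; the real Fn API uses StateRequest protos.
    StateRequest = namedtuple('StateRequest', ['kind', 'state_key', 'value'])

    def coalesce_state_requests(buffered):
        # state_key -> list of pending values; None means the base contents
        # are unknown (appends without a leading clear), so we cannot "set".
        pending = {}
        for req in buffered:
            if req.kind == 'clear':
                pending[req.state_key] = []
            elif req.kind == 'append':
                values = pending.setdefault(req.state_key, None)
                if values is not None:
                    values.append(req.value)
        return {k: v for k, v in pending.items() if v is not None}

    # clear + append + append on one key collapses to a single set of [1, 2].
    writes = coalesce_state_requests([
        StateRequest('clear', 'k', None),
        StateRequest('append', 'k', 1),
        StateRequest('append', 'k', 2),
    ])
    assert writes == {'k': [1, 2]}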

Also, most of the reason map state doesn't exist yet is that we haven't
discussed the changes to the gRPC APIs that we need (things like: can you
lookup/clear/append to ranges? map or multimap? should we really just get
rid of bag state in favor of a multimap state? can you enumerate keys?
know how many keys there are? ...)

On Wed, Aug 7, 2019 at 9:52 AM Robert Bradshaw  wrote:

> The list looks good to me. Thanks for summarizing. Feel free to dive
> into any of these issues yourself :).
>
> On Fri, Aug 2, 2019 at 6:24 PM jincheng sun 
> wrote:
> >
> > Hi all,
> >
> >
> > Thanks a lot for sharing your thoughts!
> >
> >
> > It seems that we have already reached consensus on the following items.
> Could you please read through them again and double-check that you all agree
> with these? If yes, I will start creating JIRA issues for those that
> don’t yet have one.
> >
> >
> > 1. Items that require improvements to Beam:
> >
> >
> > 1) The "semi_persist_dir" directory should be configurable. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
> >
> >
> > 2) Time-based cache threshold should be supported. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
> >
> >
> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
> >
> >
> > 3) Cross-bundle cache should be supported. (
> https://issues.apache.org/jira/browse/BEAM-5428)
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
> >
> >
> > 4) Allow configuring the log level. (TODO)
> >
> > https://issues.apache.org/jira/browse/BEAM-5468
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
> >
> >
> > 5) Improve the interfaces of classes such as FnDataService,
> BundleProcessor, ActiveBundle, etc. to change the parameter type from
> WindowedValue<T> to T. (TODO)
> >
> >
> > 6) Python 3 is already supported in Beam. The warning should be removed.
> (TODO)
> >
> > https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
> >
> >
> > 7) The coder of WindowedValue should be configurable, making it
> possible to use a custom coder such as ValueOnlyWindowedValueCoder.
> (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
> >
> >
> > 8) The schema work can be used to solve the performance issue of the
> extra length prefix in the encoding. However, it should also be supported in
> Python. (https://github.com/apache/beam/pull/9188)
> >
> >
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
> >
> >
> > 9) MapState should be supported in the gRPC protocol. (TODO)
> >
> >
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
> >
> >
> >
> >
> > 2. Items where we don’t need to do anything for now:
> >
> >
> > 1) The default buffer size is enough for most cases and there is no need
> to make it configurable for now.
> >
> > 2) Do not support ValueState in the gRPC protocol for now unless we have
> evidence it matters.
> >
> >
> >
> > If I have misunderstood anything, please feel free to correct
> me :)
> >
> >
> > 
> >
> >
> > There are also some items that I didn’t bring up earlier which require
> further discussion:
> >
> > 1) The input queue of the input buffer in the Python SDK Harness is not
> size-limited. We should give it a reasonable default size.
> >
> >
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
> >
> >
> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11 MB) is a package for Java SDK users and it
> is pulled in because a few classes in beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as:
> >
> > PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
> >
> > This means maybe we can add a new module such as beam-sdks-java-common to
> hold the classes used by both the runner and the SDK.
> >
> >
> > 3) Allow starting up the StatusServer according to configuration in the Python
> SDK Harness. Currently the StatusServer is started by default.

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-07 Thread Robert Bradshaw
The list looks good to me. Thanks for summarizing. Feel free to dive
into any of these issues yourself :).

On Fri, Aug 2, 2019 at 6:24 PM jincheng sun  wrote:
>
> Hi all,
>
>
> Thanks a lot for sharing your thoughts!
>
>
> It seems that we have already reached consensus on the following items.
> Could you please read through them again and double-check that you all agree
> with these? If yes, I will start creating JIRA issues for those that
> don’t yet have one.
>
>
> 1. Items that require improvements to Beam:
>
>
> 1) The "semi_persist_dir" directory should be configurable. (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48
>
>
> 2) Time-based cache threshold should be supported. (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259
>
> https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java
>
>
> 3) Cross-bundle cache should be supported. 
> (https://issues.apache.org/jira/browse/BEAM-5428)
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349
>
>
> 4) Allow configuring the log level. (TODO)
>
> https://issues.apache.org/jira/browse/BEAM-5468
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102
>
>
> 5) Improve the interfaces of classes such as FnDataService, BundleProcessor,
> ActiveBundle, etc. to change the parameter type from WindowedValue<T> to T.
> (TODO)
>
>
> 6) Python 3 is already supported in Beam. The warning should be removed. 
> (TODO)
>
> https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179
>
>
> 7) The coder of WindowedValue should be configurable, making it possible
> to use a custom coder such as ValueOnlyWindowedValueCoder. (TODO)
>
> https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91
>
>
> 8) The schema work can be used to solve the performance issue of the extra
> length prefix in the encoding. However, it should also be supported in Python.
> (https://github.com/apache/beam/pull/9188)
>
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto
>
>
> 9) MapState should be supported in the gRPC protocol. (TODO)
>
> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
>
>
>
>
> 2. Items where we don’t need to do anything for now:
>
>
> 1) The default buffer size is enough for most cases and there is no need to 
> make it configurable for now.
>
> 2) Do not support ValueState in the gRPC protocol for now unless we have 
> evidence it matters.
>
>
>
> If I have misunderstood anything, please feel free to correct me :)
>
>
> 
>
>
> There are also some items that I didn’t bring up earlier which require 
> further discussion:
>
> 1) The input queue of the input buffer in the Python SDK Harness is not
> size-limited. We should give it a reasonable default size.
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175
>
>
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For example,
> beam-sdks-java-core (11 MB) is a package for Java SDK users and it is pulled in
> because a few classes in beam-sdks-java-core are used in
> beam-runners-java-fn-execution, such as:
>
> PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
>
> This means maybe we can add a new module such as beam-sdks-java-common to hold
> the classes used by both the runner and the SDK.
>
>
> 3) Allow starting up the StatusServer according to configuration in the Python SDK
> Harness. Currently the StatusServer is started by default.
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
>
>
>
> Best,
>
> Jincheng
>
>
> jincheng sun wrote on Fri, Aug 2, 2019 at 4:14 PM:
>>
>> Thanks for sharing the details of the current StandardCoders, Max!
>> That's true, Flink may need to define some of its own coders, and I will share the
>> POC in the Flink Python UDFs DISCUSS thread later :)
>>
>> Best,
>> Jincheng
>>
>> Maximilian Michels wrote on Wed, Jul 31, 2019 at 2:53 PM:
>>>
>>> Hi Jincheng,
>>>
>>> Thanks for getting back to us.
>>>
>>> > For the next major release of Flink, we plan to add Python user-defined
>>> > functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
>>> > portability framework and think that it is a perfect fit for our requirements.
>>> > However, we also found some improvements needed in Beam:
>>>
>>> That sounds great! The improvement list contains very reasonable
>>> suggestions, some of which are already on our TODO list. I think
>>> Thomas and Robert already provided the answers you were looking for.
>>>
>>> > Open 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-02 Thread jincheng sun
Hi all,

Thanks a lot for sharing your thoughts!

It seems that we have already reached consensus on the following items.
Could you please read through them again and double-check that you all agree
with these? If yes, I will start creating JIRA issues for those that
don’t yet have one.

1. Items that require improvements to Beam:

1) The "semi_persist_dir" directory should be configurable. (TODO)

https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L48

2) A time-based cache threshold should be supported; see the sketch after this list. (TODO)

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L259

https://github.com/apache/beam/blob/master/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java

3) Cross-bundle cache should be supported. (
https://issues.apache.org/jira/browse/BEAM-5428)

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L349

4) Allow configuring the log level. (TODO)

https://issues.apache.org/jira/browse/BEAM-5468

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L102

5) Improve the interfaces of classes such as FnDataService,
BundleProcessor, ActiveBundle, etc. to change the parameter type from
WindowedValue<T> to T. (TODO)

6) Python 3 is already supported in Beam. The warning should be removed.
(TODO)

https://github.com/apache/beam/blob/master/sdks/python/setup.py#L179

7) The coder of WindowedValue should be configurable, making it
possible to use a custom coder such as ValueOnlyWindowedValueCoder.
(TODO)

https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L91

8) The schema work can be used to solve the performance issue of the extra
length prefix in the encoding. However, it should also be supported in
Python. (https://github.com/apache/beam/pull/9188)

https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto

9) MapState should be supported in the gRPC protocol. (TODO)

https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/beam_fn_api.proto#L662
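As referenced in item 2 above, here is a minimal sketch of a combined size-
and time-based flush threshold. The class is a hypothetical illustration, not
the actual BeamFnDataBufferingOutboundObserver or data_plane.py code; the
10 MB default mirrors _DEFAULT_FLUSH_THRESHOLD, and the 1-second interval is
an arbitrary illustrative choice:

    import time

    class BufferingOutboundObserver(object):
        # Flushes when the buffer reaches size_threshold bytes OR when
        # flush_interval_secs have elapsed since the last flush.
        def __init__(self, send_fn, size_threshold=10 << 20,
                     flush_interval_secs=1.0):
            self._send = send_fn
            self._size_threshold = size_threshold
            self._flush_interval = flush_interval_secs
            self._buffer = []
            self._buffered_bytes = 0
            self._last_flush = time.time()

        def write(self, encoded_element):
            self._buffer.append(encoded_element)
            self._buffered_bytes += len(encoded_element)
            if (self._buffered_bytes >= self._size_threshold
                    or time.time() - self._last_flush >= self._flush_interval):
                self.flush()

        def flush(self):
            if self._buffer:
                self._send(b''.join(self._buffer))
                self._buffer = []
                self._buffered_bytes = 0
            self._last_flush = time.time()

Note that a check performed only on write never fires for an idle stream; a
real implementation would also need a timer thread (or the gRPC event loop)
to flush on the interval.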



2. Items where we don’t need to do anything for now:

1) The default buffer size is enough for most cases and there is no need to
make it configurable for now.

2) Do not support ValueState in the gRPC protocol for now unless we have
evidence it matters.


If I have misunderstood anything, please feel free to correct me :)



There are also some items that I didn’t bring up earlier which require
further discussion:

1) The input queue of the input buffer in the Python SDK Harness is not
size-limited. We should give it a reasonable default size (a minimal sketch
follows at the end of this list).

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/data_plane.py#L175

2) Refactor the code to avoid pulling in unnecessary dependencies. For
example, beam-sdks-java-core (11 MB) is a package for Java SDK users and it
is pulled in because a few classes in beam-sdks-java-core are used
in beam-runners-java-fn-execution, such as:

PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
BeamFileSystemArtifactRetrievalService.

This means maybe we can add a new module such as beam-sdks-java-common to
hold the classes used by both the runner and the SDK.

3) Allow starting up the StatusServer according to configuration in the Python SDK
Harness. Currently the StatusServer is started by default.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L113
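For further-discussion item 1, bounding the input queue is essentially a
one-line change; a sketch with a hypothetical (not benchmarked) default:

    import queue

    # A bounded queue blocks the gRPC reader thread when the SDK harness
    # falls behind, giving backpressure instead of unbounded memory growth.
    _DEFAULT_INPUT_QUEUE_SIZE = 100  # illustrative value only

    input_queue = queue.Queue(maxsize=_DEFAULT_INPUT_QUEUE_SIZE)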


Best,

Jincheng

jincheng sun wrote on Fri, Aug 2, 2019 at 4:14 PM:

> Thanks for sharing the details of the current StandardCoders, Max!
> That's true, Flink may need to define some of its own coders, and I will share the
> POC in the Flink Python UDFs DISCUSS thread later :)
>
> Best,
> Jincheng
>
Maximilian Michels wrote on Wed, Jul 31, 2019 at 2:53 PM:
>
>> Hi Jincheng,
>>
>> Thanks for getting back to us.
>>
>> > For the next major release of Flink, we plan to add Python user-defined
>> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
>> portability framework and think that it is a perfect fit for our requirements.
>> However, we also found some improvements needed in Beam:
>>
>> That sounds great! The improvement list contains very reasonable
>> suggestions, some of which are already on our TODO list. I think
>> Thomas and Robert already provided the answers you were looking for.
>>
>> > Open questions:
>> > -
>> > 1) Which coders should be / can be defined in StandardCoders?
>>
>> The ones which are present now; those are:
>>
>>   BYTES_CODER
>>   INT64_CODER
>>   STRING_UTF8
>>   ITERABLE_CODER
>>   TIMER_CODER
>>   KV_CODER
>>   LENGTH_PREFIX_CODER
>>   GLOBAL_WINDOW_CODER
>>   INTERVAL_WINDOW_CODER
>>   

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-08-02 Thread jincheng sun
Thanks for sharing the details of the current StandardCoders, Max!
That's true, Flink may need to define some of its own coders, and I will share the
POC in the Flink Python UDFs DISCUSS thread later :)

Best,
Jincheng

Maximilian Michels wrote on Wed, Jul 31, 2019 at 2:53 PM:

> Hi Jincheng,
>
> Thanks for getting back to us.
>
> > For the next major release of Flink, we plan to add Python user-defined
> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
> portability framework and think that it is a perfect fit for our requirements.
> However, we also found some improvements needed in Beam:
>
> That sounds great! The improvement list contains very reasonable
> suggestions, some of which are already on our TODO list. I think
> Thomas and Robert already provided the answers you were looking for.
>
> > Open questions:
> > -
> > 1) Which coders should be / can be defined in StandardCoders?
>
> The ones which are present now; those are:
>
>   BYTES_CODER
>   INT64_CODER
>   STRING_UTF8
>   ITERABLE_CODER
>   TIMER_CODER
>   KV_CODER
>   LENGTH_PREFIX_CODER
>   GLOBAL_WINDOW_CODER
>   INTERVAL_WINDOW_CODER
>   WINDOWED_VALUE_CODER
>   DOUBLE_CODER
>
> Note that this is just across SDK borders. If you stay within one SDK,
> you can use any coder. If a Runner wants to replace a particular coder
> with its own coder implementation, it could do that. Flink may want to
> use its own set of coders for the sake of coder migration. Another
> option Robert alluded to would be to make use of Schemas where possible,
> which have been built with migration in mind.
>
> Thanks,
> Max
>
> >
> > Must Have:
> > 
> > 1) Currently only BagState is supported in the gRPC protocol, and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
> because these kinds of state will be used in both user-defined functions and
> the Flink Python DataStream API.
> >
> > 2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x in the Beam
> portability framework, since Python 2 will no longer be officially supported.
> >
> > 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory
> at the runner side. Why I think it's  must to have is because when the
> environment type is "PROCESS", the default value "/tmp" may become a big
> problem.
> >
> > 4) The buffer size configuration policy should be improved. For example:
> >    At the runner side, the buffer limit in
> BeamFnDataBufferingOutboundObserver is size-based. We should also support
> time-based limits, especially for the streaming case.
> >    In the Python SDK Harness, the buffer size is not configurable in
> GrpcDataService, and the input queue of the input buffer in the Python SDK
> Harness is not size-limited.
> >   The flush threshold of the output buffer in the Python SDK Harness is 10
> MB by default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My suggestion: make the
> threshold configurable and support a time-based threshold.
> >
> > Nice To Have:
> > ---
> > 1) Improve the interfaces of FnDataService, BundleProcessor,
> ActiveBundle, etc. to change the parameter type from WindowedValue<T> to T.
> (We have already discussed this in the previous mails.)
> >
> > 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11 MB) is a package for Java SDK users and it
> is pulled in because a few classes in beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as:
> > PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
> > This means maybe we can add a new module such as beam-sdks-java-common to
> hold the classes used by both the runner and the SDK.
> >
> > 3) The state cache is not shared between bundles, which is
> performance-critical for streaming jobs.
> >
> > 4) The coder of WindowedValue cannot be configured, and most of the time we
> don't need to serialize and deserialize the timestamp, window, and pane
> properties in Flink. But currently FullWindowedValueCoder is used by
> default in WireCoders.addWireCoder; I suggest making the coder
> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
> >
> > 5) Currently, if a coder is not defined in StandardCoders, it will be
> wrapped with LengthPrefixedCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
> coders are defined in StandardCoders. It means that for most coders, a
> length will be added to the serialized bytes, which in my view is not
> necessary. My suggestion: maybe we can add some interfaces or tags for a
> coder to indicate whether the coder needs a length prefix or not.
> >
> > 6) Set the log level according to PipelineOptions in the Python SDK Harness.
> Currently the log level is set to INFO by default.

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread Maximilian Michels
Hi Jincheng,

Thanks for getting back to us.

> For the next major release of Flink, we plan to add Python user-defined
> functions (UDF, UDTF, UDAF) support in Flink, and I have gone over the Beam
> portability framework and think that it is a perfect fit for our requirements.
> However, we also found some improvements needed in Beam:

That sounds great! The improvement list contains very reasonable
suggestions, some of which are already on our TODO list. I think
Thomas and Robert already provided the answers you were looking for.

> Open questions:
> -
> 1) Which coders should be / can be defined in StandardCoders?

The ones which are present now; those are:

  BYTES_CODER
  INT64_CODER
  STRING_UTF8
  ITERABLE_CODER
  TIMER_CODER
  KV_CODER
  LENGTH_PREFIX_CODER
  GLOBAL_WINDOW_CODER
  INTERVAL_WINDOW_CODER
  WINDOWED_VALUE_CODER
  DOUBLE_CODER

Note that this is just across SDK borders. If you stay within one SDK,
you can use any coder. If a Runner wants to replace a particular coder
with its own coder implementation, it could do that. Flink may want to
use its own set of coders for the sake of coder migration. Another
option Robert alluded to would be to make use of Schemas where possible,
which have been built with migration in mind.
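For reference, these constants correspond to the model coder URNs from
beam_runner_api.proto (listed here from memory of the 2019-era model; check
StandardCoders/ModelCoders for the authoritative set):

    STANDARD_CODER_URNS = {
        'BYTES_CODER': 'beam:coder:bytes:v1',
        'INT64_CODER': 'beam:coder:varint:v1',
        'STRING_UTF8': 'beam:coder:string_utf8:v1',
        'ITERABLE_CODER': 'beam:coder:iterable:v1',
        'TIMER_CODER': 'beam:coder:timer:v1',
        'KV_CODER': 'beam:coder:kv:v1',
        'LENGTH_PREFIX_CODER': 'beam:coder:length_prefix:v1',
        'GLOBAL_WINDOW_CODER': 'beam:coder:global_window:v1',
        'INTERVAL_WINDOW_CODER': 'beam:coder:interval_window:v1',
        'WINDOWED_VALUE_CODER': 'beam:coder:windowed_value:v1',
        'DOUBLE_CODER': 'beam:coder:double:v1',
    }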

Thanks,
Max

> 
> Must Have:
> 
> 1) Currently only BagState is supported in the gRPC protocol, and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState (AggregatingState in Flink), etc. That's because
> these kinds of state will be used in both user-defined functions and the Flink
> Python DataStream API.
> 
> 2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x in the Beam
> portability framework, since Python 2 will no longer be officially supported.
> 
> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at 
> the runner side. Why I think it's  must to have is because when the 
> environment type is "PROCESS", the default value "/tmp" may become a big 
> problem. 
> 
> 4) The buffer size configuration policy should be improved. For example:
>    At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver is
> size-based. We should also support time-based limits, especially for the streaming
> case.
>    In the Python SDK Harness, the buffer size is not configurable in
> GrpcDataService. The input queue of the input buffer in the Python SDK
> Harness is not size-limited.
>   The flush threshold of the output buffer in the Python SDK Harness is 10 MB by
> default (_DEFAULT_FLUSH_THRESHOLD = 10 MB). My suggestion: make the threshold
> configurable and support a time-based threshold.
> 
> Nice To Have:
> ---
> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
> etc. to change the parameter type from WindowedValue<T> to T. (We have
> already discussed this in the previous mails.)
> 
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For example,
> beam-sdks-java-core (11 MB) is a package for Java SDK users and it is pulled in
> because a few classes in beam-sdks-java-core are used in
> beam-runners-java-fn-execution, such as:
> PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
> This means maybe we can add a new module such as beam-sdks-java-common to hold
> the classes used by both the runner and the SDK.
> 
> 3) The state cache is not shared between bundles, which is performance-critical
> for streaming jobs.
> 
> 4) The coder of WindowedValue cannot be configured, and most of the time we don't
> need to serialize and deserialize the timestamp, window, and pane properties
> in Flink. But currently FullWindowedValueCoder is used by default in
> WireCoders.addWireCoder; I suggest making the coder configurable (i.e.
> allowing the use of ValueOnlyWindowedValueCoder).
> 
> 5) Currently, if a coder is not defined in StandardCoders, it will be wrapped
> with LengthPrefixedCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few coders
> are defined in StandardCoders. It means that for most coders, a length will be
> added to the serialized bytes, which in my view is not necessary. My
> suggestion: maybe we can add some interfaces or tags for a coder to
> indicate whether the coder needs a length prefix or not.
> 
> 6) Set the log level according to PipelineOptions in the Python SDK Harness.
> Currently the log level is set to INFO by default.
> 
> 7) Allow starting up the StatusServer according to PipelineOptions in the Python
> SDK Harness. Currently the StatusServer is started by default.
> 
> Although I put 3), 4), and 5) into the "Nice to Have" list as they are
> performance-related, I still think they are very critical for Python UDF
> execution performance.
> 
> Open 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread Robert Bradshaw
Yep, Python support is under active development, e.g.
https://github.com/apache/beam/pull/9188

On Wed, Jul 31, 2019 at 9:24 AM jincheng sun 
wrote:

> Thanks a lot for sharing the link. I took a quick look at the design and
> the implementation in Java and think it could address my concern. It seems
> that it's still not supported in the Python SDK Harness. Is there any plan
> for that?
>
> Robert Bradshaw wrote on Tue, Jul 30, 2019 at 12:33 PM:
>
>> On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
>> wrote:
>>
>>>
> Is it possible to add an interface such as `isSelfContained()` to the
> `Coder`? This interface indicates
> whether the serialized bytes are self-contained. If it returns true,
> then there is no need to add a length prefix.
> In this way, there is no need to introduce an extra protocol. Please
> correct me if I missed something :)
>

 The question is how it is self-contained. E.g. DoubleCoder is self-contained
 because it always uses exactly 8 bytes, but one needs to know the
 double coder to leverage this. VarInt coder is self-contained in a different
 way, as is StringCoder (which does just do prefixing).

>>>
>>> Yes, you are right! Thinking about it again, we cannot add such an interface
>>> to the coder, because the runner cannot call it. And just one more thought:
>>> does it make sense to add a method such as "registerSelfContainedCoder(xxx)"
>>> or so to let users register the coders which can be processed
>>> in the SDK Harness? It's the responsibility of the SDK harness to ensure
>>> that the coder is supported.
>>>
>>
>> Basically, a "please don't add length prefixing to this coder, assume
>> everyone else can understand it (and errors will ensue if anyone doesn't)"
>> at the user level? Seems a bit dangerous. Also, there is not "the
>> SDK"--there may be multiple other SDKs in general, and of course runner
>> components, some of which may understand the coder in question and some of
>> which may not.
>>
>> I would say that if this becomes a problem, we could look at the pros and
>> cons of various remedies, this being one alternative.
>>
>>
>>>
>>>
 I am hopeful that schemas give us a rich enough way to encode the vast
 majority of types that we will want to transmit across language barriers
 (possibly with some widening promotions). For high performance one will
 want to use formats like arrow rather than one-off coders as well, which
 also biases us towards the schema work. The set of StandardCoders is not
 closed, and nor is the possibility of figuring out a way to communicate
 outside this set for a particular pair of languages, but I think it makes
 sense to avoid going that direction unless we have to, due to the increased
 API surface area and complexity it imposes on all runners and SDKs.

>>>
>>> Great! Could you share some links about the schema work? It seems very
>>> interesting and promising.
>>>
>>
>> https://beam.apache.org/contribute/design-documents/#sql--schema and of
>> particular relevance https://s.apache.org/beam-schemas
>>
>>
>>
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-31 Thread jincheng sun
Thanks a lot for sharing the link. I took a quick look at the design and
the implementation in Java and think it could address my concern. It seems
that it's still not supported in the Python SDK Harness. Is there any plan
for that?

Robert Bradshaw wrote on Tue, Jul 30, 2019 at 12:33 PM:

> On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
> wrote:
>
>>
 Is it possible to add an interface such as `isSelfContained()` to the
 `Coder`? This interface indicates
 whether the serialized bytes are self-contained. If it returns true,
 then there is no need to add a length prefix.
 In this way, there is no need to introduce an extra protocol. Please
 correct me if I missed something :)

>>>
>>> The question is how it is self-contained. E.g. DoubleCoder is self-contained
>>> because it always uses exactly 8 bytes, but one needs to know the
>>> double coder to leverage this. VarInt coder is self-contained in a different
>>> way, as is StringCoder (which does just do prefixing).
>>>
>>
>> Yes, you are right! Thinking about it again, we cannot add such an interface
>> to the coder, because the runner cannot call it. And just one more thought:
>> does it make sense to add a method such as "registerSelfContainedCoder(xxx)"
>> or so to let users register the coders which can be processed
>> in the SDK Harness? It's the responsibility of the SDK harness to ensure
>> that the coder is supported.
>>
>
> Basically, a "please don't add length prefixing to this coder, assume
> everyone else can understand it (and errors will ensue if anyone doesn't)"
> at the user level? Seems a bit dangerous. Also, there is not "the
> SDK"--there may be multiple other SDKs in general, and of course runner
> components, some of which may understand the coder in question and some of
> which may not.
>
> I would say that if this becomes a problem, we could look at the pros and
> cons of various remedies, this being one alternative.
>
>
>>
>>
>>> I am hopeful that schemas give us a rich enough way to encode the vast
>>> majority of types that we will want to transmit across language barriers
>>> (possibly with some widening promotions). For high performance one will
>>> want to use formats like arrow rather than one-off coders as well, which
>>> also biases us towards the schema work. The set of StandardCoders is not
>>> closed, and nor is the possibility of figuring out a way to communicate
>>> outside this set for a particular pair of languages, but I think it makes
>>> sense to avoid going that direction unless we have to, due to the increased
>>> API surface area and complexity it imposes on all runners and SDKs.
>>>
>>
>> Great! Could you share some links about the schema work? It seems very
>> interesting and promising.
>>
>
> https://beam.apache.org/contribute/design-documents/#sql--schema and of
> particular relevance https://s.apache.org/beam-schemas
>
>
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-30 Thread Robert Bradshaw
On Tue, Jul 30, 2019 at 11:52 AM jincheng sun 
wrote:

>
>>> Is it possible to add an interface such as `isSelfContained()` to the
>>> `Coder`? This interface indicates
>>> whether the serialized bytes are self-contained. If it returns true,
>>> then there is no need to add a length prefix.
>>> In this way, there is no need to introduce an extra protocol. Please
>>> correct me if I missed something :)
>>>
>>
>> The question is how it is self-contained. E.g. DoubleCoder is self-contained
>> because it always uses exactly 8 bytes, but one needs to know the
>> double coder to leverage this. VarInt coder is self-contained in a different
>> way, as is StringCoder (which does just do prefixing).
>>
>
> Yes, you are right! Thinking about it again, we cannot add such an interface
> to the coder, because the runner cannot call it. And just one more thought:
> does it make sense to add a method such as "registerSelfContainedCoder(xxx)"
> or so to let users register the coders which can be processed
> in the SDK Harness? It's the responsibility of the SDK harness to ensure
> that the coder is supported.
>

Basically, a "please don't add length prefixing to this coder, assume
everyone else can understand it (and errors will ensue if anyone doesn't)"
at the user level? Seems a bit dangerous. Also, there is not "the
SDK"--there may be multiple other SDKs in general, and of course runner
components, some of which may understand the coder in question and some of
which may not.

I would say that if this becomes a problem, we could look at the pros and
cons of various remedies, this being one alternative.


>
>
>> I am hopeful that schemas give us a rich enough way to encode the vast
>> majority of types that we will want to transmit across language barriers
>> (possibly with some widening promotions). For high performance one will
>> want to use formats like arrow rather than one-off coders as well, which
>> also biases us towards the schema work. The set of StandardCoders is not
>> closed, and nor is the possibility of figuring out a way to communicate
>> outside this set for a particular pair of languages, but I think it makes
>> sense to avoid going that direction unless we have to, due to the increased
>> API surface area and complexity it imposes on all runners and SDKs.
>>
>
> Great! Could you share some links about the schema work? It seems very
> interesting and promising.
>

https://beam.apache.org/contribute/design-documents/#sql--schema and of
particular relevance https://s.apache.org/beam-schemas


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
On Mon, Jul 29, 2019 at 4:14 PM jincheng sun 
wrote:

> Hi Robert,
>
> Thanks for your detailed comments; I have added a few pointers inline.
>
> Best,
> Jincheng
>
> Robert Bradshaw wrote on Mon, Jul 29, 2019 at 12:35 PM:
>
>> On Sun, Jul 28, 2019 at 6:51 AM jincheng sun 
>> wrote:
>> >
>> > Hi, Thomas & Robert, Thanks for your comments and providing relevant
>> discussions and JIRA links, very helpful to me!
>> >
>> > I am glad to see your affirmative response,  And I am glad to add my
>> thoughts on the comment you left:
>> > -
>> >
>> > >> There are two distinct levels at which one can talk about a certain
>> type of state being supported: the user-visible SDK's API and the runner's
>> API. For example, BagState, ValueState, ReducingState, CombiningState,  can
>> all be implemented on top of a runner-offered MapState in the SDK. On the
>> one hand, there's a desire to keep the number of "primitive" states types
>> to a minimum (to ease the use of authoring runners), but if a runner can
>> perform a specific optimization due to knowing about the particular state
>> type it might be preferable to pass it through rather than emulate it in
>> the SDK.
>> > ---
>> > Agree. Regarding MapState, it's definitely needed as it cannot be
>> implemented on top of the existing BagState.
>> > Regarding ValueState, it can be implemented on top of BagState.
>> However, we can do optimization if we know a state is ValueState.
>> > For example, if a key is updated with a new value, if the ValueState is
>> implemented on top of BagState, two RPC calls are needed
>> > to write the new value back to the runner: clear + append; if we know it's
>> ValueState, just one RPC call is enough: set.
>> > We can discuss case by case whether a state type is needed.
>>
>> In the Beam APIs [1] multiple state requests are consumed as a stream
>> in a single RPC, so clear followed by append still has low overhead.
>> Is that optimization not sufficient?
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
>>
>>
> Actually there are two kinds of overhead:
> 1) the RPC overhead (I think on this point RPC may be sufficient);
> 2) the state read/write overhead, i.e., if there is no optimization, the
> runner needs to clear the state first and then set a new value for the
> state.
>

It's certainly an option to keep open. I'd avoid prematurely optimizing
until we have evidence that it matters.


>
> > ---
>> >
>> > >> Note that in the protos, the GRPC ports have a coder attribute
>> > specifically to allow this kind of customization (and the SDKs should
>> > be respecting that). We've also talked about going beyond per-element
>> > encodings (e.g. using arrow to serialize entire batches across the
>> > wire). I think all the runner code simply uses the default and we
>> > could be more intelligent there.
>> > ---
>> >
>> > Yes, gRPC allows using a custom coder. However, I'm afraid
>> that this is not enough, as we want to use
>> > Beam's portability framework by depending on the existing modules
>> (beam-runners-java-fn-execution and the Python SDK Harness) instead
>> > of copying that part of the code into Flink. So it should also allow using
>> a custom coder in beam-runners-java-fn-execution.
>> > Otherwise, we would have to copy a lot of code into Flink to use a
>> custom coder.
>>
>> Agreed, beam-runners-java-fn-execution does not take advantage of the
>> full flexibility of the protocol, and would make a lot of sense to
>> enhance it to be able to.
>>
>> > ---
>> >
>> > >> I'm wary of having too many buffer size configuration options (is
>> > there a compelling reason to make it bigger or smaller?) but something
>> > time-based would be very useful.
>> > ---
>> >
>> > I think the default buffer sizes don't need to change for
>> most cases. I'm not sure about just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
>> > Would 1MB make more sense?
>>
>> IIRC, 10MB was the point at which, according to benchmarks Luke did
>> quite a while ago, there was clearly no performance benefit in making
>> it larger. Coupled with a time-based threshold, I don't see much of an
>> advantage to lowering it.
>
>
> My concern is that the SDK harness may be shared by a lot of runners and
> there will be at least one write buffer for each runner. Is it possible
> that there are too many write buffers in use, taking up a lot of memory,
> such that users want to lower it? Nevertheless, I think this problem is not
> critical considering that we all agree a time-based threshold should be
> supported. :)
>

Yeah, we can cross that bridge when (if) we come to it.


>
>
>> > ---
>> >
>> > >> The idea of StandardCoders is it's a set of coders that all runners
>> > and SDKs can be assumed to understand. If you have an element encoded
>> > with something other Coder, then there's no way to know 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread jincheng sun
Hi Robert,

Thanks for your detailed comments; I have added a few pointers inline.

Best,
Jincheng

Robert Bradshaw wrote on Mon, Jul 29, 2019 at 12:35 PM:

> On Sun, Jul 28, 2019 at 6:51 AM jincheng sun 
> wrote:
> >
> > Hi, Thomas & Robert, Thanks for your comments and providing relevant
> discussions and JIRA links, very helpful to me!
> >
> > I am glad to see your affirmative response,  And I am glad to add my
> thoughts on the comment you left:
> > -
> >
> > >> There are two distinct levels at which one can talk about a certain
> type of state being supported: the user-visible SDK's API and the runner's
> API. For example, BagState, ValueState, ReducingState, CombiningState,  can
> all be implemented on top of a runner-offered MapState in the SDK. On the
> one hand, there's a desire to keep the number of "primitive" states types
> to a minimum (to ease the use of authoring runners), but if a runner can
> perform a specific optimization due to knowing about the particular state
> type it might be preferable to pass it through rather than emulate it in
> the SDK.
> > ---
> > Agree. Regarding MapState, it's definitely needed as it cannot be
> implemented on top of the existing BagState.
> > Regarding ValueState, it can be implemented on top of BagState. However,
> we can do optimization if we know a state is ValueState.
> > For example, if a key is updated with a new value, if the ValueState is
> implemented on top of BagState, two RPC calls are needed
> > to write the new value back to the runner: clear + append; if we know it's
> ValueState, just one RPC call is enough: set.
> > We can discuss case by case whether a state type is needed.
>
> In the Beam APIs [1] multiple state requests are consumed as a stream
> in a single RPC, so clear followed by append still has low overhead.
> Is that optimization not sufficient?
>
> [1]
> https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573
>
>
Actually there are two kinds of overhead:
1) the RPC overhead (I think on this point RPC may be sufficient);
2) the state read/write overhead, i.e., if there is no optimization, the
runner needs to clear the state first and then set a new value for the
state.


> > ---
> >
> > >> Note that in the protos, the GRPC ports have a coder attribute
> > specifically to allow this kind of customization (and the SDKs should
> > be respecting that). We've also talked about going beyond per-element
> > encodings (e.g. using arrow to serialize entire batches across the
> > wire). I think all the runner code simply uses the default and we
> > could be more intelligent there.
> > ---
> >
> > Yes, gRPC allows using a custom coder. However, I'm afraid
> that this is not enough, as we want to use
> > Beam's portability framework by depending on the existing modules
> (beam-runners-java-fn-execution and the Python SDK Harness) instead
> > of copying that part of the code into Flink. So it should also allow using
> a custom coder in beam-runners-java-fn-execution.
> > Otherwise, we would have to copy a lot of code into Flink to use a
> custom coder.
>
> Agreed, beam-runners-java-fn-execution does not take advantage of the
> full flexibility of the protocol, and would make a lot of sense to
> enhance it to be able to.
>
> > ---
> >
> > >> I'm wary of having too many buffer size configuration options (is
> > there a compelling reason to make it bigger or smaller?) but something
> > time-based would be very useful.
> > ---
> >
> > I think the default buffer sizes don't need to change for
> most cases. I'm not sure about just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
> > Would 1MB make more sense?
>
> IIRC, 10MB was the point at which, according to benchmarks Luke did
> quite a while ago, there was clearly no performance benefit in making
> it larger. Coupled with a time-based threshold, I don't see much of an
> advantage to lowering it.


My concern is that the SDK harness may be shared by a lot of runners and
there will be at least one write buffer for each runner. Is it possible
that there are too many write buffers in use, taking up a lot of memory,
such that users want to lower it? Nevertheless, I think this problem is not
critical considering that we all agree a time-based threshold should be
supported. :)


> > ---
> >
> > >> The idea of StandardCoders is it's a set of coders that all runners
> > and SDKs can be assumed to understand. If you have an element encoded
> > with something other Coder, then there's no way to know if the other
> > side will be able to decode it (or, indeed, even properly detect
> > element boundaries in the stream of contiguous encoded elements).
> > Adding a length prefixed coder wrapping allows the other side to at
> > least pull it out and pass it around as encoded bytes. In other words,
> > whether an encoded element needs length prefixing 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-29 Thread Robert Bradshaw
On Sun, Jul 28, 2019 at 6:51 AM jincheng sun  wrote:
>
> Hi, Thomas & Robert, Thanks for your comments and providing relevant 
> discussions and JIRA links, very helpful to me!
>
> I am glad to see your affirmative response,  And I am glad to add my thoughts 
> on the comment you left:
> -
>
> >> There are two distinct levels at which one can talk about a certain type 
> >> of state being supported: the user-visible SDK's API and the runner's API. 
> >> For example, BagState, ValueState, ReducingState, CombiningState,  can all 
> >> be implemented on top of a runner-offered MapState in the SDK. On the one 
> >> hand, there's a desire to keep the number of "primitive" states types to a 
> >> minimum (to ease the use of authoring runners), but if a runner can 
> >> perform a specific optimization due to knowing about the particular state 
> >> type it might be preferable to pass it through rather than emulate it in 
> >> the SDK.
> ---
> Agree. Regarding MapState, it's definitely needed as it cannot be implemented 
> on top of the existing BagState.
> Regarding ValueState, it can be implemented on top of BagState. However, we 
> can do optimization if we know a state is ValueState.
> For example, if a key is updated with a new value, if the ValueState is 
> implemented on top of BagState, two RPC calls are needed
> to write the new value back to the runner: clear + append; if we know it's
> ValueState, just one RPC call is enough: set.
> We can discuss case by case whether a state type is needed.

In the Beam APIs [1] multiple state requests are consumed as a stream
in a single RPC, so clear followed by append still has low overhead.
Is that optimization not sufficient?

[1] 
https://github.com/apache/beam/blob/release-2.14.0/model/fn-execution/src/main/proto/beam_fn_api.proto#L573

> ---
>
> >> Note that in the protos, the GRPC ports have a coder attribute
> specifically to allow this kind of customization (and the SDKs should
> be respecting that). We've also talked about going beyond per-element
> encodings (e.g. using arrow to serialize entire batches across the
> wire). I think all the runner code simply uses the default and we
> could be more intelligent there.
> ---
>
> Yes, gRPC allows using a custom coder. However, I'm afraid that
> this is not enough, as we want to use
> Beam's portability framework by depending on the existing modules
> (beam-runners-java-fn-execution and the Python SDK Harness) instead
> of copying that part of the code into Flink. So it should also allow using a
> custom coder in beam-runners-java-fn-execution.
> Otherwise, we would have to copy a lot of code into Flink to use a custom
> coder.

Agreed, beam-runners-java-fn-execution does not take advantage of the
full flexibility of the protocol, and would make a lot of sense to
enhance it to be able to.

> ---
>
> >> I'm wary of having too many buffer size configuration options (is
> there a compelling reason to make it bigger or smaller?) but something
> time-based would be very useful.
> ---
>
> I think the default buffer sizes don't need to change for most
> cases. I'm not sure about just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
> Would 1MB make more sense?

IIRC, 10MB was the point at which, according to benchmarks Luke did
quite a while ago, there was clearly no performance benefit in making
it larger. Coupled with a time-based threshold, I don't see much of an
advantage to lowering it.

> ---
>
> >> The idea of StandardCoders is it's a set of coders that all runners
> and SDKs can be assumed to understand. If you have an element encoded
> with something other Coder, then there's no way to know if the other
> side will be able to decode it (or, indeed, even properly detect
> element boundaries in the stream of contiguous encoded elements).
> Adding a length prefixed coder wrapping allows the other side to at
> least pull it out and pass it around as encoded bytes. In other words,
> whether an encoded element needs length prefixing is a function of the
> other process you're trying to communicate with (and we don't have the
> mechanisms, and I'm not sure it's worth the complexity, to do some
> kind of coder negotiation between the processes here.) Of course for a
> UDF, if the other side does not know about the element type in
> question it'd be difficult (in general) to meaningfully process the
> element.
>
> The work on schemas, and making those portable, will result in a much
> richer set of element types that can be passed through "standard"
> coders.
> ---
>
> The design makes sense to me. My concern is that if a coder is not among the 
> StandardCoders, it will be prefixed with a length even if the harness knows 
> how to decode it.

If the harness knows how to decode it, the length prefixing is just a
lost optimization opportunity, but it'll still work. Whether this 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-27 Thread jincheng sun
Hi, Thomas & Robert, Thanks for your comments and providing relevant
discussions and JIRA links, very helpful to me!

I am glad to see your affirmative response,  And I am glad to add my
thoughts on the comment you left:
-

>> There are two distinct levels at which one can talk about a certain type
of state being supported: the user-visible SDK's API and the runner's API.
For example, BagState, ValueState, ReducingState, CombiningState,  can all
be implemented on top of a runner-offered MapState in the SDK. On the one
hand, there's a desire to keep the number of "primitive" states types to a
minimum (to ease the use of authoring runners), but if a runner can perform
a specific optimization due to knowing about the particular state type it
might be preferable to pass it through rather than emulate it in the SDK.
---
Agree. Regarding MapState, it's definitely needed as it cannot be
implemented on top of the existing BagState.
Regarding ValueState, it can be implemented on top of BagState. However, we
can do optimization if we know a state is ValueState.
For example, if a key is updated with a new value, if the ValueState is
implemented on top of BagState, two RPC calls are needed
to write the new value back to the runner: clear + append; if we know it's
ValueState, just one RPC call is enough: set.
We can discuss case by case whether a state type is needed.
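To make the clear+append-vs-set point concrete, here is a minimal sketch of
the emulation, with hypothetical state classes (not Beam's actual state API):

    class BagState(object):
        # Hypothetical bag state where each write operation costs one RPC.
        def __init__(self):
            self._items = []
            self.rpc_calls = 0

        def clear(self):
            self.rpc_calls += 1
            self._items = []

        def append(self, value):
            self.rpc_calls += 1
            self._items.append(value)

        def read(self):
            return list(self._items)

    class ValueStateOverBag(object):
        # ValueState emulated on BagState: every write costs two RPCs
        # (clear + append), where a primitive ValueState would need one set.
        def __init__(self):
            self._bag = BagState()

        def set(self, value):
            self._bag.clear()
            self._bag.append(value)

        def get(self):
            items = self._bag.read()
            return items[0] if items else None

    state = ValueStateOverBag()
    state.set('a')
    state.set('b')
    assert state.get() == 'b'
    assert state._bag.rpc_calls == 4  # vs. 2 with a native set()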

---

>> Note that in the protos, the GRPC ports have a coder attribute
specifically to allow this kind of customization (and the SDKs should
be respecting that). We've also talked about going beyond per-element
encodings (e.g. using arrow to serialize entire batches across the
wire). I think all the runner code simply uses the default and we
could be more intelligent there.
---

Yes, gRPC allows using a custom coder. However, I'm afraid that
this is not enough, as we want to use
Beam's portability framework by depending on the existing modules
(beam-runners-java-fn-execution and the Python SDK Harness) instead
of copying that part of the code into Flink. So it should also allow using a
custom coder in beam-runners-java-fn-execution.
Otherwise, we would have to copy a lot of code into Flink to use a custom
coder.

---

>> I'm wary of having too many buffer size configuration options (is
there a compelling reason to make it bigger or smaller?) but something
time-based would be very useful.
---

I think the default buffer sizes don't need to change for most
cases. I'm not sure about just one case: _DEFAULT_FLUSH_THRESHOLD=10MB.
Would 1MB make more sense?


---

>> The idea of StandardCoders is it's a set of coders that all runners
and SDKs can be assumed to understand. If you have an element encoded
with some other coder, then there's no way to know whether the other
side will be able to decode it (or, indeed, even properly detect
element boundaries in the stream of contiguous encoded elements).
Adding a length prefixed coder wrapping allows the other side to at
least pull it out and pass it around as encoded bytes. In other words,
whether an encoded element needs length prefixing is a function of the
other process you're trying to communicate with (and we don't have the
mechanisms, and I'm not sure it's worth the complexity, to do some
kind of coder negotiation between the processes here.) Of course for a
UDF, if the other side does not know about the element type in
question it'd be difficult (in general) to meaningfully process the
element.

The work on schemas, and making those portable, will result in a much
richer set of element types that can be passed through "standard"
coders.
---

The design makes sense to me. My concern is that if a coder is not among
the StandardCoders, it will be prefixed with a length even if the harness
knows how to decode it. Besides, I'm also curious about the criteria for
whether a coder can be put into StandardCoders.
For example, I noticed that FLOAT is not among the StandardCoders, while
DOUBLE is.
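
To make the trade-off concrete, here is a simplified stand-in for what
length prefixing buys. This is not Beam's LengthPrefixCoder, just the idea
behind it:

import java.io.ByteArrayInputStream;
import java.io.IOException;

// With a length header, a process that cannot decode the payload can still
// find the element boundary and pass the bytes through opaquely.
class LengthPrefixedStream {
  // Reads one length-prefixed element without understanding its encoding.
  static byte[] readOpaqueElement(ByteArrayInputStream in) throws IOException {
    int length = readVarInt(in);
    byte[] payload = new byte[length];
    if (in.read(payload, 0, length) != length) {
      throw new IOException("Truncated element");
    }
    return payload; // forwarded as-is; no element coder needed
  }

  static int readVarInt(ByteArrayInputStream in) throws IOException {
    int result = 0, shift = 0, b;
    do {
      b = in.read();
      if (b < 0) throw new IOException("Truncated varint");
      result |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    return result;
  }
}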

Best, Jincheng

Robert Bradshaw wrote on Thu, Jul 25, 2019 at 2:00 PM:

> On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
> >
> > Hi Jincheng,
> >
> > It is very exciting to see this follow-up, that you have done your
> research on the current state and that there is the intention to join
> forces on the portability effort!
> >
> > I have added a few pointers inline.
> >
> > Several of the issues you identified affect our usage of Beam as well.
> These present an opportunity for collaboration.
>
> +1, a lot of this aligns with improvements we'd like to make as well.
>
> > On Wed, Jul 24, 2019 at 2:53 AM jincheng sun 
> wrote:
> >>
> >> Hi all,
> >>
> >> Thanks Max and all of your kind words. :)
> >>
> >> Sorry for the late reply as I'm busy working on the Flink 1.9 release.
> For the next major release of Flink, we plan to add Python user defined
> functions(UDF, UDTF, UDAF) support in 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-25 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 5:31 AM Thomas Weise  wrote:
>
> Hi Jincheng,
>
> It is very exciting to see this follow-up, that you have done your research 
> on the current state and that there is the intention to join forces on the 
> portability effort!
>
> I have added a few pointers inline.
>
> Several of the issues you identified affect our usage of Beam as well. These 
> present an opportunity for collaboration.

+1, a lot of this aligns with improvements we'd like to make as well.

> On Wed, Jul 24, 2019 at 2:53 AM jincheng sun  wrote:
>>
>> Hi all,
>>
>> Thanks Max and all of your kind words. :)
>>
>> Sorry for the late reply as I'm busy working on the Flink 1.9 release. For 
>> the next major release of Flink, we plan to add Python user defined 
>> functions (UDF, UDTF, UDAF) support in Flink. I have gone over the Beam
>> portability framework and think that it is perfect for our requirements.
>> However, we also found some improvements needed in Beam:
>>
>> Must Have:
>> 
>> 1) Currently only BagState is supported in the gRPC protocol, and I think we
>> should support more kinds of state types, such as MapState, ValueState,
>> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
>> because these kinds of state will be used in both user-defined functions and
>> the Flink Python DataStream API.
>
> There has been discussion about the need for different state types; to
> support those efficiently on the runner side, there may be a need to look at
> the over-the-wire representation as well.
>
> https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
> https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E

There are two distinct levels at which one can talk about a certain
type of state being supported: the user-visible SDK's API and the
runner's API. For example, BagState, ValueState, ReducingState,
CombiningState can all be implemented on top of a runner-offered
MapState in the SDK. On the one hand, there's a desire to keep the
number of "primitive" state types to a minimum (to ease the
authoring of runners), but if a runner can perform a specific
optimization due to knowing about the particular state type it might
be preferable to pass it through rather than emulate it in the SDK.

>> 2) There are warnings that Python 3 is not fully supported in Beam
>> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
>> portability framework, since Python 2 will no longer be officially supported.
>
> This must be obsolete per latest comments on: 
> https://issues.apache.org/jira/browse/BEAM-1251
>
>> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at 
>> the runner side. Why I think it's  must to have is because when the 
>> environment type is "PROCESS", the default value "/tmp" may become a big 
>> problem.

There are still some issues to be worked out around exactly how
environments are set up (most notably around dependencies that are
external to the docker images, but also things like this).

>> 4) The buffer size configuration policy should be improved, such as:
>>    At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
>> is size-based. We should also support time-based limits, especially for the
>> streaming case.
>>    At the Python SDK Harness, the buffer size is not configurable in
>> GrpcDataService, and the input queue of the input buffer in the Python SDK
>> Harness is not size-limited.
>>    The flush threshold of the output buffer in the Python SDK Harness is 10 MB
>> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
>> threshold configurable and support a time-based threshold.

I'm wary of having too many buffer size configuration options (is
there a compelling reason to make it bigger or smaller?) but something
time-based would be very useful.
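
For illustration, a minimal sketch of a size-or-time flush policy; the class
and field names are illustrative, not the actual
BeamFnDataBufferingOutboundObserver API:

// Flush on size OR age, so low-throughput streaming pipelines still see
// timely output. Illustrative sketch only.
class BufferingOutboundSketch {
  private final long sizeThresholdBytes;
  private final long timeThresholdMillis;
  private long bufferedBytes = 0;
  private long lastFlushMillis = System.currentTimeMillis();

  BufferingOutboundSketch(long sizeThresholdBytes, long timeThresholdMillis) {
    this.sizeThresholdBytes = sizeThresholdBytes;
    this.timeThresholdMillis = timeThresholdMillis;
  }

  void accept(byte[] encodedElement) {
    bufferedBytes += encodedElement.length;
    long now = System.currentTimeMillis();
    if (bufferedBytes >= sizeThresholdBytes
        || now - lastFlushMillis >= timeThresholdMillis) {
      flush(now);
    }
  }

  private void flush(long now) {
    // ... write the buffered bytes to the gRPC stream here ...
    bufferedBytes = 0;
    lastFlushMillis = now;
  }
}

// Note: a complete implementation would also need a timer so that a buffer
// which stops receiving elements entirely still gets flushed.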

>> Nice To Have:
>> ---
>> 1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
>> etc., to change the parameter type from WindowedValue<T> to T. (We have
>> already discussed this in the previous mails.)
>>
>> 2) Refactor the code to avoid pulling in unnecessary dependencies. For example,
>> beam-sdks-java-core (11MB) is a package for Java SDK users, and it is pulled in
>> because a few classes in beam-sdks-java-core are used in
>> beam-runners-java-fn-execution, such as:
>> PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
>> BeamFileSystemArtifactRetrievalService.
>> This suggests we could add a new module such as beam-sdks-java-common to hold
>> the classes used by both the runner and the SDK.
>>
>> 3) The state cache is not shared between bundles, which is performance-critical
>> for streaming jobs.
>
> This is rather important to address:
>
> https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E
>
>>
>>
>> 4) The coder of 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-24 Thread Thomas Weise
Hi Jincheng,

It is very exciting to see this follow-up, that you have done your research
on the current state and that there is the intention to join forces on the
portability effort!

I have added a few pointers inline.

Several of the issues you identified affect our usage of Beam as well.
These present an opportunity for collaboration.

Thanks,
Thomas


On Wed, Jul 24, 2019 at 2:53 AM jincheng sun 
wrote:

> Hi all,
>
> Thanks Max and all of your kind words. :)
>
> Sorry for the late reply as I'm busy working on the Flink 1.9 release. For
> the next major release of Flink, we plan to add Python user defined
> functions (UDF, UDTF, UDAF) support in Flink. I have gone over the Beam
> portability framework and think that it is perfect for our requirements.
> However, we also found some improvements needed in Beam:
>
> Must Have:
> 
> 1) Currently only BagState is supported in the gRPC protocol, and I think we
> should support more kinds of state types, such as MapState, ValueState,
> ReducingState, CombiningState (AggregatingState in Flink), etc. That's
> because these kinds of state will be used in both user-defined functions and
> the Flink Python DataStream API.
>

There has been discussion about the need for different state types; to
support those efficiently on the runner side, there may be a need to look at
the over-the-wire representation as well.

https://lists.apache.org/thread.html/ccc0d548e440b63897b6784cd7896c266498df64c9c63ce6c52ae098@%3Cdev.beam.apache.org%3E
https://lists.apache.org/thread.html/ccf8529a49003a7be622b4d3403eba2c633caeaf5ced033e25d4c2e2@%3Cdev.beam.apache.org%3E


2) There are warnings that Python 3 is not fully supported in Beam
> (beam/sdks/python/setup.py). We should support Python 3.x for the Beam
> portability framework, since Python 2 will no longer be officially supported.
>

This must be obsolete per latest comments on:
https://issues.apache.org/jira/browse/BEAM-1251


>
> 3) The configuration "semi_persist_dir" is not set in EnvironmentFactory
> at the runner side. Why I think it's  must to have is because when the
> environment type is "PROCESS", the default value "/tmp" may become a big
> problem.
>
> 4) The buffer size configure policy should be improved, such as:
>At runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
> is size based. We should also support time based especially for the
> streaming case.
>At Python SDK Harness, the buffer size is not configurable in
> GrpcDataService. The input queue size of the input buffer in Python SDK
> Harness is not size limited.
>   The flush threshold of the output buffer in Python SDK Harness is 10 MB
> by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
> threshold configurable and support time based threshold.
>
> Nice To Have:
> ---
> 1) Improve the interfaces of FnDataService, BundleProcessor,
> ActiveBundle, etc., to change the parameter type from WindowedValue<T> to T.
> (We have already discussed this in the previous mails.)
>
> 2) Refactor the code to avoid pulling in unnecessary dependencies. For
> example, beam-sdks-java-core (11MB) is a package for Java SDK users, and it
> is pulled in because a few classes in beam-sdks-java-core are used
> in beam-runners-java-fn-execution, such as:
> PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
> BeamFileSystemArtifactRetrievalService.
> This suggests we could add a new module such as beam-sdks-java-common to
> hold the classes used by both the runner and the SDK.
>
> 3) The state cache is not shared between bundles, which is
> performance-critical for streaming jobs.
>

This is rather important to address:

https://lists.apache.org/thread.html/caa8d9bc6ca871d13de2c5e6ba07fdc76f85d26497d95d90893aa1f6@%3Cdev.beam.apache.org%3E


>
> 4) The coder of WindowedValue cannot be configured, and most of the time we
> don't need to serialize and deserialize the timestamp, window and pane
> properties in Flink. But currently FullWindowedValueCoder is used by
> default in WireCoders.addWireCoder; I suggest making the coder
> configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).
>
> 5) Currently, if a coder is not defined in StandardCoders, it will be
> wrapped with LengthPrefixCoder (WireCoders.addWireCoder ->
> LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
> coders are defined in StandardCoders. It means that for most coders, a
> length will be added to the serialized bytes, which in my view is not
> necessary. My suggestion is to add some interface or tag for the
> coder indicating whether it needs a length prefix or not.
>
> 6) Set the log level according to PipelineOptions in the Python SDK Harness.
> Currently the log level is set to INFO by default.
>

https://issues.apache.org/jira/browse/BEAM-5468


>
> 7) Allow starting up the StatusServer according to PipelineOptions in the
> Python SDK Harness. Currently the StatusServer is started by default.
>
> Although I put 3) 4) 5) 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-07-24 Thread jincheng sun
Hi all,

Thanks Max, and everyone, for your kind words. :)

Sorry for the late reply, as I'm busy working on the Flink 1.9 release. For
the next major release of Flink, we plan to add Python user defined
functions (UDF, UDTF, UDAF) support in Flink. I have gone over the Beam
portability framework and think that it is perfect for our requirements.
However, we also found some improvements needed in Beam:

Must Have:

1) Currently only BagState is supported in the gRPC protocol, and I think we
should support more kinds of state types, such as MapState, ValueState,
ReducingState, CombiningState (AggregatingState in Flink), etc. That's
because these kinds of state will be used in both user-defined functions and
the Flink Python DataStream API.

2) There are warnings that Python 3 is not fully supported in Beam
(beam/sdks/python/setup.py). We should support Python 3.x for the Beam
portability framework, since Python 2 will no longer be officially supported.

3) The configuration "semi_persist_dir" is not set in EnvironmentFactory at
the runner side. Why I think it's  must to have is because when the
environment type is "PROCESS", the default value "/tmp" may become a big
problem.

4) The buffer size configuration policy should be improved, such as:
   At the runner side, the buffer limit in BeamFnDataBufferingOutboundObserver
is size-based. We should also support time-based limits, especially for the
streaming case.
   At the Python SDK Harness, the buffer size is not configurable in
GrpcDataService, and the input queue of the input buffer in the Python SDK
Harness is not size-limited.
   The flush threshold of the output buffer in the Python SDK Harness is 10 MB
by default (_DEFAULT_FLUSH_THRESHOLD=10MB). My suggestion is: make the
threshold configurable and support a time-based threshold.

Nice To Have:
---
1) Improve the interfaces of FnDataService, BundleProcessor, ActiveBundle,
etc., to change the parameter type from WindowedValue<T> to T. (We have
already discussed this in the previous mails.)

2) Refactor the code to avoid pulling in unnecessary dependencies. For
example, beam-sdks-java-core (11MB) is a package for Java SDK users, and it
is pulled in because a few classes in beam-sdks-java-core are used
in beam-runners-java-fn-execution, such as:
PipelineOptions used in DefaultJobBundleFactory, and FileSystems used in
BeamFileSystemArtifactRetrievalService.
This suggests we could add a new module such as beam-sdks-java-common to
hold the classes used by both the runner and the SDK.

3) The state cache is not shared between bundles, which is
performance-critical for streaming jobs.

4) The coder of WindowedValue cannot be configured, and most of the time we
don't need to serialize and deserialize the timestamp, window and pane
properties in Flink. But currently FullWindowedValueCoder is used by
default in WireCoders.addWireCoder; I suggest making the coder
configurable (i.e. allowing the use of ValueOnlyWindowedValueCoder).

5) Currently, if a coder is not defined in StandardCoders, it will be
wrapped with LengthPrefixCoder (WireCoders.addWireCoder ->
LengthPrefixUnknownCoders.addLengthPrefixedCoder). However, only a few
coders are defined in StandardCoders. It means that for most coders, a
length will be added to the serialized bytes, which in my view is not
necessary. My suggestion is to add some interface or tag for the
coder indicating whether it needs a length prefix or not.

6) Set the log level according to PipelineOptions in the Python SDK Harness.
Currently the log level is set to INFO by default.

7) Allow starting up the StatusServer according to PipelineOptions in the
Python SDK Harness. Currently the StatusServer is started by default.

Although I put 3) 4) 5) into the "Nice to Have" as they are performance
related, I still think they are very critical for Python UDF execution
performance.
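
Here is a hedged sketch of the cross-bundle cache idea from item 3): a
bounded LRU map that outlives a single bundle, with explicit invalidation.
All names and the eviction bound here are illustrative, not Beam's API.

import java.util.LinkedHashMap;
import java.util.Map;

// Keep decoded state values across bundles and drop entries when the runner
// signals that the underlying state changed. Illustrative sketch only.
class CrossBundleStateCache {
  private final Map<String, Object> cache =
      new LinkedHashMap<String, Object>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Object> eldest) {
          return size() > 1000; // illustrative capacity bound
        }
      };

  Object get(String stateKey) {
    return cache.get(stateKey);
  }

  void put(String stateKey, Object decodedValue) {
    cache.put(stateKey, decodedValue);
  }

  // Called when state may have changed (e.g. at a bundle boundary).
  void invalidate(String stateKey) {
    cache.remove(stateKey);
  }
}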

Open questions:
-
1) Which coders should be / can be defined in StandardCoders?

Currently we are preparing the design of how to support Python UDF in Flink
based on the Beam portability framework, and we will bring up the discussion
in the Flink community. We may propose more changes for Beam during that time
and may need more support from the Beam community.

To be honest, I'm not an expert on Beam, so please feel free to correct
me if my understanding is wrong. Any feedback is welcome.

Best,
Jincheng

Maximilian Michels wrote on Thu, Apr 25, 2019 at 12:14 AM:

> Fully agree that this is an effort that goes beyond changing a type
> parameter but I think we have a chance here to cooperate between the two
> projects. I would be happy to help out where I can.
>
> I'm not sure at this point what exactly is feasible for reuse but I
> would imagine the Runner-related code to be useful as well for the
> interaction with the SDK Harness. There are some fundamental differences
> in the model, e.g. how windowing works, which might be challenging to
> work around.
>
> Thanks,
> Max
>
> On 24.04.19 12:03, jincheng sun wrote:
> >
> > Hi Kenn, I think you are 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-24 Thread Maximilian Michels
Fully agree that this is an effort that goes beyond changing a type 
parameter but I think we have a chance here to cooperate between the two 
projects. I would be happy to help out where I can.


I'm not sure at this point what exactly is feasible for reuse but I 
would imagine the Runner-related code to be useful as well for the 
interaction with the SDK Harness. There are some fundamental differences 
in the model, e.g. how windowing works, which might be challenging to 
work around.


Thanks,
Max

On 24.04.19 12:03, jincheng sun wrote:


Hi Kenn, I think you are right: the Python SDK harness can be shared with
Flink, though we also need to add some new primitive operations. Regarding
the runner side, I think most of the code in runners/java-fn-execution
can be shared (but needs some improvement, such as FnDataService); some of
it cannot be shared, such as the job submission code. So we may need to set
up a document to clearly analyze which parts can be shared, which can be
shared but need some changes, and which definitely cannot be shared.


Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn
services as a library, and I am willing to put more effort into this.
From the view of the current code, abstracting the Fn services into a class
library that other projects can rely on requires a lot of effort from
the Beam community. Turning `WindowedValue<T>` into `T` is just the
beginning of this effort. If the Beam community is willing to abstract
the Fn services into a class library that can be relied upon by
other projects, I can try to draft a document; of course, during this
period I may need a lot of help from you, Kenn, Lukasz, and the Beam
community. (I am a recruit in the Beam community :-))


What do you think?

Regards,
Jincheng

Kenneth Knowles <k...@apache.org> wrote on Wed, Apr 24, 2019 at 3:32 AM:


It seems to me that the most valuable code to share and keep up with
is the Python/Go/etc SDK harness; they would need to be enhanced
with new primitive operations. So you would want to depend directly
and share the original proto-generated classes too, which Beam
publishes as separate artifacts for Java. Is the runner-side support
code that valuable for direct integration into Flink? I would expect
once you get past trivial wrappers (that you can copy/paste with no
loss) you would hit differences in architecture so you would diverge
anyhow.

Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels <m...@apache.org> wrote:

Hi Jincheng,

Copying code is a solution for the short term. In the long run
I'd like
the Fn services to be a library not only for the Beam
portability layer
but also for other projects which want to leverage it. We should
thus
make an effort to make it more generic/extensible where
necessary and
feasible.

Since you are investigating reuse of Beam portability in the
context of
Flink, do you think it would make sense to setup a document
where we
collect ideas and challenges?

Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:
 > Hi Reuven,
 >
 > I think you have provided an optional solution for other communities which
 > want to take advantage of Beam's existing achievements. Thank you very
 > much!
 >
 > I think the Flink community can choose to copy from Beam's code or
 > choose to rely directly on Beam's class library. The Flink community
 > also initiated a discussion; more info can be found here:
 > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>
 >
 > The purpose of turning `WindowedValue<T>` into `T` is to make the
 > interface design of Beam more versatile, so that other open source
 > projects have the opportunity to take advantage of Beam's existing
 > achievements. Of course, just changing the `WindowedValue<T>` into `T`
 > is not enough for it to be shared by other projects in the form of a
 > class library; we need to make more efforts. If Beam can provide a class
 > library in the future, other community contributors will also have the
 > willingness to contribute to the Beam community. This will benefit both
 > the community that wants to take advantage of Beam's existing
 > achievements and the Beam community itself. And thanks to Thomas, who
 > has also made a lot of efforts in this regard.
 >
 > Thanks again for your valuable suggestion, and welcome any feedback!
 >
 > Best,
 > Jincheng
 >
 > Reuven Lax 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-24 Thread jincheng sun
Hi Kenn, I think you are right: the Python SDK harness can be shared with
Flink, though we also need to add some new primitive operations. Regarding
the runner side, I think most of the code in runners/java-fn-execution
can be shared (but needs some improvement, such as FnDataService); some of
it cannot be shared, such as the job submission code. So we may need to set
up a document to clearly analyze which parts can be shared, which can be
shared but need some changes, and which definitely cannot be shared.

Hi Max, thanks for sharing your opinion. I also prefer using the Beam Fn
services as a library, and I am willing to put more effort into this.
From the view of the current code, abstracting the Fn services into a class
library that other projects can rely on requires a lot of effort from the
Beam community. Turning `WindowedValue<T>` into `T` is just the beginning of
this effort. If the Beam community is willing to abstract the Fn services
into a class library that can be relied upon by other projects, I can try
to draft a document; of course, during this period I may need a lot of help
from you, Kenn, Lukasz, and the Beam community. (I am a recruit in the Beam
community :-))

What do you think?

Regards,
Jincheng

Kenneth Knowles wrote on Wed, Apr 24, 2019 at 3:32 AM:

> It seems to me that the most valuable code to share and keep up with is
> the Python/Go/etc SDK harness; they would need to be enhanced with new
> primitive operations. So you would want to depend directly and share the
> original proto-generated classes too, which Beam publishes as separate
> artifacts for Java. Is the runner-side support code that valuable for
> direct integration into Flink? I would expect once you get past trivial
> wrappers (that you can copy/paste with no loss) you would hit differences
> in architecture so you would diverge anyhow.
>
> Kenn
>
> On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels  wrote:
>
>> Hi Jincheng,
>>
>> Copying code is a solution for the short term. In the long run I'd like
>> the Fn services to be a library not only for the Beam portability layer
>> but also for other projects which want to leverage it. We should thus
>> make an effort to make it more generic/extensible where necessary and
>> feasible.
>>
>> Since you are investigating reuse of Beam portability in the context of
>> Flink, do you think it would make sense to setup a document where we
>> collect ideas and challenges?
>>
>> Thanks,
>> Max
>>
>> On 23.04.19 13:00, jincheng sun wrote:
>> > Hi Reuven,
>> >
>> > I think you have provided an optional solution for other communities
>> > which want to take advantage of Beam's existing achievements. Thank you very
>> > much!
>> >
>> > I think the Flink community can choose to copy from Beam's code or
>> > choose to rely directly on the beam's class library. The Flink
>> community
>> > also initiated a discussion, more info can be found here
>> > <
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
>> >
>> >
>> > The purpose of turning `WindowedValue<T>` into `T` is to make the
>> > interface design of Beam more versatile, so that other open source
>> > projects have the opportunity to take advantage of Beam's existing
>> > achievements. Of course, just changing the `WindowedValue<T>` into `T`
>> > is not enough for it to be shared by other projects in the form of a class
>> > library; we need to make more efforts. If Beam can provide a class library
>> > in the future, other community contributors will also have the
>> > willingness to contribute to the Beam community. This will benefit both
>> > the community that wants to take advantage of Beam's existing
>> > achievements and the Beam community itself. And thanks to Thomas, who
>> > has also made a lot of efforts in this regard.
>> > that he has also made a lot of efforts in this regard.
>> >
>> > Thanks again for your valuable suggestion, and welcome any feedback!
>> >
>> > Best,
>> > Jincheng
>> >
>> > Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:
>> >
>> > One concern here: these interfaces are intended for use within the
>> > Beam project. Beam may decide to make specific changes to them to
>> > support needed functionality in Beam. If they are being reused by
>> > other projects, then those changes risk breaking those other
>> > projects in unexpected ways. I don't think we can guarantee that we
>> > don't do that. If this is useful in Flink, it would be safer to copy
>> > the code IMO rather than to directly depend on it.
>> >
>> > On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
>> > <sunjincheng...@gmail.com> wrote:
>> >
>> > Hi Kenn,
>> >
>> > Thanks for your reply and for explaining the design of WindowedValue
>> > clearly!
>> >
>> > At present, the definitions of `FnDataService` and
>> > `BeamFnDataClient` in Data Plane are very clear and universal,
>> > such as: send(...)/receive(...). If it is only 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-23 Thread Kenneth Knowles
It seems to me that the most valuable code to share and keep up with is the
Python/Go/etc SDK harness; they would need to be enhanced with new
primitive operations. So you would want to depend directly and share the
original proto-generated classes too, which Beam publishes as separate
artifacts for Java. Is the runner-side support code that valuable for
direct integration into Flink? I would expect once you get past trivial
wrappers (that you can copy/paste with no loss) you would hit differences
in architecture so you would diverge anyhow.

Kenn

On Tue, Apr 23, 2019 at 5:32 AM Maximilian Michels  wrote:

> Hi Jincheng,
>
> Copying code is a solution for the short term. In the long run I'd like
> the Fn services to be a library not only for the Beam portability layer
> but also for other projects which want to leverage it. We should thus
> make an effort to make it more generic/extensible where necessary and
> feasible.
>
> Since you are investigating reuse of Beam portability in the context of
> Flink, do you think it would make sense to setup a document where we
> collect ideas and challenges?
>
> Thanks,
> Max
>
> On 23.04.19 13:00, jincheng sun wrote:
> > Hi Reuven,
> >
> > I think you have provided an optional solution for other communities which
> > want to take advantage of Beam's existing achievements. Thank you very
> > much!
> >
> > I think the Flink community can choose to copy from Beam's code or
> > choose to rely directly on the beam's class library. The Flink community
> > also initiated a discussion, more info can be found here
> > <
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
> >
> >
> > The purpose of turning `WindowedValue<T>` into `T` is to make the
> > interface design of Beam more versatile, so that other open source
> > projects have the opportunity to take advantage of Beam's existing
> > achievements. Of course, just changing the `WindowedValue<T>` into `T`
> > is not enough for it to be shared by other projects in the form of a class
> > library; we need to make more efforts. If Beam can provide a class library
> > in the future, other community contributors will also have the
> > willingness to contribute to the Beam community. This will benefit both
> > the community that wants to take advantage of Beam's existing
> > achievements and the Beam community itself. And thanks to Thomas, who
> > has also made a lot of efforts in this regard.
> >
> > Thanks again for your valuable suggestion, and welcome any feedback!
> >
> > Best,
> > Jincheng
> >
> > Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:
> >
> > One concern here: these interfaces are intended for use within the
> > Beam project. Beam may decide to make specific changes to them to
> > support needed functionality in Beam. If they are being reused by
> > other projects, then those changes risk breaking those other
> > projects in unexpected ways. I don't think we can guarantee that we
> > don't do that. If this is useful in Flink, it would be safer to copy
> > the code IMO rather than to directly depend on it.
> >
> > On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
> > <sunjincheng...@gmail.com> wrote:
> >
> > Hi Kenn,
> >
> > Thanks for your reply and for explaining the design of WindowedValue
> > clearly!
> >
> > At present, the definitions of `FnDataService` and
> > `BeamFnDataClient` in Data Plane are very clear and universal,
> > such as: send(...)/receive(...). If it is only applied in the
> > project of Beam, it is already very good. Because `WindowedValue`
> > is a very basic data structure in the Beam project, both the
> > Runner and the SDK harness define the WindowedValue data
> > structure.
> >
> > The reason I want to change the interface parameter from
> > `WindowedValue` to T is because I want to make the `Data
> > Plane` interface into a class library that can be used by other
> > projects (such as Apache Flink), so that other projects Can have
> > its own `FnDataService` implementation. However, the definition
> > of `WindowedValue` does not apply to all projects. For example,
> > Apache Flink also has a definition similar to WindowedValue. For
> > example, Apache Flink Stream has StreamRecord. If we change
> > `WindowedValue` to T, then other project's implementation
> > does not need to wrap WindowedValue, the interface will become
> > more concise.  Furthermore,  we only need one T, such as the
> > Apache Flink DataSet operator.
> >
> > So, I agree with your understanding, I don't expect
> > `WindowedValueXXX` in the FnDataService interface, I hope to
> > just use a `T`.
> >
> > Have you seen some problem if we change the interface parameter
> > 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-23 Thread Maximilian Michels

Hi Jincheng,

Copying code is a solution for the short term. In the long run I'd like 
the Fn services to be a library not only for the Beam portability layer 
but also for other projects which want to leverage it. We should thus 
make an effort to make it more generic/extensible where necessary and 
feasible.


Since you are investigating reuse of Beam portability in the context of 
Flink, do you think it would make sense to setup a document where we 
collect ideas and challenges?


Thanks,
Max

On 23.04.19 13:00, jincheng sun wrote:

Hi Reuven,

I think you have provided an optional solution for other communities which
want to take advantage of Beam's existing achievements. Thank you very
much!


I think the Flink community can choose to copy from Beam's code or 
choose to rely directly on Beam's class library. The Flink community
also initiated a discussion; more info can be found here:
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>



The purpose of turning `WindowedValue<T>` into `T` is to make the
interface design of Beam more versatile, so that other open source
projects have the opportunity to take advantage of Beam's existing
achievements. Of course, just changing the `WindowedValue<T>` into `T`
is not enough for it to be shared by other projects in the form of a class
library; we need to make more efforts. If Beam can provide a class library
in the future, other community contributors will also have the
willingness to contribute to the Beam community. This will benefit both
the community that wants to take advantage of Beam's existing
achievements and the Beam community itself. And thanks to Thomas, who
has also made a lot of efforts in this regard.


Thanks again for your valuable suggestion, and welcome any feedback!

Best,
Jincheng

Reuven Lax <re...@google.com> wrote on Tue, Apr 23, 2019 at 1:00 AM:


One concern here: these interfaces are intended for use within the
Beam project. Beam may decide to make specific changes to them to
support needed functionality in Beam. If they are being reused by
other projects, then those changes risk breaking those other
projects in unexpected ways. I don't think we can guarantee that we
don't do that. If this is useful in Flink, it would be safer to copy
the code IMO rather than to directly depend on it.

On Mon, Apr 22, 2019 at 12:08 AM jincheng sun
<sunjincheng...@gmail.com> wrote:

Hi Kenn,

Thanks for your reply and for explaining the design of WindowedValue
clearly!

At present, the definitions of `FnDataService` and
`BeamFnDataClient` in Data Plane are very clear and universal,
such as: send(...)/receive(...). If it is only applied in the
project of Beam, it is already very good. Because `WindowedValue`
is a very basic data structure in the Beam project, both the
Runner and the SDK harness define the WindowedValue data
structure.

The reason I want to change the interface parameter from
`WindowedValue` to T is because I want to make the `Data
Plane` interface into a class library that can be used by other
projects (such as Apache Flink), so that other projects Can have
its own `FnDataService` implementation. However, the definition
of `WindowedValue` does not apply to all projects. For example,
Apache Flink also has a definition similar to WindowedValue. For
example, Apache Flink Stream has StreamRecord. If we change
`WindowedValue` to T, then other project's implementation
does not need to wrap WindowedValue, the interface will become
more concise.  Furthermore,  we only need one T, such as the
Apache Flink DataSet operator.

So, I agree with your understanding, I don't expect
`WindowedValueXXX` in the FnDataService interface, I hope to
just use a `T`.

Have you seen some problem if we change the interface parameter
from `WindowedValue` to T?

Thanks,
Jincheng

Kenneth Knowles <k...@apache.org> wrote on Sat, Apr 20, 2019 at 2:38 AM:

WindowedValue has always been an interface, not a concrete
representation:

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java

.
It is an abstract class because we started in Java 7 where
you could not have default methods, and just due to legacy
style concerns. It is not just discussed, but implemented,
that there are WindowedValue implementations with fewer
allocations.
At the coder level, it was also always intended to have

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-23 Thread jincheng sun
Hi Reuven,

I think you have provided an optional solution for other communities which
want to take advantage of Beam's existing achievements. Thank you very
much!

I think the Flink community can choose to copy from Beam's code or choose
to rely directly on Beam's class library. The Flink community also
initiated a discussion; more info can be found here:
<http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096>

The purpose of turning `WindowedValue<T>` into `T` is to make the
interface design of Beam more versatile, so that other open source projects
have the opportunity to take advantage of Beam's existing achievements. Of
course, just changing the `WindowedValue<T>` into `T` is not enough for it
to be shared by other projects in the form of a class library; we need to
make more efforts. If Beam can provide a class library in the future, other
community contributors will also have the willingness to contribute to the
Beam community. This will benefit both the community that wants to take
advantage of Beam's existing achievements and the Beam community itself.
And thanks to Thomas, who has also made a lot of efforts in this
regard.

Thanks again for your valuable suggestion, and welcome any feedback!

Best,
Jincheng

Reuven Lax wrote on Tue, Apr 23, 2019 at 1:00 AM:

> One concern here: these interfaces are intended for use within the Beam
> project. Beam may decide to make specific changes to them to support needed
> functionality in Beam. If they are being reused by other projects, then
> those changes risk breaking those other projects in unexpected ways. I
> don't think we can guarantee that we don't do that. If this is useful in
> Flink, it would be safer to copy the code IMO rather than to directly
> depend on it.
>
> On Mon, Apr 22, 2019 at 12:08 AM jincheng sun 
> wrote:
>
>> Hi Kenn,
>>
>> Thanks for your reply and for explaining the design of WindowedValue clearly!
>>
>> At present, the definitions of `FnDataService` and `BeamFnDataClient` in
>> Data Plane are very clear and universal, such as: send(...)/receive(...).
>> If it is only applied in the project of Beam, it is already very good.
>> Because `WindowedValue` is a very basic data structure in the Beam project,
>> both the Runner and the SDK harness define the WindowedValue data
>> structure.
>>
>> The reason I want to change the interface parameter from
>> `WindowedValue<T>` to T is that I want to make the `Data Plane`
>> interface into a class library that can be used by other projects (such as
>> Apache Flink), so that other projects can have their own `FnDataService`
>> implementations. However, the definition of `WindowedValue` does not apply
>> to all projects. For example, Apache Flink also has a definition similar to
>> WindowedValue: the Flink streaming API has StreamRecord. If we
>> change `WindowedValue<T>` to T, then another project's implementation does
>> not need to wrap WindowedValue, and the interface becomes more concise;
>> furthermore, we only need one T, as in the Apache Flink DataSet operator.
>>
>> So, I agree with your understanding: I don't expect a `WindowedValueXXX`
>> in the FnDataService interface; I hope to just use a `T`.
>>
>> Do you see any problem if we change the interface parameter from
>> `WindowedValue<T>` to T?
>>
>> Thanks,
>> Jincheng
>>
>> Kenneth Knowles wrote on Sat, Apr 20, 2019 at 2:38 AM:
>>
>>> WindowedValue has always been an interface, not a concrete
>>> representation:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>>> .
>>> It is an abstract class because we started in Java 7 where you could not
>>> have default methods, and just due to legacy style concerns. It is not just
>>> discussed, but implemented, that there are WindowedValue implementations
>>> with fewer allocations.
>>> At the coder level, it was also always intended to have multiple
>>> encodings. We already do have separate encodings based on whether there is
>>> 1 window or multiple windows. The coder for a particular kind of
>>> WindowedValue should decide this. Before the Fn API none of this had to be
>>> standardized, because the runner could just choose whatever it wants. Now
>>> we have to standardize any encodings that runners and harnesses both need
>>> to know. There should be many, and adding more should be just a matter of
>>> standardization, no new design.
>>>
>>> None of this should be user-facing or in the runner API / pipeline graph
>>> - that is critical to making it flexible on the backend between the runner
>>> & SDK harness.
>>>
>>> If I understand it, from our offline discussion, you are interested in
>>> the case where you issue a ProcessBundleRequest to the SDK harness and none
>>> of the primitives in the subgraph will ever observe the metadata. So you
>>> 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-22 Thread Kenneth Knowles
Makes sense to me. I don't see any problem.

Kenn

On Mon, Apr 22, 2019 at 12:08 AM jincheng sun 
wrote:

> Hi Kenn,
>
> Thanks for your reply and for explaining the design of WindowedValue clearly!
>
> At present, the definitions of `FnDataService` and `BeamFnDataClient` in
> Data Plane are very clear and universal, such as: send(...)/receive(...).
> If it is only applied in the project of Beam, it is already very good.
> Because `WindowedValue` is a very basic data structure in the Beam project,
> both the Runner and the SDK harness define the WindowedValue data
> structure.
>
> The reason I want to change the interface parameter from
> `WindowedValue<T>` to T is that I want to make the `Data Plane`
> interface into a class library that can be used by other projects (such as
> Apache Flink), so that other projects can have their own `FnDataService`
> implementations. However, the definition of `WindowedValue` does not apply
> to all projects. For example, Apache Flink also has a definition similar to
> WindowedValue: the Flink streaming API has StreamRecord. If we
> change `WindowedValue<T>` to T, then another project's implementation does
> not need to wrap WindowedValue, and the interface becomes more concise;
> furthermore, we only need one T, as in the Apache Flink DataSet operator.
>
> So, I agree with your understanding: I don't expect a `WindowedValueXXX`
> in the FnDataService interface; I hope to just use a `T`.
>
> Do you see any problem if we change the interface parameter from
> `WindowedValue<T>` to T?
>
> Thanks,
> Jincheng
>
> Kenneth Knowles wrote on Sat, Apr 20, 2019 at 2:38 AM:
>
>> WindowedValue has always been an interface, not a concrete
>> representation:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
>> .
>> It is an abstract class because we started in Java 7 where you could not
>> have default methods, and just due to legacy style concerns. It is not just
>> discussed, but implemented, that there are WindowedValue implementations
>> with fewer allocations.
>> At the coder level, it was also always intended to have multiple
>> encodings. We already do have separate encodings based on whether there is
>> 1 window or multiple windows. The coder for a particular kind of
>> WindowedValue should decide this. Before the Fn API none of this had to be
>> standardized, because the runner could just choose whatever it wants. Now
>> we have to standardize any encodings that runners and harnesses both need
>> to know. There should be many, and adding more should be just a matter of
>> standardization, no new design.
>>
>> None of this should be user-facing or in the runner API / pipeline graph
>> - that is critical to making it flexible on the backend between the runner
>> & SDK harness.
>>
>> If I understand it, from our offline discussion, you are interested in
>> the case where you issue a ProcessBundleRequest to the SDK harness and none
>> of the primitives in the subgraph will ever observe the metadata. So you
>> want to not even have a tiny
>> "WindowedValueWithNoMetadata". Is that accurate?
>>
>> Kenn
>>
>> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun 
>> wrote:
>>
>>> Thank you! And have a nice weekend!
>>>
>>>
>>> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 1:14 AM:
>>>
 I have added you as a contributor.

 On Fri, Apr 19, 2019 at 9:56 AM jincheng sun 
 wrote:

> Hi Lukasz,
>
> Thanks for your affirmation and provide more contextual information. :)
>
> Would you please give me the contributor permission?  My JIRA ID is
> sunjincheng121.
>
> I would like to create/assign tickets for this work.
>
> Thanks,
> Jincheng
>
> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 12:26 AM:
>
>> Since I don't think this is a contentious change.
>>
>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:
>>
>>> Yes, using T makes sense.
>>>
>>> The WindowedValue was meant to be a context object in the SDK
>>> harness that propagates various information about the current element. 
>>> We
>>> have discussed in the past about:
>>> * making optimizations which would pass around less of the context
>>> information if we know that the DoFns don't need it (for example, all 
>>> the
>>> values share the same window).
>>> * versioning the encoding separately from the WindowedValue context
>>> object (see recent discussion about element timestamp precision [1])
>>> * the runner may want its own representation of a context object
>>> that makes sense for it which isn't a WindowedValue necessarily.
>>>
>>> Feel free to cut a JIRA about this and start working on a change
>>> towards this.
>>>
>>> 1:
>>> 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-22 Thread jincheng sun
Hi Kenn,

Thanks for your reply and for explaining the design of WindowedValue clearly!

At present, the definitions of `FnDataService` and `BeamFnDataClient` in the
Data Plane are very clear and universal, such as send(...)/receive(...).
If they were only applied in the Beam project, they would already be very
good. `WindowedValue` is a very basic data structure in the Beam project,
and both the Runner and the SDK harness define the WindowedValue data
structure.

The reason I want to change the interface parameter from `WindowedValue<T>`
to T is that I want to make the `Data Plane` interface into a class
library that can be used by other projects (such as Apache Flink), so that
other projects can have their own `FnDataService` implementations. However,
the definition of `WindowedValue` does not apply to all projects. For
example, Apache Flink also has a definition similar to WindowedValue: the
Flink streaming API has StreamRecord. If we change
`WindowedValue<T>` to T, then another project's implementation does not need
to wrap WindowedValue, and the interface becomes more concise; furthermore,
we only need one T, as in the Apache Flink DataSet operator (sketched below).
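
A hedged sketch of that generic shape. The stand-in types below are declared
only to keep the sketch self-contained; the real ones live in Beam:

// Stand-ins for the Beam types, declared so the sketch compiles on its own.
interface InboundDataClient {}
interface LogicalEndpoint {}
interface Coder<T> {}
interface FnDataReceiver<T> { void accept(T value) throws Exception; }
interface CloseableFnDataReceiver<T> extends FnDataReceiver<T>, AutoCloseable {}

// The proposed shape: parameterize the data plane by a plain T, so an engine
// can move its own record type (e.g. Flink's StreamRecord<T>) through it
// without first wrapping it in WindowedValue.
interface GenericFnDataService {
  <T> InboundDataClient receive(
      LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> listener);
  <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation, Coder<T> coder);
}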

So, I agree with your understanding: I don't expect a `WindowedValueXXX`
in the FnDataService interface; I hope to just use a `T`.

Do you see any problem if we change the interface parameter from
`WindowedValue<T>` to T?

Thanks,
Jincheng

Kenneth Knowles wrote on Sat, Apr 20, 2019 at 2:38 AM:

> WindowedValue has always been an interface, not a concrete representation:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
> .
> It is an abstract class because we started in Java 7 where you could not
> have default methods, and just due to legacy style concerns. It is not just
> discussed, but implemented, that there are WindowedValue implementations
> with fewer allocations.
> At the coder level, it was also always intended to have multiple
> encodings. We already do have separate encodings based on whether there is
> 1 window or multiple windows. The coder for a particular kind of
> WindowedValue should decide this. Before the Fn API none of this had to be
> standardized, because the runner could just choose whatever it wants. Now
> we have to standardize any encodings that runners and harnesses both need
> to know. There should be many, and adding more should be just a matter of
> standardization, no new design.
>
> None of this should be user-facing or in the runner API / pipeline graph -
> that is critical to making it flexible on the backend between the runner &
> SDK harness.
>
> If I understand it, from our offline discussion, you are interested in the
> case where you issue a ProcessBundleRequest to the SDK harness and none of
> the primitives in the subgraph will ever observe the metadata. So you want
> to not even have a tiny
> "WindowedValueWithNoMetadata". Is that accurate?
>
> Kenn
>
> On Fri, Apr 19, 2019 at 10:17 AM jincheng sun 
> wrote:
>
>> Thank you! And have a nice weekend!
>>
>>
>> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 1:14 AM:
>>
>>> I have added you as a contributor.
>>>
>>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun 
>>> wrote:
>>>
 Hi Lukasz,

 Thanks for your affirmation and provide more contextual information. :)

 Would you please give me the contributor permission?  My JIRA ID is
 sunjincheng121.

 I would like to create/assign tickets for this work.

 Thanks,
 Jincheng

 Lukasz Cwik wrote on Sat, Apr 20, 2019 at 12:26 AM:

> Since I don't think this is a contentious change.
>
> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:
>
>> Yes, using T makes sense.
>>
>> The WindowedValue was meant to be a context object in the SDK harness
>> that propagates various information about the current element. We have
>> discussed in the past about:
>> * making optimizations which would pass around less of the context
>> information if we know that the DoFns don't need it (for example, all the
>> values share the same window).
>> * versioning the encoding separately from the WindowedValue context
>> object (see recent discussion about element timestamp precision [1])
>> * the runner may want its own representation of a context object that
>> makes sense for it which isn't a WindowedValue necessarily.
>>
>> Feel free to cut a JIRA about this and start working on a change
>> towards this.
>>
>> 1:
>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>
>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun <
>> sunjincheng...@gmail.com> wrote:
>>
>>> Hi Beam devs,
>>>
>>> I read some of the docs about `Communicating over the Fn API` in
>>> Beam. I feel that Beam 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread Kenneth Knowles
WindowedValue has always been an interface, not a concrete representation:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java.
It is an abstract class because we started in Java 7, where you could not
have default methods, and just due to legacy style concerns. It is not just
discussed, but implemented, that there are WindowedValue implementations
with fewer allocations.
At the coder level, it was also always intended to have multiple encodings.
We already do have separate encodings based on whether there is 1 window or
multiple windows. The coder for a particular kind of WindowedValue should
decide this. Before the Fn API none of this had to be standardized, because
the runner could just choose whatever it wants. Now we have to standardize
any encodings that runners and harnesses both need to know. There should be
many, and adding more should be just a matter of standardization, no new
design.
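
As a concrete illustration of the multiple-encodings point, Beam already
exposes both a full and a value-only coder for WindowedValue; the factory
methods below are from the 2.x line, so treat the exact signatures as
approximate:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

class WindowedValueCoderExamples {
  static void example() {
    Coder<String> elem = StringUtf8Coder.of();
    // Encodes value + timestamp + windows + pane info:
    Coder<WindowedValue<String>> full =
        WindowedValue.getFullCoder(elem, GlobalWindow.Coder.INSTANCE);
    // Encodes only the value; the metadata is restored as defaults on decode:
    Coder<WindowedValue<String>> valueOnly = WindowedValue.getValueOnlyCoder(elem);
  }
}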

None of this should be user-facing or in the runner API / pipeline graph -
that is critical to making it flexible on the backend between the runner &
SDK harness.

If I understand it, from our offline discussion, you are interested in the
case where you issue a ProcessBundleRequest to the SDK harness and none of
the primitives in the subgraph will ever observe the metadata. So you want
to not even have a tiny
"WindowedValueWithNoMetadata". Is that accurate?

Kenn

On Fri, Apr 19, 2019 at 10:17 AM jincheng sun 
wrote:

> Thank you! And have a nice weekend!
>
>
> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 1:14 AM:
>
>> I have added you as a contributor.
>>
>> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun 
>> wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for your affirmation and provide more contextual information. :)
>>>
>>> Would you please give me the contributor permission?  My JIRA ID is
>>> sunjincheng121.
>>>
>>> I would like to create/assign tickets for this work.
>>>
>>> Thanks,
>>> Jincheng
>>>
>>> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 12:26 AM:
>>>
 Since I don't think this is a contentious change.

 On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:

> Yes, using T makes sense.
>
> The WindowedValue was meant to be a context object in the SDK harness
> that propagates various information about the current element. We have
> discussed in the past about:
> * making optimizations which would pass around less of the context
> information if we know that the DoFns don't need it (for example, all the
> values share the same window).
> * versioning the encoding separately from the WindowedValue context
> object (see recent discussion about element timestamp precision [1])
> * the runner may want its own representation of a context object that
> makes sense for it which isn't a WindowedValue necessarily.
>
> Feel free to cut a JIRA about this and start working on a change
> towards this.
>
> 1:
> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>
> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun 
> wrote:
>
>> Hi Beam devs,
>>
>> I read some of the docs about `Communicating over the Fn API` in
>> Beam. I feel that Beam has a very good design for Control Plane/Data
>> Plane/State Plane/Logging services, and it is described in the <How to send
>> and receive data> document. When communicating between Runner and SDK 
>> Harness,
>> the DataPlane API will use WindowedValue (an immutable triple of value,
>> timestamp, and windows) as the contract object between Runner and SDK
>> Harness. I see the interface definitions for sending and receiving data 
>> in
>> the code as follows:
>>
>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>
>> public interface FnDataService {
>>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>> }
>>
>>
>>
>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>
>> public interface BeamFnDataClient {
>>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>>       FnDataReceiver<WindowedValue<T>> receiver);
>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>       ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation,
>>>       Coder<WindowedValue<T>> coder);
>>> }
>>
>>
>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
>> use `WindowedValue<T>` as the data structure that both sides, Runner and
>> SDK Harness, understand. Control Plane/Data Plane/State Plane/Logging is 
>> a
>> 

Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread jincheng sun
Thank you! And have a nice weekend!


Lukasz Cwik wrote on Sat, Apr 20, 2019 at 1:14 AM:

> I have added you as a contributor.
>
> On Fri, Apr 19, 2019 at 9:56 AM jincheng sun 
> wrote:
>
>> Hi Lukasz,
>>
>> Thanks for your affirmation and provide more contextual information. :)
>>
>> Would you please give me the contributor permission?  My JIRA ID is
>> sunjincheng121.
>>
>> I would like to create/assign tickets for this work.
>>
>> Thanks,
>> Jincheng
>>
>> Lukasz Cwik wrote on Sat, Apr 20, 2019 at 12:26 AM:
>>
>>> Since I don't think this is a contentious change.
>>>
>>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:
>>>
 Yes, using T makes sense.

 The WindowedValue was meant to be a context object in the SDK harness
 that propagates various information about the current element. We have
 discussed in the past about:
 * making optimizations which would pass around less of the context
 information if we know that the DoFns don't need it (for example, all the
 values share the same window).
 * versioning the encoding separately from the WindowedValue context
 object (see recent discussion about element timestamp precision [1])
 * the runner may want its own representation of a context object that
 makes sense for it which isn't a WindowedValue necessarily.

 Feel free to cut a JIRA about this and start working on a change
 towards this.

 1:
 https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E

 On Fri, Apr 19, 2019 at 3:18 AM jincheng sun 
 wrote:

> Hi Beam devs,
>
> I read some of the docs about `Communicating over the Fn API` in Beam.
> I feel that Beam has a very good design for the Control Plane/Data Plane/State
> Plane/Logging services, as described in the "How to send and receive data"
> document. When communicating between the Runner and the SDK Harness, the
> Data Plane API uses WindowedValue (an immutable triple of value,
> timestamp, and windows) as the contract object between the Runner and the
> SDK Harness. I see the interface definitions for sending and receiving data
> in the code as follows:
>
> - org.apache.beam.runners.fnexecution.data.FnDataService
>
> public interface FnDataService {
>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>> }
>
>
>
> - org.apache.beam.fn.harness.data.BeamFnDataClient
>
> public interface BeamFnDataClient {
>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>       FnDataReceiver<WindowedValue<T>> receiver);
>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>> }
>
>
> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
> use `WindowedValue` as the data structure that both the Runner and the SDK
> Harness understand. Control Plane/Data Plane/State Plane/Logging is a
> high-level abstraction; services such as the Control Plane and Logging are
> common requirements for all multi-language platforms. For example, the Flink
> community is also discussing how to support Python UDFs, as well as how to
> deal with the Docker environment, how to transfer data, how to access state,
> how to do logging, etc. If Beam can further abstract these service
> interfaces, i.e., make the interface definitions compatible with multiple
> engines and finally provide them to other projects in the form of class
> libraries, it will definitely help other platforms that want to support
> multiple languages. So could Beam further abstract the interface definitions
> of FnDataService and BeamFnDataClient? Here I am throwing out a minnow to
> catch a whale: take the FnDataService#receive interface as an example, and
> turn `WindowedValue<T>` into `T` so that other platforms can extend it
> arbitrarily, as follows:
>
> <T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T>
> coder, FnDataReceiver<T> listener);
>
> What do you think?
>
> Feel free to correct me if I have any incorrect understanding, and
> any feedback is welcome!
>
>
> Regards,
> Jincheng
>



Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread Lukasz Cwik
I have added you as a contributor.

On Fri, Apr 19, 2019 at 9:56 AM jincheng sun 
wrote:

> Hi Lukasz,
>
> Thanks for your affirmation and for providing more contextual information. :)
>
> Would you please give me contributor permission? My JIRA ID is
> sunjincheng121.
>
> I would like to create/assign tickets for this work.
>
> Thanks,
> Jincheng
>
> Lukasz Cwik  wrote on Sat, Apr 20, 2019 at 12:26 AM:
>
>> Since I don't think this is a contentious change.
>>
>> On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:
>>
>>> Yes, using T makes sense.
>>>
>>> The WindowedValue was meant to be a context object in the SDK harness
>>> that propagates various information about the current element. In the
>>> past we have discussed:
>>> * making optimizations which would pass around less of the context
>>> information if we know that the DoFns don't need it (for example, when all
>>> the values share the same window).
>>> * versioning the encoding separately from the WindowedValue context
>>> object (see the recent discussion about element timestamp precision [1])
>>> * the runner may want its own representation of a context object that
>>> makes sense for it, which isn't necessarily a WindowedValue.
>>>
>>> Feel free to cut a JIRA about this and start working on a change towards
>>> this.
>>>
>>> 1:
>>> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>>>
>>> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun 
>>> wrote:
>>>
 Hi Beam devs,

 I read some of the docs about `Communicating over the Fn API` in Beam.
 I feel that Beam has a very good design for the Control Plane/Data Plane/State
 Plane/Logging services, as described in the "How to send and receive data"
 document. When communicating between the Runner and the SDK Harness, the
 Data Plane API uses WindowedValue (an immutable triple of value,
 timestamp, and windows) as the contract object between the Runner and the
 SDK Harness. I see the interface definitions for sending and receiving data
 in the code as follows:

 - org.apache.beam.runners.fnexecution.data.FnDataService

 public interface FnDataService {
>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
> }



 - org.apache.beam.fn.harness.data.BeamFnDataClient

 public interface BeamFnDataClient {
>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>       FnDataReceiver<WindowedValue<T>> receiver);
>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
> }


 Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>`
 use `WindowedValue` as the data structure that both the Runner and the SDK
 Harness understand. Control Plane/Data Plane/State Plane/Logging is a
 high-level abstraction; services such as the Control Plane and Logging are
 common requirements for all multi-language platforms. For example, the Flink
 community is also discussing how to support Python UDFs, as well as how to
 deal with the Docker environment, how to transfer data, how to access state,
 how to do logging, etc. If Beam can further abstract these service
 interfaces, i.e., make the interface definitions compatible with multiple
 engines and finally provide them to other projects in the form of class
 libraries, it will definitely help other platforms that want to support
 multiple languages. So could Beam further abstract the interface definitions
 of FnDataService and BeamFnDataClient? Here I am throwing out a minnow to
 catch a whale: take the FnDataService#receive interface as an example, and
 turn `WindowedValue<T>` into `T` so that other platforms can extend it
 arbitrarily, as follows:

 <T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T>
 coder, FnDataReceiver<T> listener);

 What do you think?

 Feel free to correct me if I have any incorrect understanding, and
 any feedback is welcome!


 Regards,
 Jincheng

>>>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread Lukasz Cwik
Since I don't think this is a contentious change.

On Fri, Apr 19, 2019 at 9:25 AM Lukasz Cwik  wrote:

> Yes, using T makes sense.
>
> The WindowedValue was meant to be a context object in the SDK harness that
> propagates various information about the current element. In the past we
> have discussed:
> * making optimizations which would pass around less of the context
> information if we know that the DoFns don't need it (for example, when all
> the values share the same window).
> * versioning the encoding separately from the WindowedValue context object
> (see the recent discussion about element timestamp precision [1])
> * the runner may want its own representation of a context object that
> makes sense for it, which isn't necessarily a WindowedValue.
>
> Feel free to cut a JIRA about this and start working on a change towards
> this.
>
> 1:
> https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
>
> On Fri, Apr 19, 2019 at 3:18 AM jincheng sun 
> wrote:
>
>> Hi Beam devs,
>>
>> I read some of the docs about `Communicating over the Fn API` in Beam. I
>> feel that Beam has a very good design for the Control Plane/Data Plane/State
>> Plane/Logging services, as described in the "How to send and receive data"
>> document. When communicating between the Runner and the SDK Harness, the
>> Data Plane API uses WindowedValue (an immutable triple of value,
>> timestamp, and windows) as the contract object between the Runner and the
>> SDK Harness. I see the interface definitions for sending and receiving data
>> in the code as follows:
>>
>> - org.apache.beam.runners.fnexecution.data.FnDataService
>>
>> public interface FnDataService {
>>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>> }
>>
>>
>>
>> - org.apache.beam.fn.harness.data.BeamFnDataClient
>>
>> public interface BeamFnDataClient {
>>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>>       FnDataReceiver<WindowedValue<T>> receiver);
>>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>>> }
>>
>>
>> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use
>> `WindowedValue` as the data structure that both the Runner and the SDK
>> Harness understand. Control Plane/Data Plane/State Plane/Logging is a
>> high-level abstraction; services such as the Control Plane and Logging are
>> common requirements for all multi-language platforms. For example, the Flink
>> community is also discussing how to support Python UDFs, as well as how to
>> deal with the Docker environment, how to transfer data, how to access state,
>> how to do logging, etc. If Beam can further abstract these service
>> interfaces, i.e., make the interface definitions compatible with multiple
>> engines and finally provide them to other projects in the form of class
>> libraries, it will definitely help other platforms that want to support
>> multiple languages. So could Beam further abstract the interface definitions
>> of FnDataService and BeamFnDataClient? Here I am throwing out a minnow to
>> catch a whale: take the FnDataService#receive interface as an example, and
>> turn `WindowedValue<T>` into `T` so that other platforms can extend it
>> arbitrarily, as follows:
>>
>> <T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T>
>> coder, FnDataReceiver<T> listener);
>>
>> What do you think?
>>
>> Feel free to correct me if I have any incorrect understanding, and any
>> feedback is welcome!
>>
>>
>> Regards,
>> Jincheng
>>
>


Re: [DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread Lukasz Cwik
Yes, using T makes sense.

The WindowedValue was meant to be a context object in the SDK harness that
propagates various information about the current element. In the past we
have discussed:
* making optimizations which would pass around less of the context
information if we know that the DoFns don't need it (for example, when all
the values share the same window; a coder sketch illustrating this follows
the link below).
* versioning the encoding separately from the WindowedValue context object
(see the recent discussion about element timestamp precision [1])
* the runner may want its own representation of a context object that makes
sense for it, which isn't necessarily a WindowedValue.

Feel free to cut a JIRA about this and start working on a change towards
this.

1:
https://lists.apache.org/thread.html/221b06e81bba335d0ea8d770212cc7ee047dba65bec7978368a51473@%3Cdev.beam.apache.org%3E
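
To make the first optimization above concrete, here is a minimal sketch
(assuming only the Beam Java SDK on the classpath; it is illustrative and not
the actual data plane code) comparing the SDK's full WindowedValue coder with
the value-only coder, which drops the window/timestamp/pane context:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;

public class ContextSizeSketch {
  public static void main(String[] args) throws Exception {
    WindowedValue<String> element = WindowedValue.valueInGlobalWindow("hello");

    // Full coder: encodes timestamp, window(s), and pane info with each value.
    WindowedValue.FullWindowedValueCoder<String> full =
        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

    // Value-only coder: encodes just the value; the context is implied
    // (global window, minimum timestamp, default pane).
    WindowedValue.ValueOnlyWindowedValueCoder<String> valueOnly =
        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of());

    System.out.println("full:       "
        + CoderUtils.encodeToByteArray(full, element).length + " bytes");
    System.out.println("value only: "
        + CoderUtils.encodeToByteArray(valueOnly, element).length + " bytes");
  }
}

When every element lands in the same (global) window, shipping only the value
is exactly the kind of saving the first bullet describes.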

On Fri, Apr 19, 2019 at 3:18 AM jincheng sun 
wrote:

> Hi Beam devs,
>
> I read some of the docs about `Communicating over the Fn API` in Beam. I
> feel that Beam has a very good design for the Control Plane/Data Plane/State
> Plane/Logging services, as described in the "How to send and receive data"
> document. When communicating between the Runner and the SDK Harness, the
> Data Plane API uses WindowedValue (an immutable triple of value,
> timestamp, and windows) as the contract object between the Runner and the
> SDK Harness. I see the interface definitions for sending and receiving data
> in the code as follows:
>
> - org.apache.beam.runners.fnexecution.data.FnDataService
>
> public interface FnDataService {
>>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>> }
>
>
>
> - org.apache.beam.fn.harness.data.BeamFnDataClient
>
> public interface BeamFnDataClient {
>>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>>       FnDataReceiver<WindowedValue<T>> receiver);
>>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
>> }
>
>
> Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use
> `WindowedValue` as the data structure that both the Runner and the SDK
> Harness understand. Control Plane/Data Plane/State Plane/Logging is a
> high-level abstraction; services such as the Control Plane and Logging are
> common requirements for all multi-language platforms. For example, the Flink
> community is also discussing how to support Python UDFs, as well as how to
> deal with the Docker environment, how to transfer data, how to access state,
> how to do logging, etc. If Beam can further abstract these service
> interfaces, i.e., make the interface definitions compatible with multiple
> engines and finally provide them to other projects in the form of class
> libraries, it will definitely help other platforms that want to support
> multiple languages. So could Beam further abstract the interface definitions
> of FnDataService and BeamFnDataClient? Here I am throwing out a minnow to
> catch a whale: take the FnDataService#receive interface as an example, and
> turn `WindowedValue<T>` into `T` so that other platforms can extend it
> arbitrarily, as follows:
>
> <T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T>
> coder, FnDataReceiver<T> listener);
>
> What do you think?
>
> Feel free to correct me if I have any incorrect understanding, and any
> feedback is welcome!
>
>
> Regards,
> Jincheng
>


[DISCUSS] Turn `WindowedValue` into `T` in the FnDataService and BeamFnDataClient interface definition

2019-04-19 Thread jincheng sun
Hi Beam devs,

I read some of the docs about `Communicating over the Fn API` in Beam. I
feel that Beam has a very good design for the Control Plane/Data Plane/State
Plane/Logging services, as described in the "How to send and receive data"
document. When communicating between the Runner and the SDK Harness, the
Data Plane API uses WindowedValue (an immutable triple of value,
timestamp, and windows) as the contract object between the Runner and the
SDK Harness. I see the interface definitions for sending and receiving data
in the code as follows:

- org.apache.beam.runners.fnexecution.data.FnDataService

public interface FnDataService {
>   <T> InboundDataClient receive(LogicalEndpoint inputLocation,
>       Coder<WindowedValue<T>> coder, FnDataReceiver<WindowedValue<T>> listener);
>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
> }



- org.apache.beam.fn.harness.data.BeamFnDataClient

public interface BeamFnDataClient {
>   <T> InboundDataClient receive(ApiServiceDescriptor apiServiceDescriptor,
>       LogicalEndpoint inputLocation, Coder<WindowedValue<T>> coder,
>       FnDataReceiver<WindowedValue<T>> receiver);
>   <T> CloseableFnDataReceiver<WindowedValue<T>> send(
>       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
>       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder);
> }
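
To make the role of WindowedValue in these signatures concrete: it enters the
data plane only through the coder and receiver type parameters. A minimal
sketch of how a caller fixes the element type today (assuming the Beam Java
SDK; the endpoint wiring is elided, and the class name here is illustrative):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;

public class WireTypeSketch {
  public static void main(String[] args) throws Exception {
    // The wire element type is WindowedValue<String>, fixed by this coder.
    Coder<WindowedValue<String>> coder =
        WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);

    // The receiver is typed against the same wire element type.
    FnDataReceiver<WindowedValue<String>> receiver =
        element -> System.out.println(element.getValue() + " @ " + element.getTimestamp());

    // fnDataService.receive(inputLocation, coder, receiver) would tie the two
    // together; note that WindowedValue appears in both parameters, which is
    // exactly what the proposal below turns into a free type parameter T.
    receiver.accept(WindowedValue.valueInGlobalWindow("hello"));
  }
}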


Both `Coder<WindowedValue<T>>` and `FnDataReceiver<WindowedValue<T>>` use
`WindowedValue` as the data structure that both the Runner and the SDK
Harness understand. Control Plane/Data Plane/State Plane/Logging is a
high-level abstraction; services such as the Control Plane and Logging are
common requirements for all multi-language platforms. For example, the Flink
community is also discussing how to support Python UDFs, as well as how to
deal with the Docker environment, how to transfer data, how to access state,
how to do logging, etc. If Beam can further abstract these service
interfaces, i.e., make the interface definitions compatible with multiple
engines and finally provide them to other projects in the form of class
libraries, it will definitely help other platforms that want to support
multiple languages. So could Beam further abstract the interface definitions
of FnDataService and BeamFnDataClient? Here I am throwing out a minnow to
catch a whale: take the FnDataService#receive interface as an example, and
turn `WindowedValue<T>` into `T` so that other platforms can extend it
arbitrarily, as follows:

<T> InboundDataClient receive(LogicalEndpoint inputLocation, Coder<T>
coder, FnDataReceiver<T> listener);
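
For illustration, a fully generalized version of the service could look like
the sketch below. The interface name is hypothetical and the package
locations are assumed from the Beam Java SDK; only the idea (parameterizing
the transport by an arbitrary element type) comes from the proposal above:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;

/** Hypothetical generalization: the element type is an arbitrary T. */
public interface GenericFnDataService {
  <T> InboundDataClient receive(LogicalEndpoint inputLocation,
      Coder<T> coder, FnDataReceiver<T> listener);
  <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation,
      Coder<T> coder);
}

Beam would keep its current behavior by instantiating T = WindowedValue<V>
(passing WindowedValue.getFullCoder(valueCoder, windowCoder) as the coder),
while an engine like Flink could bind T to its own envelope type simply by
supplying a different Coder<T>.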

What do you think?

Feel free to correct me if I have any incorrect understanding, and any
feedback is welcome!


Regards,
Jincheng