Re: Multi Environment Support

2021-12-15 Thread Ke Wu
Thanks for the pointer! I missed the part that the pipeline’s options are copied 
from the expansion service’s options!

Then I think I just need a way to disable the auto-creation of the expansion 
service during job server startup.
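
For illustration, a minimal sketch of the kind of flag I have in mind. This is 
hypothetical: JobServerDriver's ServerConfiguration uses args4j-style options, 
but this exact flag does not exist today.

import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;

// Hypothetical flag: let the job server skip creating its bundled expansion
// service so one can be started separately with its own options.
public class ServerConfigurationSketch {
  @Option(
      name = "--disable-expansion-service",
      usage = "Do not start an expansion service alongside the job server.")
  private boolean disableExpansionService = false;

  public static void main(String[] args) throws Exception {
    ServerConfigurationSketch config = new ServerConfigurationSketch();
    new CmdLineParser(config).parseArgument(args);
    System.out.println("disable expansion service: " + config.disableExpansionService);
  }
}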


> On Dec 15, 2021, at 11:29 AM, Chamikara Jayalath  wrote:
> 
> 
> 
> On Wed, Dec 15, 2021 at 10:38 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thanks for Cham to chime in, having each expansion service instance to be 
> able to serve a distinct environment which is configurable is what I am 
> looking for, however, I don’t think it is practical atm.
> 
> The pipeline option it uses to register environment is NOT the pipeline 
> option of expansion service but the option of the pipeline that is created 
> with default, i.e. not configurable.
> 
> 
> AFAICT we get the options object to register the environment from the 
> Pipeline  here: 
> https://github.com/apache/beam/blob/bc82f6f84e72c520be397977edf4bf645782ac8f/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L527
>  
> <https://github.com/apache/beam/blob/bc82f6f84e72c520be397977edf4bf645782ac8f/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L527>
> 
> We set a copy of the input options passed to the expansion service in the 
> pipeline object here: 
> https://github.com/apache/beam/blob/bc82f6f84e72c520be397977edf4bf645782ac8f/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L583
>  
> <https://github.com/apache/beam/blob/bc82f6f84e72c520be397977edf4bf645782ac8f/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L583>
> 
> So probably you just need to make sure the options you need for configuring 
> the environment are copied over when creating "effectiveOpts". Which specific 
> options are you trying to use (assuming Java expansion service) ?
> 
> Thanks,
> Cham
> 
> 
>> On Dec 14, 2021, at 8:49 PM, Chamikara Jayalath > <mailto:chamik...@google.com>> wrote:
>> 
>> 
>> 
>> On Mon, Dec 13, 2021 at 11:12 AM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Agreed, that is why I was hesitant to complicate ServerConfiguration such 
>> that it can configure the expansion service it brings up.
>> 
>> In this case, do you think we may just add one more option to disable the 
>> expansion service setup so for complicated use cases, expansion service can 
>> be brought up manually with desired options?
>> 
>> Another question is, if we update java Expansion service to use its own 
>> option instead of blindly uses pipeline’s option [1], will this cause any 
>> backward compatibility concern?
>> 
>> I haven't fully followed the thread but this indeed will be a backwards 
>> incompatible change. I believe currently we assume each expansion service 
>> instance to serve a distinct environment. Also we artificially limit the set 
>> of PipelineOptions that apply performing the expansion [1] since arbitrary 
>> PipelineOptions specified through the ExpansionService do not get propagated 
>> to the SDK that submits the pipeline to the runner. But we can expand this 
>> list as needed so that the environment of each expansion service is fully 
>> configurable through PipelineOptions
>> 
>> Thanks,
>> Cham
>> 
>> [1] 
>> https://github.com/apache/beam/blob/f9b0d6d6290436ff2a357fcebff8a281cc383025/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L572
>>  
>> <https://github.com/apache/beam/blob/f9b0d6d6290436ff2a357fcebff8a281cc383025/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L572>
>> 
>> 
>> Best,
>> Ke
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L528
>>  
>> <https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L528>
>> 
>> pipeline.getOptions() -> this.pipelineOptions
>> 
>>> On Dec 13, 2021, at 10:40 AM, Robert Bradshaw >> <mailto:rober...@google.com>> wrote:
>>> 
>>> I think bringing up the expansion service as part of the job server is
>>> just a convenience for the simple case.
>>> 
>>> On Mon, Dec 13, 2021 at 9:54 AM Ke Wu >> <mailto:ke.wu...@gmail.com>> wrote:
>>>> 
>>>> Awesome, I am

Re: Multi Environment Support

2021-12-15 Thread Ke Wu
Thanks, Cham, for chiming in. Having each expansion service instance be able to 
serve a distinct, configurable environment is what I am looking for; however, I 
don’t think it is practical at the moment.

The pipeline options it uses to register the environment are NOT the pipeline 
options of the expansion service, but the options of the pipeline, which are 
created with defaults, i.e. not configurable.
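
For context, a minimal Java sketch of the environment settings I would like the 
registered environment to reflect. defaultEnvironmentType/defaultEnvironmentConfig 
are existing PortablePipelineOptions; the image name is made up, and whether these 
options actually reach the environment registration is exactly the point under 
discussion.

import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

// Sketch: the environment configuration we would want the expansion service to
// honor when it registers its environment.
public class DesiredExpansionEnvironment {
  public static void main(String[] args) {
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).create().as(PortablePipelineOptions.class);
    options.setDefaultEnvironmentType("DOCKER");
    options.setDefaultEnvironmentConfig("my-registry/custom-java-sdk:latest");
    System.out.println(
        options.getDefaultEnvironmentType() + " / " + options.getDefaultEnvironmentConfig());
  }
}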

> On Dec 14, 2021, at 8:49 PM, Chamikara Jayalath  wrote:
> 
> 
> 
> On Mon, Dec 13, 2021 at 11:12 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Agreed, that is why I was hesitant to complicate ServerConfiguration such 
> that it can configure the expansion service it brings up.
> 
> In this case, do you think we may just add one more option to disable the 
> expansion service setup so for complicated use cases, expansion service can 
> be brought up manually with desired options?
> 
> Another question is, if we update java Expansion service to use its own 
> option instead of blindly uses pipeline’s option [1], will this cause any 
> backward compatibility concern?
> 
> I haven't fully followed the thread but this indeed will be a backwards 
> incompatible change. I believe currently we assume each expansion service 
> instance to serve a distinct environment. Also we artificially limit the set 
> of PipelineOptions that apply performing the expansion [1] since arbitrary 
> PipelineOptions specified through the ExpansionService do not get propagated 
> to the SDK that submits the pipeline to the runner. But we can expand this 
> list as needed so that the environment of each expansion service is fully 
> configurable through PipelineOptions
> 
> Thanks,
> Cham
> 
> [1] 
> https://github.com/apache/beam/blob/f9b0d6d6290436ff2a357fcebff8a281cc383025/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L572
>  
> <https://github.com/apache/beam/blob/f9b0d6d6290436ff2a357fcebff8a281cc383025/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L572>
> 
> 
> Best,
> Ke
> 
> [1] 
> https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L528
>  
> <https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L528>
> 
> pipeline.getOptions() -> this.pipelineOptions
> 
>> On Dec 13, 2021, at 10:40 AM, Robert Bradshaw > <mailto:rober...@google.com>> wrote:
>> 
>> I think bringing up the expansion service as part of the job server is
>> just a convenience for the simple case.
>> 
>> On Mon, Dec 13, 2021 at 9:54 AM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>>> 
>>> Awesome, I am happy to make the update.
>>> 
>>> One more question, I noticed that expansion service is currently being 
>>> brought up as part of job server in Java [1], if this is the preferred 
>>> approach, does it mean, we should update ServerConfiguration to include 
>>> such configurations, like expansion server port [2]
>>> 
>>> Best,
>>> Ke
>>> 
>>> [1] 
>>> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L58
>>>  
>>> <https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L58>
>>> [2] 
>>> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L88
>>>  
>>> <https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L88>
>>> 
>>> 
>>> On Dec 9, 2021, at 4:34 PM, Robert Bradshaw >> <mailto:rober...@google.com>> wrote:
>>> 
>>> An expansion service should be able to specify its own environment, as
>>> is done in Python, not blindly inherit it from the caller. Java should
>>> be updated to have the same flexibility.
>>> 
>>> On Thu, Dec 9, 2021 at 4:27 PM Ke Wu >> <mailto:ke.wu...@gmail.com>> wrote:
>>> 
>>> 
>>> Hi Robert,
>>> 
>>> After some more digging and exploration, I realized that it is not clean 
>>> and straightforward to update PipelineOption to support such.
>>> 
>>> Therefore, I spent more time exploring the external transform & expansion 
>>> service

Re: Multi Environment Support

2021-12-13 Thread Ke Wu
Agreed, that is why I was hesitant to complicate ServerConfiguration such that 
it can configure the expansion service it brings up.

In this case, do you think we may just add one more option to disable the 
expansion service setup, so that for complicated use cases the expansion service 
can be brought up manually with the desired options?

Another question: if we update the Java expansion service to use its own options 
instead of blindly using the pipeline’s options [1], will this cause any 
backward compatibility concerns?

Best,
Ke

[1] 
https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L528

pipeline.getOptions() -> this.pipelineOptions

> On Dec 13, 2021, at 10:40 AM, Robert Bradshaw  wrote:
> 
> I think bringing up the expansion service as part of the job server is
> just a convenience for the simple case.
> 
> On Mon, Dec 13, 2021 at 9:54 AM Ke Wu  wrote:
>> 
>> Awesome, I am happy to make the update.
>> 
>> One more question, I noticed that expansion service is currently being 
>> brought up as part of job server in Java [1], if this is the preferred 
>> approach, does it mean, we should update ServerConfiguration to include such 
>> configurations, like expansion server port [2]
>> 
>> Best,
>> Ke
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L58
>> [2] 
>> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L88
>> 
>> 
>> On Dec 9, 2021, at 4:34 PM, Robert Bradshaw  wrote:
>> 
>> An expansion service should be able to specify its own environment, as
>> is done in Python, not blindly inherit it from the caller. Java should
>> be updated to have the same flexibility.
>> 
>> On Thu, Dec 9, 2021 at 4:27 PM Ke Wu  wrote:
>> 
>> 
>> Hi Robert,
>> 
>> After some more digging and exploration, I realized that it is not clean and 
>> straightforward to update PipelineOption to support such.
>> 
>> Therefore, I spent more time exploring the external transform & expansion 
>> service route. However, I noticed that this approach has the same limitation 
>> where we cannot configure environments ourselves.
>> 
>> This is because when ExpansionService register environment [1], it uses the 
>> pipeline options of the original pipeline. This limitation not only restrict 
>> our capability to configure a different external environment in external 
>> mode but also restrict Docker mode to specify docker image for different 
>> SDKs but use defaults.
>> 
>> Is this expected or do you think ExpansionService should be updated to be 
>> configurable of the environment it registers ?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1]
>> 
>> Java: 
>> https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L526
>> 
>> Python:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py#L39
>> 
>> 
>> 
>> On Nov 17, 2021, at 1:02 PM, Robert Bradshaw  wrote:
>> 
>> Could you give a concrete example of what such a specification would look 
>> like?
>> 
>> On Wed, Nov 17, 2021 at 11:05 AM Ke Wu  wrote:
>> 
>> 
>> Hi Robert,
>> 
>> Thanks for the pointer, using expansion service hack seems to work!
>> 
>> On the other hand, since PipelineOptions is the place to configure external 
>> service address anyway, do you think it makes sense to expand it so it is 
>> capable of specifying multiple external environment to external service 
>> address mapping?
>> 
>> Best,
>> Ke
>> 
>> On Oct 6, 2021, at 2:09 PM, Robert Bradshaw  wrote:
>> 
>> On Wed, Oct 6, 2021 at 1:12 PM Ke Wu  wrote:
>> 
>> 
>> I have a quick follow up questions.
>> 
>> When using multiple external environments, is there a way to configure the 
>> multiple external service address? It looks like the current PipelineOptions 
>> only supports specifying one external address.
>> 
>> 
>> PipelineOptions wasn't really built with the idea of multiple distinct
>> environments in mind. One hack you could do is put one of the
>> environments in an ex

Re: Multi Environment Support

2021-12-13 Thread Ke Wu
Awesome, I am happy to make the update.

One more question: I noticed that the expansion service is currently being 
brought up as part of the job server in Java [1]. If this is the preferred 
approach, does it mean we should update ServerConfiguration to include such 
configurations, like the expansion service port [2]?

Best,
Ke

[1] 
https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L58
[2] 
https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L88


> On Dec 9, 2021, at 4:34 PM, Robert Bradshaw  wrote:
> 
> An expansion service should be able to specify its own environment, as
> is done in Python, not blindly inherit it from the caller. Java should
> be updated to have the same flexibility.
> 
> On Thu, Dec 9, 2021 at 4:27 PM Ke Wu  wrote:
>> 
>> Hi Robert,
>> 
>> After some more digging and exploration, I realized that it is not clean and 
>> straightforward to update PipelineOption to support such.
>> 
>> Therefore, I spent more time exploring the external transform & expansion 
>> service route. However, I noticed that this approach has the same limitation 
>> where we cannot configure environments ourselves.
>> 
>> This is because when ExpansionService register environment [1], it uses the 
>> pipeline options of the original pipeline. This limitation not only restrict 
>> our capability to configure a different external environment in external 
>> mode but also restrict Docker mode to specify docker image for different 
>> SDKs but use defaults.
>> 
>> Is this expected or do you think ExpansionService should be updated to be 
>> configurable of the environment it registers ?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1]
>> 
>> Java: 
>> https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L526
>> 
>> Python:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py#L39
>> 
>> 
>> 
>> On Nov 17, 2021, at 1:02 PM, Robert Bradshaw  wrote:
>> 
>> Could you give a concrete example of what such a specification would look 
>> like?
>> 
>> On Wed, Nov 17, 2021 at 11:05 AM Ke Wu  wrote:
>> 
>> 
>> Hi Robert,
>> 
>> Thanks for the pointer, using expansion service hack seems to work!
>> 
>> On the other hand, since PipelineOptions is the place to configure external 
>> service address anyway, do you think it makes sense to expand it so it is 
>> capable of specifying multiple external environment to external service 
>> address mapping?
>> 
>> Best,
>> Ke
>> 
>> On Oct 6, 2021, at 2:09 PM, Robert Bradshaw  wrote:
>> 
>> On Wed, Oct 6, 2021 at 1:12 PM Ke Wu  wrote:
>> 
>> 
>> I have a quick follow up questions.
>> 
>> When using multiple external environments, is there a way to configure the 
>> multiple external service address? It looks like the current PipelineOptions 
>> only supports specifying one external address.
>> 
>> 
>> PipelineOptions wasn't really built with the idea of multiple distinct
>> environments in mind. One hack you could do is put one of the
>> environments in an expansion service with its own environment (as if
>> it were written in a different language) and configure that
>> environment separately.
>> 
>> On Oct 4, 2021, at 4:12 PM, Ke Wu  wrote:
>> 
>> This is great, let me try it out.
>> 
>> Best,
>> Ke
>> 
>> On Sep 30, 2021, at 6:06 PM, Robert Bradshaw  wrote:
>> 
>> On Thu, Sep 30, 2021 at 6:00 PM Ke Wu  wrote:
>> 
>> 
>> I am able to annotate/mark a java transform by setting its resource hints 
>> [1] as well, which resulted in a different environment id, e.g.
>> 
>> beam:env:docker:v1 VS beam:env:docker:v11
>> 
>> Is this on the right track?
>> 
>> 
>> Yep.
>> 
>> If Yes, I suppose then I need to configure job bundle factory to be able to 
>> understand multiple environments and configure them separately as well.
>> 
>> 
>> It should already do the right thing here.

Re: Multi Environment Support

2021-12-09 Thread Ke Wu
Hi Robert,

After some more digging and exploration, I realized that it is not clean and 
straightforward to update PipelineOptions to support this.

Therefore, I spent more time exploring the external transform & expansion 
service route. However, I noticed that this approach has the same limitation: 
we cannot configure the environments ourselves.

This is because when the ExpansionService registers the environment [1], it uses 
the pipeline options of the original pipeline. This limitation not only restricts 
our ability to configure a different external environment in external mode, but 
also prevents Docker mode from specifying docker images for different SDKs, 
falling back to the defaults instead.

Is this expected, or do you think the ExpansionService should be updated so that 
the environment it registers is configurable?

Best,
Ke


[1]

Java: 
https://github.com/apache/beam/blob/master/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L526

Python: 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service.py#L39



> On Nov 17, 2021, at 1:02 PM, Robert Bradshaw  wrote:
> 
> Could you give a concrete example of what such a specification would look 
> like?
> 
> On Wed, Nov 17, 2021 at 11:05 AM Ke Wu  wrote:
>> 
>> Hi Robert,
>> 
>> Thanks for the pointer, using expansion service hack seems to work!
>> 
>> On the other hand, since PipelineOptions is the place to configure external 
>> service address anyway, do you think it makes sense to expand it so it is 
>> capable of specifying multiple external environment to external service 
>> address mapping?
>> 
>> Best,
>> Ke
>> 
>>> On Oct 6, 2021, at 2:09 PM, Robert Bradshaw  wrote:
>>> 
>>> On Wed, Oct 6, 2021 at 1:12 PM Ke Wu  wrote:
>>>> 
>>>> I have a quick follow up questions.
>>>> 
>>>> When using multiple external environments, is there a way to configure the 
>>>> multiple external service address? It looks like the current 
>>>> PipelineOptions only supports specifying one external address.
>>> 
>>> PipelineOptions wasn't really built with the idea of multiple distinct
>>> environments in mind. One hack you could do is put one of the
>>> environments in an expansion service with its own environment (as if
>>> it were written in a different language) and configure that
>>> environment separately.
>>> 
>>>>> On Oct 4, 2021, at 4:12 PM, Ke Wu  wrote:
>>>>> 
>>>>> This is great, let me try it out.
>>>>> 
>>>>> Best,
>>>>> Ke
>>>>> 
>>>>>> On Sep 30, 2021, at 6:06 PM, Robert Bradshaw  wrote:
>>>>>> 
>>>>>> On Thu, Sep 30, 2021 at 6:00 PM Ke Wu  wrote:
>>>>>>> 
>>>>>>> I am able to annotate/mark a java transform by setting its resource 
>>>>>>> hints [1] as well, which resulted in a different environment id, e.g.
>>>>>>> 
>>>>>>> beam:env:docker:v1 VS beam:env:docker:v11
>>>>>>> 
>>>>>>> Is this on the right track?
>>>>>> 
>>>>>> Yep.
>>>>>> 
>>>>>>> If Yes, I suppose then I need to configure job bundle factory to be 
>>>>>>> able to understand multiple environments and configure them separately 
>>>>>>> as well.
>>>>>> 
>>>>>> It should already do the right thing here. That's how multi-language 
>>>>>> works.
>>>>>> 
>>>>>>> [1] 
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218
>>>>>>> 
>>>>>>> On Sep 30, 2021, at 10:34 AM, Robert Bradshaw  
>>>>>>> wrote:
>>>>>>> 
>>>>>>> On Thu, Sep 30, 2021 at 9:25 AM Ke Wu  wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> Ideally, we do not want to expose anything directly to users and we, as 
>>>>>>> the framework and platform provider, separate things out under the hood.
>>>>>>> 
>>>>>>

Re: Multi Environment Support

2021-11-17 Thread Ke Wu
Hi Robert,

Thanks for the pointer; using the expansion service hack seems to work!

On the other hand, since PipelineOptions is the place to configure the external 
service address anyway, do you think it makes sense to expand it so that it can 
specify a mapping from multiple external environments to external service 
addresses?
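
To make the idea concrete, a purely hypothetical sketch of what such an option 
could look like. This interface does not exist in Beam; it only illustrates the 
environment-id-to-address mapping I have in mind.

import java.util.Map;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface: map each external environment id to the
// address of the external worker pool service that serves it.
public interface MultiEnvironmentOptions extends PipelineOptions {
  @Description(
      "Mapping from external environment id to external worker service address, "
          + "e.g. {\"platform\": \"localhost:50000\", \"user\": \"localhost:50001\"}.")
  Map<String, String> getExternalEnvironmentAddresses();

  void setExternalEnvironmentAddresses(Map<String, String> value);
}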

Best,
Ke

> On Oct 6, 2021, at 2:09 PM, Robert Bradshaw  wrote:
> 
> On Wed, Oct 6, 2021 at 1:12 PM Ke Wu  wrote:
>> 
>> I have a quick follow up questions.
>> 
>> When using multiple external environments, is there a way to configure the 
>> multiple external service address? It looks like the current PipelineOptions 
>> only supports specifying one external address.
> 
> PipelineOptions wasn't really built with the idea of multiple distinct
> environments in mind. One hack you could do is put one of the
> environments in an expansion service with its own environment (as if
> it were written in a different language) and configure that
> environment separately.
> 
>>> On Oct 4, 2021, at 4:12 PM, Ke Wu  wrote:
>>> 
>>> This is great, let me try it out.
>>> 
>>> Best,
>>> Ke
>>> 
>>>> On Sep 30, 2021, at 6:06 PM, Robert Bradshaw  wrote:
>>>> 
>>>> On Thu, Sep 30, 2021 at 6:00 PM Ke Wu  wrote:
>>>>> 
>>>>> I am able to annotate/mark a java transform by setting its resource hints 
>>>>> [1] as well, which resulted in a different environment id, e.g.
>>>>> 
>>>>> beam:env:docker:v1 VS beam:env:docker:v11
>>>>> 
>>>>> Is this on the right track?
>>>> 
>>>> Yep.
>>>> 
>>>>> If Yes, I suppose then I need to configure job bundle factory to be able 
>>>>> to understand multiple environments and configure them separately as well.
>>>> 
>>>> It should already do the right thing here. That's how multi-language works.
>>>> 
>>>>> [1] 
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218
>>>>> 
>>>>> On Sep 30, 2021, at 10:34 AM, Robert Bradshaw  wrote:
>>>>> 
>>>>> On Thu, Sep 30, 2021 at 9:25 AM Ke Wu  wrote:
>>>>> 
>>>>> 
>>>>> Ideally, we do not want to expose anything directly to users and we, as 
>>>>> the framework and platform provider, separate things out under the hood.
>>>>> 
>>>>> I would expect users to author their DoFn(s) in the same way as they do 
>>>>> right now, but we expect to change the DoFn(s) that we provide, will be 
>>>>> annotated/marked so that it can be recognized during runtime.
>>>>> 
>>>>> In our use case, application is executed in Kubernetes environment 
>>>>> therefore, we are expecting to directly use different docker image to 
>>>>> isolate dependencies.
>>>>> 
>>>>> e.g. we have docker image A, which is beam core, that is used to start 
>>>>> job server and runner process. We have a docker image B, which contains 
>>>>> DoFn(s) that platform provides to serve as a external worker pool service 
>>>>> to execute platform provided DoFn(s), last but not least, users would 
>>>>> have their own docker image represent their application, which will be 
>>>>> used to start the external worker pool service to handle their own UDF 
>>>>> execution.
>>>>> 
>>>>> Does this make sense ?
>>>>> 
>>>>> 
>>>>> In Python it's pretty trivial to annotate transforms (e.g. the
>>>>> "platform" transforms) which could be used to mark their environments
>>>>> prior to optimization (e.g. fusion). As mentioned, you could use
>>>>> resource hints (even a "dummy" hint like
>>>>> "use_platform_environment=True") to force these into a separate docker
>>>>> image as well.
>>>>> 
>>>>> On Sep 29, 2021, at 1:09 PM, Luke Cwik  wrote:
>>>>> 
>>>>> That sounds neat. I think that before you try to figure out how to change 
>>>>> Beam to fit this usecase is to think about what would be the best way for 
>>>>> users to specify these requirements when they are constructing the 
>>>>> pipeline. Once you have some samples that you could share the community 
>>>>> would probably be able to 

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-07 Thread Ke Wu
@Robert @Luke @Jan, could you help take an early look at 
https://github.com/apache/beam/pull/15665? There are some test failures still to 
be resolved, but they are most likely not related to the PR, because I observe 
the same failures across PRs.



> On Oct 6, 2021, at 11:03 AM, Robert Burke  wrote:
> 
> The GoSDK handles the urn as unkeyed. 
> 
> That is, reshuffling a PCollection will ignore the keys, and produce a 
> PCollection> with the random keys. This would split user keys up 
> to multiple partitions. This is the same as though it were unkeyed.
> 
> Doing anything with the user key specifically would seem to me to defeat the 
> point of a reshuffle, vs just using a GBK which would align keys to bundles 
> in it's output.
> 
> 
> On Wed, Oct 6, 2021, 10:54 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
>> The only usage of of the keyed Reshuffle in the Java SDK is for write files 
>> with a single key and the use case there would benefit from being replaced 
>> with GroupIntoBatches instead.
> 
> I think there are more use cases for keyed reshuffle , e.g. in Samza runner, 
> it is also used when we rekeyed elements, in addition, since states are 
> partitioned by key, so it is important to reshuffle after a PCollection is 
> assigned with a different key so elements with the same new key could end up 
> in the same partition.
> 
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant. This was since the unkeyed reshuffle was a composite built on 
>> top of the keyed reshuffle in the Java SDK. The existing overrides in 
>> Flink/Spark/Samza confirm this.
> 
> I believe so, because all Samza/Flink/Spark ’s Reshuffle translator are 
> authored in Java, which is expecting keyed Reshuffle.
> 
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant.
>> And from the Python side I thought the intent was for the reshuffle
>> URN to represent the unkeyed variant, as the keyed one isn't anything
>> novel
> 
> 
> This is exactly what is confusing, the same urn is currently representing 
> keyed reshuffle in Java SDK but unkeyed reshuffle in Python SDK. 
> @Luke do you think it makes since to have two separately Urns representing 
> two different reshuffles? Unkeyed reshuffle is still expected to be a 
> composite transform of keyed transform and runners can decided which 
> (keyed/unkeyd) reshuffle they want to translate.
> 
> Best,
> Ke
> 
>> On Oct 6, 2021, at 10:38 AM, Reuven Lax > <mailto:re...@google.com>> wrote:
>> 
>> I think it's used with the destination as a key, no? In various places 
>> Reshuffle is used as a standin for RequiresStableInput
>> 
>> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik > <mailto:lc...@google.com>> wrote:
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant. This was since the unkeyed reshuffle was a composite built on 
>> top of the keyed reshuffle in the Java SDK. The existing overrides in 
>> Flink/Spark/Samza confirm this.
>> 
>> Thinking about this more I wish we had went only with the unkeyed variant as 
>> I don't know how much benefit users get from having their records grouped by 
>> the key they choose and it also limits the optimization capabilities of the 
>> runner a lot as to how to materialize the data.
>> 
>> The only usage of of the keyed Reshuffle in the Java SDK is for write files 
>> with a single key and the use case there would benefit from being replaced 
>> with GroupIntoBatches instead.
>> 
>> 
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke > <mailto:rob...@frantil.com>> wrote:
>> I can handle the Go SDK change once the urn is decided. I'm cleaning up a 
>> change to add the combine_global urn in the Go SDK so this can slip in along 
>> side it.
>> 
>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Created https://issues.apache.org/jira/browse/BEAM-12999 
>> <https://issues.apache.org/jira/browse/BEAM-12999> 
>> 
>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw >> <mailto:rober...@google.com>> wrote:
>>> 
>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>> 
>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu >> <mailto:ke.wu...@gmail.com>> wrote:
>>>> 
>>>> Let me add two new urns representing reshuffle via random key and 
>>>> reshuffle using key. I will share the PR later here, would need some help 
>>>> on

Re: Multi Environment Support

2021-10-06 Thread Ke Wu
I have a quick follow up questions. 

When using multiple external environments, is there a way to configure the 
multiple external service address? It looks like the current PipelineOptions 
only supports specifying one external address.

Best,
Ke

> On Oct 4, 2021, at 4:12 PM, Ke Wu  wrote:
> 
> This is great, let me try it out.
> 
> Best,
> Ke
> 
>> On Sep 30, 2021, at 6:06 PM, Robert Bradshaw  wrote:
>> 
>> On Thu, Sep 30, 2021 at 6:00 PM Ke Wu  wrote:
>>> 
>>> I am able to annotate/mark a java transform by setting its resource hints 
>>> [1] as well, which resulted in a different environment id, e.g.
>>> 
>>> beam:env:docker:v1 VS beam:env:docker:v11
>>> 
>>> Is this on the right track?
>> 
>> Yep.
>> 
>>> If Yes, I suppose then I need to configure job bundle factory to be able to 
>>> understand multiple environments and configure them separately as well.
>> 
>> It should already do the right thing here. That's how multi-language works.
>> 
>>> [1] 
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218
>>> 
>>> On Sep 30, 2021, at 10:34 AM, Robert Bradshaw  wrote:
>>> 
>>> On Thu, Sep 30, 2021 at 9:25 AM Ke Wu  wrote:
>>> 
>>> 
>>> Ideally, we do not want to expose anything directly to users and we, as the 
>>> framework and platform provider, separate things out under the hood.
>>> 
>>> I would expect users to author their DoFn(s) in the same way as they do 
>>> right now, but we expect to change the DoFn(s) that we provide, will be 
>>> annotated/marked so that it can be recognized during runtime.
>>> 
>>> In our use case, application is executed in Kubernetes environment 
>>> therefore, we are expecting to directly use different docker image to 
>>> isolate dependencies.
>>> 
>>> e.g. we have docker image A, which is beam core, that is used to start job 
>>> server and runner process. We have a docker image B, which contains DoFn(s) 
>>> that platform provides to serve as a external worker pool service to 
>>> execute platform provided DoFn(s), last but not least, users would have 
>>> their own docker image represent their application, which will be used to 
>>> start the external worker pool service to handle their own UDF execution.
>>> 
>>> Does this make sense ?
>>> 
>>> 
>>> In Python it's pretty trivial to annotate transforms (e.g. the
>>> "platform" transforms) which could be used to mark their environments
>>> prior to optimization (e.g. fusion). As mentioned, you could use
>>> resource hints (even a "dummy" hint like
>>> "use_platform_environment=True") to force these into a separate docker
>>> image as well.
>>> 
>>> On Sep 29, 2021, at 1:09 PM, Luke Cwik  wrote:
>>> 
>>> That sounds neat. I think that before you try to figure out how to change 
>>> Beam to fit this usecase is to think about what would be the best way for 
>>> users to specify these requirements when they are constructing the 
>>> pipeline. Once you have some samples that you could share the community 
>>> would probably be able to give you more pointed advice.
>>> For example will they be running one application with a complicated class 
>>> loader setup, if so then we could probably do away with multiple 
>>> environments and try to have DoFn's recognize their specific class loader 
>>> configuration and replicate it on the SDK harness side.
>>> 
>>> Also, for performance reasons users may want to resolve their dependency 
>>> issues to create a maximally fused graph to limit performance impact due to 
>>> the encoding/decoding boundaries at the edges of those fused graphs.
>>> 
>>> Finally, this could definitely apply to languages like Python and Go (now 
>>> that Go has support for modules) as dependency issues are a common problem.
>>> 
>>> 
>>> On Wed, Sep 29, 2021 at 11:47 AM Ke Wu  wrote:
>>> 
>>> 
>>> Thanks for the advice.
>>> 
>>> Here are some more background:
>>> 
>>> We are building a feature called “split deployment” such that, we can 
>>> isolate framework/platform core from user code/dependencies to address 
>>> couple of operational challenges such as dependency conflict, 
>>> alert/exception triaging.
>>> 
>>> With Beam’s po

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-06 Thread Ke Wu
> The only usage of of the keyed Reshuffle in the Java SDK is for write files 
> with a single key and the use case there would benefit from being replaced 
> with GroupIntoBatches instead.

I think there are more use cases for the keyed reshuffle. For example, in the 
Samza runner it is also used when we rekey elements; in addition, since state is 
partitioned by key, it is important to reshuffle after a PCollection is assigned 
a different key, so that elements with the same new key end up in the same 
partition.
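
As an illustration of that pattern, a minimal Java sketch (the data and keys are 
made up) of rekeying and then reshuffling so elements with the same new key land 
in the same partition:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class RekeyThenReshuffle {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Assign a new key, then reshuffle on that key so elements sharing the new
    // key are routed to the same partition before any keyed/stateful downstream
    // transform runs.
    PCollection<KV<String, String>> rekeyed =
        p.apply(Create.of("user-1:click", "user-2:view", "user-1:view"))
            .apply(
                WithKeys.<String, String>of(element -> element.split(":")[0])
                    .withKeyType(TypeDescriptors.strings()));

    rekeyed.apply(Reshuffle.of());

    p.run().waitUntilFinish();
  }
}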

> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant. This was since the unkeyed reshuffle was a composite built on 
> top of the keyed reshuffle in the Java SDK. The existing overrides in 
> Flink/Spark/Samza confirm this.

I believe so, because the Samza/Flink/Spark Reshuffle translators are all 
authored in Java, which expects a keyed Reshuffle.

> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant.
> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel


This is exactly what is confusing: the same URN currently represents the keyed 
reshuffle in the Java SDK but the unkeyed reshuffle in the Python SDK. 
@Luke, do you think it makes sense to have two separate URNs representing the two 
different reshuffles? The unkeyed reshuffle is still expected to be a composite 
transform built on the keyed one, and runners can decide which (keyed/unkeyed) 
reshuffle they want to translate.

Best,
Ke

> On Oct 6, 2021, at 10:38 AM, Reuven Lax  wrote:
> 
> I think it's used with the destination as a key, no? In various places 
> Reshuffle is used as a standin for RequiresStableInput
> 
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik  <mailto:lc...@google.com>> wrote:
> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant. This was since the unkeyed reshuffle was a composite built on 
> top of the keyed reshuffle in the Java SDK. The existing overrides in 
> Flink/Spark/Samza confirm this.
> 
> Thinking about this more I wish we had went only with the unkeyed variant as 
> I don't know how much benefit users get from having their records grouped by 
> the key they choose and it also limits the optimization capabilities of the 
> runner a lot as to how to materialize the data.
> 
> The only usage of of the keyed Reshuffle in the Java SDK is for write files 
> with a single key and the use case there would benefit from being replaced 
> with GroupIntoBatches instead.
> 
> 
> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke  <mailto:rob...@frantil.com>> wrote:
> I can handle the Go SDK change once the urn is decided. I'm cleaning up a 
> change to add the combine_global urn in the Go SDK so this can slip in along 
> side it.
> 
> On Mon, Oct 4, 2021, 3:57 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Created https://issues.apache.org/jira/browse/BEAM-12999 
> <https://issues.apache.org/jira/browse/BEAM-12999> 
> 
>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw > <mailto:rober...@google.com>> wrote:
>> 
>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>> 
>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>>> 
>>> Let me add two new urns representing reshuffle via random key and reshuffle 
>>> using key. I will share the PR later here, would need some help on 
>>> Python/Go SDK changes too since I am not very familiar with them.
>>> 
>>> Best,
>>> Ke
>>> 
>>> 
>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw >> <mailto:rober...@google.com>> wrote:
>>> 
>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský >> <mailto:je...@seznam.cz>> wrote:
>>> 
>>> 
>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>> 
>>> Oh, yes.
>>> 
>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>> 
>>> We generally try to avoid this kind of discrepancy. It could make
>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>> 
>>> 
>>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>>> might be opinionated.
>>> 
>>> 
>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>> to me than vaiRandomKey(), but probably not worth changing so the
>>> question becomes whether to be stilted or consistent.)
>>> 
>>> More importantly - could we undeprecate Reshuffle
>>> (in

Re: Help needed in Go validates runner tests

2021-10-04 Thread Ke Wu
Thanks for looking into this! 

I am curious why `TEST_EXIT_CODE` is logged as 0 if there are actually test 
failures; I did not look into any test failures because of it. Do you happen to 
know whether this is expected?

Best,
Ke

> On Oct 4, 2021, at 5:43 PM, Robert Burke  wrote:
> 
> Looking at it now that looks like it covers the failures for samza (it's 
> passing). So we can merge that in if you'll review it.
> 
> On Mon, Oct 4, 2021, 5:41 PM Robert Burke  <mailto:rob...@frantil.com>> wrote:
> I was looking into these too. Some unfiltered out tests are failing (like a 
> newly added check for post job metrics, is getting back all 0s from Samza).
> 
> In the linked case, one of the Cross Language tests failed. (Search for "--- 
> F" failing test runs.
> 
> I have a PR that I'm using to iterate on and disabled the failing tests 
> https://github.com/apache/beam/pull/15659 
> <https://github.com/apache/beam/pull/15659>
> 
> But Jenkins had not started execution after 16 minutes so i called it a day.
> 
> On Mon, Oct 4, 2021, 5:30 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello All,
> 
> Samza Go Validates Runner started fail around 10/01/2021[1], the three 
> commits that contributes to the first failure [2] looks innocent, I also 
> tried to revert them but still got the same failure. 
> 
> What is confusing is the console log:
> 
> 11:15:43 --- PASS: TestEmitParDoAfterGBK (5.70s)
> 11:15:43 PASS
> 11:15:43 ok   github.com/apache/beam/sdks/v2/go/test/regression 
> <http://github.com/apache/beam/sdks/v2/go/test/regression>38.517s
> 11:15:43 $ TEST_EXIT_CODE=0
> 11:15:43 $ cd ../..
> 11:15:43 $ exit 1
> 11:15:43 $ exit 1
> 
> Which suggests that tests all passed and expected to exit with code 0, 
> however, for some reason, the go script exited 1 instead [3], however, 
> reading from the script, if TEST_EXIT_CODE printed out to be 0, then it is 
> supposed to be exited 0.
> 
> Does anyone have idea why this happens?
> 
> Best,
> Ke
> 
> 
> [1] https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/ 
> <https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/> 
> [2] https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/319/ 
> <https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/319/> 
> [3] 
> https://github.com/apache/beam/blob/master/sdks/go/test/run_validatesrunner_tests.sh#L399
>  
> <https://github.com/apache/beam/blob/master/sdks/go/test/run_validatesrunner_tests.sh#L399>
>  



Help needed in Go validates runner tests

2021-10-04 Thread Ke Wu
Hello All,

The Samza Go ValidatesRunner tests started failing around 10/01/2021 [1]. The 
three commits that contributed to the first failure [2] look innocent; I also 
tried to revert them but still got the same failure.

What is confusing is the console log:

11:15:43 --- PASS: TestEmitParDoAfterGBK (5.70s)
11:15:43 PASS
11:15:43 ok github.com/apache/beam/sdks/v2/go/test/regression   38.517s
11:15:43 $ TEST_EXIT_CODE=0
11:15:43 $ cd ../..
11:15:43 $ exit 1
11:15:43 $ exit 1

This suggests that all tests passed and the script was expected to exit with 
code 0; however, for some reason, the script exited with 1 instead [3]. Reading 
the script, if TEST_EXIT_CODE is printed as 0, then it is supposed to exit 0.

Does anyone have idea why this happens?

Best,
Ke


[1] https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/
[2] https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Samza/319/
[3] 
https://github.com/apache/beam/blob/master/sdks/go/test/run_validatesrunner_tests.sh#L399

Re: Multi Environment Support

2021-10-04 Thread Ke Wu
This is great, let me try it out.

Best,
Ke

> On Sep 30, 2021, at 6:06 PM, Robert Bradshaw  wrote:
> 
> On Thu, Sep 30, 2021 at 6:00 PM Ke Wu  wrote:
>> 
>> I am able to annotate/mark a java transform by setting its resource hints 
>> [1] as well, which resulted in a different environment id, e.g.
>> 
>> beam:env:docker:v1 VS beam:env:docker:v11
>> 
>> Is this on the right track?
> 
> Yep.
> 
>> If Yes, I suppose then I need to configure job bundle factory to be able to 
>> understand multiple environments and configure them separately as well.
> 
> It should already do the right thing here. That's how multi-language works.
> 
>> [1] 
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218
>> 
>> On Sep 30, 2021, at 10:34 AM, Robert Bradshaw  wrote:
>> 
>> On Thu, Sep 30, 2021 at 9:25 AM Ke Wu  wrote:
>> 
>> 
>> Ideally, we do not want to expose anything directly to users and we, as the 
>> framework and platform provider, separate things out under the hood.
>> 
>> I would expect users to author their DoFn(s) in the same way as they do 
>> right now, but we expect to change the DoFn(s) that we provide, will be 
>> annotated/marked so that it can be recognized during runtime.
>> 
>> In our use case, application is executed in Kubernetes environment 
>> therefore, we are expecting to directly use different docker image to 
>> isolate dependencies.
>> 
>> e.g. we have docker image A, which is beam core, that is used to start job 
>> server and runner process. We have a docker image B, which contains DoFn(s) 
>> that platform provides to serve as a external worker pool service to execute 
>> platform provided DoFn(s), last but not least, users would have their own 
>> docker image represent their application, which will be used to start the 
>> external worker pool service to handle their own UDF execution.
>> 
>> Does this make sense ?
>> 
>> 
>> In Python it's pretty trivial to annotate transforms (e.g. the
>> "platform" transforms) which could be used to mark their environments
>> prior to optimization (e.g. fusion). As mentioned, you could use
>> resource hints (even a "dummy" hint like
>> "use_platform_environment=True") to force these into a separate docker
>> image as well.
>> 
>> On Sep 29, 2021, at 1:09 PM, Luke Cwik  wrote:
>> 
>> That sounds neat. I think that before you try to figure out how to change 
>> Beam to fit this usecase is to think about what would be the best way for 
>> users to specify these requirements when they are constructing the pipeline. 
>> Once you have some samples that you could share the community would probably 
>> be able to give you more pointed advice.
>> For example will they be running one application with a complicated class 
>> loader setup, if so then we could probably do away with multiple 
>> environments and try to have DoFn's recognize their specific class loader 
>> configuration and replicate it on the SDK harness side.
>> 
>> Also, for performance reasons users may want to resolve their dependency 
>> issues to create a maximally fused graph to limit performance impact due to 
>> the encoding/decoding boundaries at the edges of those fused graphs.
>> 
>> Finally, this could definitely apply to languages like Python and Go (now 
>> that Go has support for modules) as dependency issues are a common problem.
>> 
>> 
>> On Wed, Sep 29, 2021 at 11:47 AM Ke Wu  wrote:
>> 
>> 
>> Thanks for the advice.
>> 
>> Here are some more background:
>> 
>> We are building a feature called “split deployment” such that, we can 
>> isolate framework/platform core from user code/dependencies to address 
>> couple of operational challenges such as dependency conflict, 
>> alert/exception triaging.
>> 
>> With Beam’s portability framework, runner and sdk worker process naturally 
>> decouples beam core and user UDFs(DoFn), which is awesome! On top of this, 
>> we could further distinguish DoFn(s) that end user authors from DoFn(s) that 
>> platform provides, therefore, we would like these DoFn(s) to be executed in 
>> different environments, even in the same language, e.g. Java.
>> 
>> Therefore, I am exploring approaches and recommendations what are the proper 
>> way to do that.
>> 
>> Let me know your thoughts, any feedback/advice is welcome.
>> 
>> Best,
>> Ke
>> 
>> On Sep 27, 2021, at 11:56 AM, Luke Cw

Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-04 Thread Ke Wu
Created https://issues.apache.org/jira/browse/BEAM-12999

> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw  wrote:
> 
> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
> 
> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu  wrote:
>> 
>> Let me add two new urns representing reshuffle via random key and reshuffle 
>> using key. I will share the PR later here, would need some help on Python/Go 
>> SDK changes too since I am not very familiar with them.
>> 
>> Best,
>> Ke
>> 
>> 
>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw  wrote:
>> 
>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský  wrote:
>> 
>> 
>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>> 
>> Oh, yes.
>> 
>> Java Reshuffle.of() = Python ReshufflePerKey()
>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>> 
>> We generally try to avoid this kind of discrepancy. It could make
>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>> 
>> 
>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>> might be opinionated.
>> 
>> 
>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>> to me than vaiRandomKey(), but probably not worth changing so the
>> question becomes whether to be stilted or consistent.)
>> 
>> More importantly - could we undeprecate Reshuffle
>> (in Java SDK)? I believe it was deprecated for wrong reasons - yes, it
>> has undocumented and non-portable side-effects, but is still makes sense
>> for various use-cases (e.g. fan-out, or SDF).
>> 
>> 
>> +1
>> 
>> 
>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu  wrote:
>> 
>> I should have said that the descrepency lives in SDK not Class vs Portable.
>> 
>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input 
>> to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>> [2] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>> 
>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw  wrote:
>> 
>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>> that. This is true for both Java and Python. This shouldn't depend on
>> classic vs. portable mode. It sounds like there's an issue in
>> translation.
>> 
>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu  wrote:
>> 
>> 
>> Hello All,
>> 
>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an 
>> interesting fact that Reshuffle Transform in classic pipeline requires the 
>> input to be KV while portable pipeline does not, where Reshuffle in portable 
>> mode it has an extra step to append a random key [3].
>> 
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to 
>> ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of 
>> questions on this:
>> 
>> 1. Is such SDK/API discrepancy expected?
>> 2. If Yes, then, what are the advised approach for runners to implement 
>> translators for such transforms?
>> 3. If No, is this something we can improve?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> 
>> 



Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-04 Thread Ke Wu
Let me add two new URNs representing reshuffle via random key and reshuffle 
using key. I will share the PR here later; I would need some help on the 
Python/Go SDK changes too, since I am not very familiar with them.

Best,
Ke


> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw  wrote:
> 
> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský  <mailto:je...@seznam.cz>> wrote:
>> 
>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>> Oh, yes.
>>> 
>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>> 
>>> We generally try to avoid this kind of discrepancy. It could make
>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>> 
>> I'd suggest Reshuffle.usingKey(), but I'm not native speaker, so that
>> might be opinionated.
> 
> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
> to me than vaiRandomKey(), but probably not worth changing so the
> question becomes whether to be stilted or consistent.)
> 
>> More importantly - could we undeprecate Reshuffle
>> (in Java SDK)? I believe it was deprecated for wrong reasons - yes, it
>> has undocumented and non-portable side-effects, but is still makes sense
>> for various use-cases (e.g. fan-out, or SDF).
> 
> +1
> 
>>> 
>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu  wrote:
>>>> I should have said that the descrepency lives in SDK not Class vs Portable.
>>>> 
>>>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the 
>>>> input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>> 
>>>> 
>>>> [1] 
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>>> [2] 
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>> 
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw  wrote:
>>>> 
>>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>>> that. This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>> 
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu  wrote:
>>>> 
>>>> 
>>>> Hello All,
>>>> 
>>>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an 
>>>> interesting fact that Reshuffle Transform in classic pipeline requires the 
>>>> input to be KV while portable pipeline does not, where Reshuffle in 
>>>> portable mode it has an extra step to append a random key [3].
>>>> 
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to 
>>>> ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of 
>>>> questions on this:
>>>> 
>>>> 1. Is such SDK/API discrepancy expected?
>>>> 2. If Yes, then, what are the advised approach for runners to implement 
>>>> translators for such transforms?
>>>> 3. If No, is this something we can improve?
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> 
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>>> [3] 
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730



Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-04 Thread Ke Wu
Thanks for the confirmation.

For translation purposes, I think the issue is that

“beam:transform:reshuffle:v1” corresponds to both Java Reshuffle.of() and Python 
Reshuffle(), where one expects KV input but the other does not. 

Ideally, it should map to [Java Reshuffle.of() and Python ReshufflePerKey()] or 
[Java Reshuffle.viaRandomKey() and Python Reshuffle()]. In addition, there could 
be another URN to represent the other pair, e.g. 
“beam:transform:reshuffle_per_key:v1” or 
“beam:transform:reshuffle_via_random_key:v1”.
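
Purely for illustration, the two URNs could be captured as constants along these 
lines (the names are hypothetical and not necessarily what would land in the 
proto definitions):

// Hypothetical constants mirroring the naming suggested above.
public final class ReshuffleUrns {
  public static final String RESHUFFLE_PER_KEY_URN =
      "beam:transform:reshuffle_per_key:v1";
  public static final String RESHUFFLE_VIA_RANDOM_KEY_URN =
      "beam:transform:reshuffle_via_random_key:v1";

  private ReshuffleUrns() {}
}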

Any thoughts on this?

Best,
Ke


> On Oct 4, 2021, at 2:43 PM, Robert Bradshaw  wrote:
> 
> Oh, yes.
> 
> Java Reshuffle.of() = Python ReshufflePerKey()
> Java Reshuffle.viaRandomKey() == Python Reshuffle()
> 
> We generally try to avoid this kind of discrepancy. It could make
> sense to rename Reshuffle.of() to Reshuffle.viaKey().
> 
> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu  wrote:
>> 
>> I should have said that the descrepency lives in SDK not Class vs Portable.
>> 
>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the input 
>> to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>> [2] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>> 
>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw  wrote:
>> 
>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>> that. This is true for both Java and Python. This shouldn't depend on
>> classic vs. portable mode. It sounds like there's an issue in
>> translation.
>> 
>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu  wrote:
>> 
>> 
>> Hello All,
>> 
>> Recent Samza Runner tests failure in python/xlang [1][2] reveals an 
>> interesting fact that Reshuffle Transform in classic pipeline requires the 
>> input to be KV while portable pipeline does not, where Reshuffle in portable 
>> mode it has an extra step to append a random key [3].
>> 
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to 
>> ReshufflePerKey in potable mode instead of Reshuffle itself. Couple of 
>> questions on this:
>> 
>> 1. Is such SDK/API discrepancy expected?
>> 2. If Yes, then, what are the advised approach for runners to implement 
>> translators for such transforms?
>> 3. If No, is this something we can improve?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>> 
>> 



Re: Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-04 Thread Ke Wu
I should have said that the discrepancy lives in the SDKs, not Classic vs Portable.

Correct me if I am wrong: the Reshuffle transform in the Java SDK requires the input to 
be KV [1] while Reshuffle in Python [2] and Go [3] does not.


[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
 
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53>
 
[2] 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
 
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730>
 
[3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122 
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122> 

> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw  wrote:
> 
> Reshuffle is not keyed, there is a separate reshuffle-per-key for
> that. This is true for both Java and Python. This shouldn't depend on
> classic vs. portable mode. It sounds like there's an issue in
> translation.
> 
> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu  wrote:
>> 
>> Hello All,
>> 
>> Recent Samza Runner test failures in python/xlang [1][2] reveal an 
>> interesting fact: the Reshuffle transform in a classic pipeline requires the 
>> input to be KV while in a portable pipeline it does not, because Reshuffle in 
>> portable mode has an extra step to append a random key [3].
>> 
>> This suggests that Reshuffle in classic mode is, sort of, equivalent to 
>> ReshufflePerKey in portable mode instead of Reshuffle itself. A couple of 
>> questions on this:
>> 
>> 1. Is such SDK/API discrepancy expected?
>> 2. If yes, what is the advised approach for runners to implement 
>> translators for such transforms?
>> 3. If No, is this something we can improve?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>> [3] 
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730



Reshuffle Discrepancy in Classic vs Portable Pipeline

2021-10-04 Thread Ke Wu
Hello All, 

Recent Samza Runner test failures in python/xlang [1][2] reveal an interesting 
fact: the Reshuffle transform in a classic pipeline requires the input to be KV 
while in a portable pipeline it does not, because Reshuffle in portable mode has an 
extra step to append a random key [3].

This suggests that Reshuffle in classic mode is, sort of, equivalent to 
ReshufflePerKey in portable mode instead of Reshuffle itself. A couple of 
questions on this:

1. Is such SDK/API discrepancy expected?
2. If yes, what is the advised approach for runners to implement 
translators for such transforms?
3. If No, is this something we can improve?

Best,
Ke


[1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/ 
 
[2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/ 
 
[3] 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
 

 

Re: Multi Environment Support

2021-09-30 Thread Ke Wu
I am able to annotate/mark a Java transform by setting its resource hints [1] 
as well, which resulted in a different environment id, e.g. 

beam:env:docker:v1 vs beam:env:docker:v11


Is this on the right track? If yes, I suppose I then need to configure the job 
bundle factory to understand multiple environments and configure 
them separately as well. 

[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218
 
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java#L218>
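
For reference, a rough Java sketch of forcing a transform into its own environment 
via a resource hint; the hint value and DoFn here are illustrative only, and the 
exact ResourceHints methods may differ by Beam version:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHints;
import org.apache.beam.sdk.values.PCollection;

public class ResourceHintSketch {
  // Illustrative "platform" DoFn; in practice this would be the platform-provided DoFn.
  static class PlatformFn extends DoFn<String, String> {
    @ProcessElement
    public void process(@Element String element, OutputReceiver<String> out) {
      out.output(element);
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> input = p.apply(Create.of("a", "b"));

    // Setting a distinct hint on this transform gives it a different environment
    // during translation, so fusion keeps it out of the user-code environment.
    input.apply(
        "PlatformStep",
        ParDo.of(new PlatformFn())
            .setResourceHints(ResourceHints.create().withMinRam("2GiB")));
  }
}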
 

> On Sep 30, 2021, at 10:34 AM, Robert Bradshaw  wrote:
> 
> On Thu, Sep 30, 2021 at 9:25 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
>> 
>> Ideally, we do not want to expose anything directly to users; we, as the 
>> framework and platform provider, separate things out under the hood.
>> 
>> I would expect users to author their DoFn(s) in the same way as they do 
>> right now, but the DoFn(s) that we provide will be 
>> annotated/marked so that they can be recognized at runtime.
>> 
>> In our use case, the application is executed in a Kubernetes environment; 
>> therefore, we are expecting to directly use different docker images to 
>> isolate dependencies.
>> 
>> e.g. we have docker image A, which is Beam core and is used to start the job 
>> server and runner process. We have docker image B, which contains DoFn(s) 
>> that the platform provides, serving as an external worker pool service to execute 
>> platform-provided DoFn(s). Last but not least, users would have their own 
>> docker image representing their application, which will be used to start the 
>> external worker pool service to handle their own UDF execution.
>> 
>> Does this make sense?
> 
> In Python it's pretty trivial to annotate transforms (e.g. the
> "platform" transforms) which could be used to mark their environments
> prior to optimization (e.g. fusion). As mentioned, you could use
> resource hints (even a "dummy" hint like
> "use_platform_environment=True") to force these into a separate docker
> image as well.
> 
>> On Sep 29, 2021, at 1:09 PM, Luke Cwik  wrote:
>> 
>> That sounds neat. I think that before you try to figure out how to change 
>> Beam to fit this use case, you should think about what would be the best way for 
>> users to specify these requirements when they are constructing the pipeline. 
>> Once you have some samples that you could share, the community would probably 
>> be able to give you more pointed advice.
>> For example will they be running one application with a complicated class 
>> loader setup, if so then we could probably do away with multiple 
>> environments and try to have DoFn's recognize their specific class loader 
>> configuration and replicate it on the SDK harness side.
>> 
>> Also, for performance reasons users may want to resolve their dependency 
>> issues to create a maximally fused graph to limit performance impact due to 
>> the encoding/decoding boundaries at the edges of those fused graphs.
>> 
>> Finally, this could definitely apply to languages like Python and Go (now 
>> that Go has support for modules) as dependency issues are a common problem.
>> 
>> 
>> On Wed, Sep 29, 2021 at 11:47 AM Ke Wu  wrote:
>>> 
>>> Thanks for the advice.
>>> 
>>> Here is some more background:
>>> 
>>> We are building a feature called “split deployment” so that we can 
>>> isolate the framework/platform core from user code/dependencies to address 
>>> a couple of operational challenges such as dependency conflicts and 
>>> alert/exception triaging.
>>> 
>>> With Beam’s portability framework, the runner and SDK worker processes naturally 
>>> decouple Beam core from user UDFs (DoFns), which is awesome! On top of this, 
>>> we could further distinguish DoFn(s) that end users author from DoFn(s) 
>>> that the platform provides; therefore, we would like these DoFn(s) to be 
>>> executed in different environments, even in the same language, e.g. Java.
>>> 
>>> Therefore, I am exploring approaches and recommendations on the 
>>> proper way to do that.
>>> 
>>> Let me know your thoughts, any feedback/advice is welcome.
>>> 
>>> Best,
>>> Ke
>>> 
>>> On Sep 27, 2021, at 11:56 AM, Luke Cwik  wrote:
>>> 
>>> Resource hints have a limited use case and might fit your need.
>>> You could also try to use the expansion service XLang route to br

Re: Multi Environment Support

2021-09-30 Thread Ke Wu
Ideally, we do not want to expose anything directly to users; we, as the 
framework and platform provider, separate things out under the hood. 

I would expect users to author their DoFn(s) in the same way as they do right 
now, but the DoFn(s) that we provide will be 
annotated/marked so that they can be recognized at runtime.

In our use case, the application is executed in a Kubernetes environment; therefore, 
we are expecting to directly use different docker images to isolate 
dependencies. 

e.g. we have docker image A, which is Beam core and is used to start the job 
server and runner process. We have docker image B, which contains DoFn(s) 
that the platform provides, serving as an external worker pool service to execute 
platform-provided DoFn(s). Last but not least, users would have their own 
docker image representing their application, which will be used to start the 
external worker pool service to handle their own UDF execution.

Does this make sense?

> On Sep 29, 2021, at 1:09 PM, Luke Cwik  wrote:
> 
> That sounds neat. I think that before you try to figure out how to change 
> Beam to fit this use case, you should think about what would be the best way for 
> users to specify these requirements when they are constructing the pipeline. 
> Once you have some samples that you could share, the community would probably 
> be able to give you more pointed advice.
> For example will they be running one application with a complicated class 
> loader setup, if so then we could probably do away with multiple environments 
> and try to have DoFn's recognize their specific class loader configuration 
> and replicate it on the SDK harness side.
> 
> Also, for performance reasons users may want to resolve their dependency 
> issues to create a maximally fused graph to limit performance impact due to 
> the encoding/decoding boundaries at the edges of those fused graphs.
> 
> Finally, this could definitely apply to languages like Python and Go (now 
> that Go has support for modules) as dependency issues are a common problem.
> 
> 
> On Wed, Sep 29, 2021 at 11:47 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thanks for the advice.
> 
> Here is some more background:
> 
> We are building a feature called “split deployment” so that we can isolate 
> the framework/platform core from user code/dependencies to address a couple of 
> operational challenges such as dependency conflicts and alert/exception triaging.
> 
> With Beam’s portability framework, the runner and SDK worker processes naturally 
> decouple Beam core from user UDFs (DoFns), which is awesome! On top of this, we 
> could further distinguish DoFn(s) that end users author from DoFn(s) that 
> the platform provides; therefore, we would like these DoFn(s) to be executed in 
> different environments, even in the same language, e.g. Java.
> 
> Therefore, I am exploring approaches and recommendations on the proper 
> way to do that.
> 
> Let me know your thoughts, any feedback/advice is welcome. 
> 
> Best,
> Ke
> 
>> On Sep 27, 2021, at 11:56 AM, Luke Cwik > <mailto:lc...@google.com>> wrote:
>> 
>> Resource hints have a limited use case and might fit your need.
>> You could also try to use the expansion service XLang route to bring in a 
>> different Java environment.
>> Finally, you could modify the pipeline proto that is generated directly to 
>> choose which environment is used for which PTransform.
>> 
>> Can you provide additional details as to why you would want to have two 
>> separate java environments (e.g. incompatible versions of libraries)?
>> 
>> On Wed, Sep 22, 2021 at 3:41 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Thanks Luke for the reply. Do you know the preferred way to 
>> configure a PTransform to be executed in a different environment from 
>> another PTransform when both are in the same SDK, e.g. Java?
>> 
>> Best,
>> Ke
>> 
>>> On Sep 21, 2021, at 9:48 PM, Luke Cwik >> <mailto:lc...@google.com>> wrote:
>>> 
>>> Environments that aren't exactly the same are already in separate 
>>> ExecutableStages. The GreedyPCollectionFuser ensures that today[1].
>>> 
>>> Workarounds like getOnlyEnvironmentId would need to be removed. It may also 
>>> be effectively dead-code.
>>> 
>>> 1: 
>>> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java#L144
>>>  
>>> <https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache

Re: Multi Environment Support

2021-09-29 Thread Ke Wu
Thanks for the advice.

Here is some more background:

We are building a feature called “split deployment” so that we can isolate 
the framework/platform core from user code/dependencies to address a couple of 
operational challenges such as dependency conflicts and alert/exception triaging.

With Beam’s portability framework, the runner and SDK worker processes naturally 
decouple Beam core from user UDFs (DoFns), which is awesome! On top of this, we 
could further distinguish DoFn(s) that end users author from DoFn(s) that 
the platform provides; therefore, we would like these DoFn(s) to be executed in 
different environments, even in the same language, e.g. Java.

Therefore, I am exploring approaches and recommendations on the proper 
way to do that.

Let me know your thoughts, any feedback/advice is welcome. 

Best,
Ke

> On Sep 27, 2021, at 11:56 AM, Luke Cwik  wrote:
> 
> Resource hints have a limited use case and might fit your need.
> You could also try to use the expansion service XLang route to bring in a 
> different Java environment.
> Finally, you could modify the pipeline proto that is generated directly to 
> choose which environment is used for which PTransform.
> 
> Can you provide additional details as to why you would want to have two 
> separate java environments (e.g. incompatible versions of libraries)?
> 
> On Wed, Sep 22, 2021 at 3:41 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thanks Luke for the reply. Do you know the preferred way to configure 
> a PTransform to be executed in a different environment from another 
> PTransform when both are in the same SDK, e.g. Java?
> 
> Best,
> Ke
> 
>> On Sep 21, 2021, at 9:48 PM, Luke Cwik > <mailto:lc...@google.com>> wrote:
>> 
>> Environments that aren't exactly the same are already in separate 
>> ExecutableStages. The GreedyPCollectionFuser ensures that today[1].
>> 
>> Workarounds like getOnlyEnvironmentId would need to be removed. It may also 
>> be effectively dead-code.
>> 
>> 1: 
>> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java#L144
>>  
>> <https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java#L144>
>> On Tue, Sep 21, 2021 at 1:45 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Hello All,
>> 
>> We have a use case where, in a Java portable pipeline, we would like to have 
>> multiple environments set up so that some executable stages run in one 
>> environment while other executable stages run in another environment. 
>> A couple of questions on this:
>> 
>> 1. Is this currently supported? I noticed a TODO in [1] which suggests it is 
>> a feature pending support.
>> 2. If we did support it, what would be the ideal mechanism to distinguish 
>> which ParDo/ExecutableStage is executed in which environment? Is it through 
>> ResourceHints?
>> 
>> 
>> Best,
>> Ke 
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L344
>>  
>> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L344>
>>  
> 



Re: Multi Environment Support

2021-09-22 Thread Ke Wu
Thanks Luke for the reply. Do you know the preferred way to configure a 
PTransform to be executed in a different environment from another PTransform 
when both are in the same SDK, e.g. Java?

Best,
Ke

> On Sep 21, 2021, at 9:48 PM, Luke Cwik  wrote:
> 
> Environments that aren't exactly the same are already in separate 
> ExecutableStages. The GreedyPCollectionFuser ensures that today[1].
> 
> Workarounds like getOnlyEnvironmentId would need to be removed. It may also 
> be effectively dead-code.
> 
> 1: 
> https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java#L144
>  
> <https://github.com/apache/beam/blob/ebf2aacf37b97fc85b167271f184f61f5b06ddc3/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java#L144>
> On Tue, Sep 21, 2021 at 1:45 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello All,
> 
> We have a use case where, in a Java portable pipeline, we would like to have 
> multiple environments set up so that some executable stages run in one 
> environment while other executable stages run in another environment. 
> A couple of questions on this:
> 
> 1. Is this currently supported? I noticed a TODO in [1] which suggests it is 
> a feature pending support.
> 2. If we did support it, what would be the ideal mechanism to distinguish 
> which ParDo/ExecutableStage is executed in which environment? Is it through 
> ResourceHints?
> 
> 
> Best,
> Ke 
> 
> 
> [1] 
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L344
>  
> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L344>
>  



Multi Environment Support

2021-09-21 Thread Ke Wu
Hello All,

We have a use case where, in a Java portable pipeline, we would like to have 
multiple environments set up so that some executable stages run in one 
environment while other executable stages run in another environment. 
A couple of questions on this:

1. Is this currently supported? I noticed a TODO in [1] which suggests it is 
a feature pending support.
2. If we did support it, what would be the ideal mechanism to distinguish 
which ParDo/ExecutableStage is executed in which environment? Is it through 
ResourceHints?


Best,
Ke 


[1] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java#L344
 

 

Re: Percentile metrics in Beam

2021-09-17 Thread Ke Wu
+1, would love to see a PR/proposal out. 

This is a highly requested feature that our users at LinkedIn are asking for as 
well. 

> On Sep 17, 2021, at 10:56, Pablo Estrada  wrote:
> 
> 
> Thanks for working on this!
> In the past, we have avoided adding complex metrics because metrics tend to 
> be aggregated in the control path rather than the data path - and we worried 
> about overwhelming the metrics backends - however users have in the past 
> asked for more information in the distribution metric itself. I think it 
> makes sense to provide more info, while allowing runners to report as much as 
> they see fit. I'd love to see a proposal / PR for this.
> 
> fyi @Robert Bradshaw 
> 
>> On Wed, Sep 15, 2021 at 10:37 AM Ajo Thomas  wrote:
>> Thanks for the response, Alexey and Ke. 
>> Agree with your point to introduce a new metric type (say Percentiles) 
>> instead of altering the Distribution metric type to ensure compatibility 
>> across runners and sdks.
>> I am currently working on a prototype to add this new metric type to the 
>> metrics API and testing it with samza runner. I can share a design doc with 
>> the community with possible solutions very soon.
>> 
>> Thanks
>> Ajo
>> 
>>> On Wed, Sep 15, 2021 at 9:26 AM Alexey Romanenko  
>>> wrote:
>>> I agree with Ke Wu in the way that we need to keep compatibility across all 
>>> runners and the same metrics. So, it seems that it would be better to 
>>> create another metric type in this case.
>>> 
>>> Also, to discuss it in details, I’d recommend to create a design document 
>>> with possible solutions and examples.
>>> 
>>> —
>>> Alexey
>>> 
>>>> On 14 Sep 2021, at 19:04, Ke Wu  wrote:
>>>> 
>>>> I prefer adding a new metrics type instead of enhancing the existing 
>>>> Distribution [1] to support percentiles etc in order to ensure better 
>>>> compatibility. 
>>>> 
>>>> @Luke @Kyle what are your thoughts on this? 
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> [1] 
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
>>>>  
>>>> 
>>>>> On Sep 7, 2021, at 1:28 PM, Ajo Thomas  wrote:
>>>>> 
>>>>> Hi All,
>>>>> 
>>>>> I am working on adding support for some additional distribution metrics 
>>>>> like std dev, percentiles to the Metrics API. The runner of interest here 
>>>>> is Samza runner. I wanted to get the opinion of fellow beam devs on this.
>>>>> 
>>>>> One way to do this would be to make changes to the existing Distribution 
>>>>> metric:
>>>>> - Add additional metrics to Distribution metric- custom percentiles, std 
>>>>> dev, mean. Use Dropwizard Histogram under the hood in DistributionData to 
>>>>> track the distribution of the data.
>>>>> - This also means changes to accompanying classes like DistributionData, 
>>>>> DistributionResult which might involve runner specific changes.
>>>>> 
>>>>> Is this an acceptable change or would you suggest something else? Is the 
>>>>> Distribution metric only intended to track the metrics that it is 
>>>>> currently tracking- sum, min, max, count?
>>>>> 
>>>>> Thanks
>>>>> Ajo
>>>>> 
>>>> 
>>> 


Re: Percentile metrics in Beam

2021-09-14 Thread Ke Wu
I prefer adding a new metric type instead of enhancing the existing 
Distribution [1] to support percentiles, etc., in order to ensure better 
compatibility. 

@Luke @Kyle what are your thoughts on this? 

Best,
Ke

[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
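
For illustration, a hypothetical shape such a separate metric type could take; none 
of these names exist in Beam today, this is only a sketch of the idea:

import org.apache.beam.sdk.metrics.Metric;

// Hypothetical: a dedicated metric type instead of widening Distribution.
public interface Percentiles extends Metric {
  // Record one observed value; the runner/backend decides how to aggregate
  // (e.g. a histogram or digest) and which percentiles to report.
  void update(double value);
}

// Hypothetical usage, alongside the existing Metrics.counter()/Metrics.distribution():
//   Percentiles latency = Metrics.percentiles("my-namespace", "latency-ms", 0.5, 0.95, 0.99);
//   latency.update(42.0);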
 

 

> On Sep 7, 2021, at 1:28 PM, Ajo Thomas  wrote:
> 
> Hi All,
> 
> I am working on adding support for some additional distribution metrics like 
> std dev, percentiles to the Metrics API. The runner of interest here is Samza 
> runner. I wanted to get the opinion of fellow beam devs on this.
> 
> One way to do this would be to make changes to the existing Distribution 
> metric:
> - Add additional metrics to Distribution metric- custom percentiles, std dev, 
> mean. Use Dropwizard Histogram under the hood in DistributionData to track 
> the distribution of the data.
> - This also means changes to accompanying classes like DistributionData, 
> DistributionResult which might involve runner specific changes.
> 
> Is this an acceptable change or would you suggest something else? Is the 
> Distribution metric only intended to track the metrics that it is currently 
> tracking- sum, min, max, count?
> 
> Thanks
> Ajo
> 



Re: MapState API

2021-09-13 Thread Ke Wu
Awesome, looking forward to it.

> On Sep 10, 2021, at 10:30 AM, Luke Cwik  wrote:
> 
> Yes, https://github.com/apache/beam/pull/15238 
> <https://github.com/apache/beam/pull/15238> is in progress for MapState and 
> SetState.
> 
> On Fri, Sep 10, 2021 at 9:22 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Another question on this topic, is any work planned to add the map state 
> support in portable mode [1], same for set state, list state?
> 
> [1] 
> https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
>  
> <https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337>
>  
> 
>> On Sep 9, 2021, at 9:31 AM, Luke Cwik > <mailto:lc...@google.com>> wrote:
>> 
>> The underlying state implementation that the FnApiDoFnRunner uses has a 
>> "close" method specifically meant to flush any pending operations so that 
>> part should be trivial. I don't think persisting the "changelog" would be 
>> difficult but migrating to using it in the future would cause issues for job 
>> update since we would be changing the encoding so deciding upfront which 
>> approach we want would be useful.
>> 
>> 
>> 
>> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax > <mailto:re...@google.com>> wrote:
>> I think a changelog could be kept in memory - persisting a changelog seems 
>> overly complex. We would have to make sure to flush the changelog at the end 
>> of every bundle though, so it would only help if the map key was accessed 
>> multiple times in the same bundle. I don't think it's correct that we won't 
>> be able to remove the key. In your example, all of the calls are in a single 
>> processElement or a single bundle (I'm not sure if it's legal to maintain 
>> ReadableState references between elements within the same bundle though). In 
>> this case the call to remove would also be an operation journaled into the 
>> log, so after flushing the whole log at the end of the bundle the key would 
>> be removed.
>> 
>> I don't think the above would be that hard, but IMO it's also fine to do 
>> something simpler initially.
>> 
>> Reuven
>> 
>> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok > <mailto:kiley...@google.com>> wrote:
>> Is adding the complexity of the changelog worth it instead of resolving 
>> immediately? We would be storing much more persisted data and wouldn't be 
>> able to clear the key even on a remove().
>> 
>> For example:
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
>> mapState.remove(keyA); // still need to keep the changelog
>> V maybePut2Value = maybePut2.read(); // should return value1
>> 
>> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik > <mailto:lc...@google.com>> wrote:
>> I believe we should consider these bugs and fix them since they are 
>> surprising behaviors for users based upon what people are used to from 
>> similar APIs in other Map constructs.
>> 
>> The benefit of the changelog approach is to allow for putIfAbsent to have a 
>> blind append if read is never called. To expand on my example of putIfAbsent:
>> First Bundle (assuming no read in this bundle):
>> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
>> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  // Ignore 
>> this since we now know that the map has an entry
>> Produces one blind append of (keyA, (PutIfAbsent, value1))
>> 
>> Second Bundle:
>> ReadableState<V> get = mapState.get(keyA);
>> V value = get.read();
>> The underlying values will be a list of changes applied in order to this 
>> map. The first read that sees multiple values should clear the list and 
>> resolve it similar to what a combining state does, returning the only Put 
>> (since Put = clear + append) in the list or the first PutIfAbsent.
>> 
>> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok > <mailto:kiley...@google.com>> wrote:
>> Would this be a breaking change then? Going by your first examples, it's no 
>> longer a deferred read-then-write. 
>> 
>> I'm not seeing the benefit of having a changelog. If I'm reading at your 
>> examples correctly, it's saying it should evaluate putIfAbsent immediately 
>> and store the result in ReadableState until read is called?
>> 
>> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik > <mailto:lc...@google.com>> wrote:

Re: MapState API

2021-09-10 Thread Ke Wu
Another question on this topic: is any work planned to add map state 
support in portable mode [1], and the same for set state and list state?

[1] 
https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L337
 

 

> On Sep 9, 2021, at 9:31 AM, Luke Cwik  wrote:
> 
> The underlying state implementation that the FnApiDoFnRunner uses has a 
> "close" method specifically meant to flush any pending operations so that 
> part should be trivial. I don't think persisting the "changelog" would be 
> difficult but migrating to using it in the future would cause issues for job 
> update since we would be changing the encoding so deciding upfront which 
> approach we want would be useful.
> 
> 
> 
> On Thu, Sep 9, 2021 at 9:14 AM Reuven Lax  > wrote:
> I think a changelog could be kept in memory - persisting a changelog seems 
> overly complex. We would have to make sure to flush the changelog at the end 
> of every bundle though, so it would only help if the map key was accessed 
> multiple times in the same bundle. I don't think it's correct that we won't 
> be able to remove the key. In your example, all of the calls are in a single 
> processElement or a single bundle (I'm not sure if it's legal to maintain 
> ReadableState references between elements within the same bundle though). In 
> this case the call to remove would also be an operation journaled into the 
> log, so after flushing the whole log at the end of the bundle the key would 
> be removed.
> 
> I don't think the above would be that hard, but IMO it's also fine to do 
> something simpler initially.
> 
> Reuven
> 
> On Wed, Sep 8, 2021 at 4:13 PM Kiley Sok  > wrote:
> Is adding the complexity of the changelog worth it instead of resolving 
> immediately? We would be storing much more persisted data and wouldn't be 
> able to clear the key even on a remove().
> 
> For example:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> mapState.remove(keyA); // still need to keep the changelog
> V maybePut2Value = maybePut2.read(); // should return value1
> 
> On Wed, Sep 8, 2021 at 1:15 PM Luke Cwik  > wrote:
> I believe we should consider these bugs and fix them since they are 
> surprising behaviors for users based upon what people are used to from 
> similar APIs in other Map constructs.
> 
> The benefit of the changelog approach is to allow for putIfAbsent to have a 
> blind append if read is never called. To expand on my example of putIfAbsent:
> First Bundle (assuming no read in this bundle):
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);  // Ignore 
> this since we now know that the map has an entry
> Produces one blind append of (keyA, (PutIfAbsent, value1))
> 
> Second Bundle:
> ReadableState<V> get = mapState.get(keyA);
> V value = get.read();
> The underlying values will be a list of changes applied in order to this map. 
> The first read that sees multiple values should clear the list and resolve it 
> similar to what a combining state does, returning the only Put (since Put = 
> clear + append) in the list or the first PutIfAbsent.
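
A tiny Java sketch of the journaled ("changelog") resolution described above, with 
purely hypothetical names; it only illustrates how a per-key log of writes could be 
collapsed on first read or at the end of a bundle:

import java.util.List;

class MapStateJournalSketch {
  // Hypothetical write-entry types; names are illustrative only.
  enum WriteType { PUT, PUT_IF_ABSENT, REMOVE }

  static class Write<V> {
    final WriteType type;
    final V value; // null for REMOVE
    Write(WriteType type, V value) { this.type = type; this.value = value; }
  }

  // Collapse the journal for one key against the previously persisted value:
  // PUT and REMOVE always take effect at their position in the log, while
  // PUT_IF_ABSENT only applies when nothing is present at that point.
  static <V> V resolve(V persisted, List<Write<V>> journal) {
    V current = persisted;
    for (Write<V> w : journal) {
      switch (w.type) {
        case PUT: current = w.value; break;
        case REMOVE: current = null; break;
        case PUT_IF_ABSENT: if (current == null) { current = w.value; } break;
      }
    }
    return current;
  }
}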
> 
> On Wed, Sep 8, 2021 at 11:51 AM Kiley Sok  > wrote:
> Would this be a breaking change then? Going by your first examples, it's no 
> longer a deferred read-then-write. 
> 
> I'm not seeing the benefit of having a changelog. If I'm reading at your 
> examples correctly, it's saying it should evaluate putIfAbsent immediately 
> and store the result in ReadableState until read is called?
> 
> On Wed, Sep 8, 2021 at 8:54 AM Luke Cwik  > wrote:
> I agree with the comment that you linked to Kiley and your analysis of what 
> happens today. Yes I believe the write will not happen until the read() is 
> invoked on ReadableState returned from the putIfAbsent(). The rest of the 
> comment also says that if there are multiple putIfAbsent calls then which one 
> is written is dependent on which ReadableState read() is called leading to 
> this behavior:
> ReadableState<V> maybePut1 = mapState.putIfAbsent(keyA, value1);
> ReadableState<V> maybePut2 = mapState.putIfAbsent(keyA, value2);
> maybePut2.read(); // If keyA was absent, then it will have value2 otherwise 
> it will maintain its old value.
> maybePut1.read(); // This is effectively ignored
> 
> I started this thread because a lot of these behaviors are surprising and we 
> should fix them to have ordering semantics based upon the actual order of 
> interactions with the MapState and not the current semantics which rely on 
> read() being invoked.
> 
> Conveniently we don't have to resolve 

Tests failure on master branch

2021-08-31 Thread Ke Wu
Hello,

I noticed there are some tests that are failing in master [1]
Portable_Python ("Run Portable_Python PreCommit”) 
Python_PVR_Flink ("Run Python_PVR_Flink PreCommit")

Are they known issues? If Yes, is there a ticket for them?

Best,
Ke


[1] https://github.com/apache/beam/pull/15433 
 

Re: Unexpected in TestStream in Portable Mode

2021-08-31 Thread Ke Wu
> Read does not have translation in portability, so the implementation is that 
> it needs to be primitive transform explicitly implemented by the runner. The 
> encoding/decoding has to happen in the runner.

Could you help me understand this a bit more? IIRC, the fact that Read is NOT 
translated in portable mode means exactly that it is a composite transform instead 
of a primitive, because all primitive transforms are required to be translated. In 
addition, Read is a composite transform built on Impulse, which produces dummy bytes 
[1] to trigger the subsequent ParDo/ExecutableStage, where decoding of the actual 
source happens [2].

> There seems to be no role of the SDK harness with regard to the TestStream, 
> because the elements are already encoded by the submitting SDK. The coders 
> must match nevertheless, because you can have Events of type 
> KV>> and what will and what will not get 
> length-prefixed depends on which parts exactly are "known" (model) coders and 
> which are not. Encoding the whole value as single byte array will not work 
> for the consuming SDK harness, which will see that there should be nested 
> KvCoders instead.


I don’t think I fully understand what you are saying here. TestStream is currently a 
primitive transform, therefore there is no role for the SDK harness. This is what 
the proposal would change: make TestStream a composite transform consisting of a 
primitive transform and a subsequent ParDo that decodes to the desired format. 


[1] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39
 
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Impulse.java#L39>
 
[2] 
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149
 
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L149>
 

> On Aug 31, 2021, at 3:21 PM, Jan Lukavský  wrote:
> 
> On 9/1/21 12:13 AM, Ke Wu wrote:
>> Hi Jan,
>> 
>> Here is my understanding,
>> 
>> Runner is being brought up by job server driver, which is up and running 
>> before the job submission, i.e. it is job agnostic. Therefore, the runner it 
>> brought up does not have any SDK coder available and artifact staging only 
>> happens for SDK workers. 
>> 
>> You are right that Read and TestStream are sources, however the one thing 
>> that distinguish them is that Read transform is a composite transform and 
>> the decoding happens in ParDo/ExecutableStage, i.e. on SDK worker. 
> Read does not have translation in portability, so the implementation is that 
> it needs to be primitive transform explicitly implemented by the runner. The 
> encoding/decoding has to happen in the runner.
>> 
>> The proposal here is also to make the public facing TestStream transform a 
>> composite transform instead of primitive now, so that the decoding would 
>> occur on the SDK worker side where SDK coder is available, and the primitive 
>> that powers TestStream, which will be directly translated by runner to 
>> always produce raw bytes, and these raw bytes will be decoded on the SDK 
>> worker side.
> There seems to be no role of the SDK harness with regard to the TestStream, 
> because the elements are already encoded by the submitting SDK. The coders 
> must match nevertheless, because you can have Events of type 
> KV>> and what will and what will not get 
> length-prefixed depends on which parts exactly are "known" (model) coders and 
> which are not. Encoding the whole value as single byte array will not work 
> for the consuming SDK harness, which will see that there should be nested 
> KvCoders instead.
>> 
>> Best,
>> Ke
>> 
>>> On Aug 31, 2021, at 2:56 PM, Jan Lukavský >> <mailto:je...@seznam.cz>> wrote:
>>> 
>>> Sorry if I'm missing something obvious, but I don't quite see the 
>>> difference between Read and TestStream regarding the discussed issue with 
>>> coders. Couple of thoughts:
>>> 
>>>  a) both Read and TestStream are _sources_ - they produce elements that are 
>>> consumed by downstream transforms
>>> 
>>>  b) the coder of a particular PCollection is defined by the Pipeline proto 
>>> - it is the (client side) SDK that owns the Pipeline and that defines all 
>>> the coders
>>> 
>>>  c) runners must adhere to these coders, because otherwise there is risk of 
>>> coder mismatch, most probably on edges like x-lang transforms or inlined 
>>> transforms
>>> 
>>> I tried the approach of encoding the output of Read into byte array a

Re: Unexpected in TestStream in Portable Mode

2021-08-31 Thread Ke Wu
Hi Jan,

Here is my understanding,

The runner is brought up by the job server driver, which is up and running before 
job submission, i.e. it is job agnostic. Therefore, the runner it brings 
up does not have any SDK coders available, and artifact staging only happens for 
SDK workers. 

You are right that Read and TestStream are sources; however, the one thing that 
distinguishes them is that the Read transform is a composite transform and the 
decoding happens in a ParDo/ExecutableStage, i.e. on the SDK worker. 

The proposal here is also to make the public-facing TestStream transform a 
composite transform instead of a primitive, so that the decoding occurs 
on the SDK worker side where the SDK coder is available; the primitive that 
powers TestStream would be directly translated by the runner to always 
produce raw bytes, and those raw bytes would be decoded on the SDK worker side.

Best,
Ke

> On Aug 31, 2021, at 2:56 PM, Jan Lukavský  wrote:
> 
> Sorry if I'm missing something obvious, but I don't quite see the difference 
> between Read and TestStream regarding the discussed issue with coders. Couple 
> of thoughts:
> 
>  a) both Read and TestStream are _sources_ - they produce elements that are 
> consumed by downstream transforms
> 
>  b) the coder of a particular PCollection is defined by the Pipeline proto - 
> it is the (client side) SDK that owns the Pipeline and that defines all the 
> coders
> 
>  c) runners must adhere to these coders, because otherwise there is risk of 
> coder mismatch, most probably on edges like x-lang transforms or inlined 
> transforms
> 
> I tried the approach of encoding the output of Read into byte array as well, 
> but that turns out to have the problem that once there is a (partially) known 
> coder in play, this does not work, because the consuming transform 
> (executable stage) expects to see the wire coder - that is not simply byte 
> array, because the type of elements might be for instance KV, where 
> KvCoder is one of ModelCoders. That does not encode using LengthPrefixCoder 
> and as such will be incompatible with LengthPrefixCoder(ByteArrayCoder). The 
> TestStream needs to know the coder of elements, because that defines where 
> exactly must or must not be inserted length-prefixing. The logic in 
> LengthPrefixUnknownCoders [1] is recursive for ModelCoders.
> 
> [1] 
> https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48
>  
> <https://github.com/apache/beam/blob/ff70e740a2155592dfcb302ff6303cc19660a268/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/LengthPrefixUnknownCoders.java#L48>
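
To make the coder-matching point concrete, a small sketch using standard Beam Java 
coders; the substitution shown is only a rough approximation of what the runner-side 
wire-coder logic produces:

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.values.KV;

public class WireCoderSketch {
  // SDK-side coder: KvCoder and StringUtf8Coder are model coders, VarIntCoder is not.
  static final Coder<KV<String, Integer>> SDK_CODER =
      KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());

  // Roughly what the runner ends up with after length-prefixing unknown components:
  // the outer KvCoder structure is preserved, and only the unknown value coder is
  // replaced by length-prefixed raw bytes.
  static final Coder<KV<String, byte[]>> WIRE_CODER =
      KvCoder.of(StringUtf8Coder.of(), LengthPrefixCoder.of(ByteArrayCoder.of()));
}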
> On 8/31/21 11:29 PM, Ke Wu wrote:
>> Awesome! Thank you Luke and Robert. 
>> 
>> Also created https://issues.apache.org/jira/browse/BEAM-12828 
>> <https://issues.apache.org/jira/browse/BEAM-12828> to track unit test 
>> conversion. I could take it after I updated Samza runner to support 
>> TestStream in portable mode.
>> 
>>> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw >> <mailto:rober...@google.com>> wrote:
>>> 
>>> Created https://issues.apache.org/jira/browse/BEAM-12827 
>>> <https://issues.apache.org/jira/browse/BEAM-12827> to track this.
>>> 
>>> +1 to converting tests to just use longs for better coverage for now.
>>> 
>>> Also, yes, this is very similar to the issues encountered by Reads,
>>> but the solution is a bit simpler as there's no need for the
>>> TestStream primitive to interact with the decoded version of the
>>> elements (unlike Reads, where the sources often give elements in
>>> un-encoded form) and no user code to run.
>>> 
>>> - Robert
>>> 
>>> 
>>> 
>>> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský >> <mailto:je...@seznam.cz>> wrote:
>>>> 
>>>> This looks (and likely has the same cause) similar to what I have 
>>>> experienced when making primitive Read supported by Flink. The final 
>>>> solution would be to make SDK coders known to the runner of the same SDK 
>>>> (already present in various different threads). But until then, the 
>>>> solution seems to be something like [1]. The root cause is that the 
>>>> executable stage expects its input to be encoded by the SDK harness, and 
>>>> that part is missing when the transform is inlined (like Read in my case, 
>>>> or TestStream in your case). The intoWireTypes method simulates precisely 
>>>> this part - it encodes the PCollection via coder defined in the SDK

Re: Unexpected in TestStream in Portable Mode

2021-08-31 Thread Ke Wu
Awesome! Thank you Luke and Robert. 

Also created https://issues.apache.org/jira/browse/BEAM-12828 
<https://issues.apache.org/jira/browse/BEAM-12828> to track unit test 
conversion. I could take it after I updated Samza runner to support TestStream 
in portable mode.

> On Aug 31, 2021, at 2:05 PM, Robert Bradshaw  wrote:
> 
> Created https://issues.apache.org/jira/browse/BEAM-12827 to track this.
> 
> +1 to converting tests to just use longs for better coverage for now.
> 
> Also, yes, this is very similar to the issues encountered by Reads,
> but the solution is a bit simpler as there's no need for the
> TestStream primitive to interact with the decoded version of the
> elements (unlike Reads, where the sources often give elements in
> un-encoded form) and no user code to run.
> 
> - Robert
> 
> 
> 
> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský  wrote:
>> 
>> This looks (and likely has the same cause) similar to what I have 
>> experienced when making primitive Read supported by Flink. The final 
>> solution would be to make SDK coders known to the runner of the same SDK 
>> (already present in various different threads). But until then, the solution 
>> seems to be something like [1]. The root cause is that the executable stage 
>> expects its input to be encoded by the SDK harness, and that part is missing 
>> when the transform is inlined (like Read in my case, or TestStream in your 
>> case). The intoWireTypes method simulates precisely this part - it encodes 
>> the PCollection via coder defined in the SDK harness and then decodes it by 
>> coder defined by the runner (which match on binary level, but produce 
>> different types).
>> 
>> Jan
>> 
>> [1] 
>> https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
>> 
>> On 8/31/21 7:27 PM, Luke Cwik wrote:
>> 
>> I originally wasn't for making it a composite because it changes the "graph" 
>> structure but the more I thought about it the more I like it.
>> 
>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw  wrote:
>>> 
>>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik  wrote:
>>>> 
>>>> On Mon, Aug 30, 2021 at 7:07 PM Ke Wu  wrote:
>>>>> 
>>>>> Hello everyone,
>>>>> 
>>>>> This is Ke. I am working on enable TestStream support for Samza Runner in 
>>>>> portable mode and discovers something unexpected.
>>>>> 
>>>>> In my implementation for Samza Runner, couple of tests are failing with 
>>>>> errors like
>>>>> 
>>>>> 
>>>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>>>>> 
>>>>> I noticed these tests have the same symptom on Flink Runner as well, 
>>>>> which are currently excluded:
>>>>> 
>>>>> https://issues.apache.org/jira/browse/BEAM-12048
>>>>> https://issues.apache.org/jira/browse/BEAM-12050
>>>>> 
>>>>> 
>>>>> After some more digging, I realized that it is because the combination of 
>>>>> following facts:
>>>>> 
>>>>> TestStream is a primitive transform, therefore, Runners are supposed to 
>>>>> translate directly, the most intuitive implementation for each runner to 
>>>>> do is to parse the payload to decode TestStream.Event [1] on the Runner 
>>>>> process to be handed over to subsequent stages.
>>>>> When TestStream used with Integers, i.e. VarIntCoder to initialize, since 
>>>>> VarIntCoder is NOT a registered ModelCoder [2], it will be treated as 
>>>>> custom coder during conversion to protobuf pipeline [3] and will be 
>>>>> replaced with byte array coder [4] when runner sends data to SDK worker.
>>>>> Therefore an error occurs because the decoded TestStream.Event has 
>>>>> Integer as its value but the remote input receiver is expecting byte 
>>>>> array, causing java.lang.ClassCastException: java.lang.Integer cannot be 
>>>>> cast to [B
>>>>> 
>>>>> 
>>>>> In addition, I tried to update all these failed tests to use Long instead 
>>>>> of Integer, and all tests will pass since VarLongCoder is a known coder. 
>>>>> I do understand that runner process does not have user artifacts staged 
>>>>> so it can only use coders in  beam model when c

Re: Unexpected in TestStream in Portable Mode

2021-08-31 Thread Ke Wu
> The notion of "integer" vs. "long" is also language-specific detail as
> well, so not sure it makes sense as a well-known coder.

Agreed, making the integer coder a well-known coder is not a long-term solution 
for this issue anyway, so I think we should be fine keeping it unknown.

> On Aug 31, 2021, at 11:45 AM, Luke Cwik  wrote:
> 
> I don't think we can make Java based runners use the SDKs coder since 
> TestStream is also used within Go and Python pipelines.


Agreed. In addition, the current setup where the runner does not rely on any SDK 
artifacts is a really nice feature for achieving “split deployment”, 
where exception categorization and alert triaging can be simplified 
as well. This is also what we rely on in the Samza runner at LinkedIn. 

>> 
>> +1. Rather than making coder a property of TestStream, I would be in
>> favor of the TestStream primitive always producing bytes (basically,
>> by definition), and providing a composite that consists of this
>> followed by a decoding to give us a typed TestStream.


+1 as well, this sounds like the ideal path forward. 

Before we make TestStream a composite, do you think it is worth the effort to 
update unit tests with the UsesTestStream annotation to use Long instead of 
Integer so that runners can have better coverage and actually test whether they 
are functionally working properly?

Best,
Ke


> On Aug 31, 2021, at 11:45 AM, Luke Cwik  wrote:
> 
> I don't think we can make Java based runners use the SDKs coder since 
> TestStream is also used within Go and Python pipelines.
> 
> On Tue, Aug 31, 2021 at 11:00 AM Jan Lukavský  <mailto:je...@seznam.cz>> wrote:
> This looks (and likely has the same cause) similar to what I have experienced 
> when making primitive Read supported by Flink. The final solution would be to 
> make SDK coders known to the runner of the same SDK (already present in 
> various different threads). But until then, the solution seems to be 
> something like [1]. The root cause is that the executable stage expects its 
> input to be encoded by the SDK harness, and that part is missing when the 
> transform is inlined (like Read in my case, or TestStream in your case). The 
> intoWireTypes method simulates precisely this part - it encodes the 
> PCollection via coder defined in the SDK harness and then decodes it by coder 
> defined by the runner (which match on binary level, but produce different 
> types).
> 
>  Jan
> 
> [1] 
> https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657
>  
> <https://github.com/apache/beam/blob/dd7945f9f259a2989f9396d1d7a8dcb122711a52/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L657>
> On 8/31/21 7:27 PM, Luke Cwik wrote:
>> I originally wasn't for making it a composite because it changes the "graph" 
>> structure but the more I thought about it the more I like it.
>> 
>> On Tue, Aug 31, 2021 at 10:06 AM Robert Bradshaw > <mailto:rober...@google.com>> wrote:
>> On Tue, Aug 31, 2021 at 9:18 AM Luke Cwik > <mailto:lc...@google.com>> wrote:
>> >
>> > On Mon, Aug 30, 2021 at 7:07 PM Ke Wu > > <mailto:ke.wu...@gmail.com>> wrote:
>> >>
>> >> Hello everyone,
>> >>
>> >> This is Ke. I am working on enable TestStream support for Samza Runner in 
>> >> portable mode and discovers something unexpected.
>> >>
>> >> In my implementation for Samza Runner, couple of tests are failing with 
>> >> errors like
>> >>
>> >>
>> >> java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
>> >>
>> >> I noticed these tests have the same symptom on Flink Runner as well, 
>> >> which are currently excluded:
>> >>
>> >> https://issues.apache.org/jira/browse/BEAM-12048 
>> >> <https://issues.apache.org/jira/browse/BEAM-12048>
>> >> https://issues.apache.org/jira/browse/BEAM-12050 
>> >> <https://issues.apache.org/jira/browse/BEAM-12050>
>> >>
>> >>
>> >> After some more digging, I realized that it is because the combination of 
>> >> following facts:
>> >>
>> >> TestStream is a primitive transform, therefore, Runners are supposed to 
>> >> translate directly, the most intuitive implementation for each runner to 
>> >> do is to parse the payload to decode TestStream.Event [1] on the Runner 
>> >> pro

Unexpected in TestStream in Portable Mode

2021-08-30 Thread Ke Wu
Hello everyone,

This is Ke. I am working on enabling TestStream support for the Samza Runner in 
portable mode and discovered something unexpected. 

In my implementation for the Samza Runner, a couple of tests are failing with errors 
like

java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

I noticed these tests have the same symptom on Flink Runner as well, which are 
currently excluded:
https://issues.apache.org/jira/browse/BEAM-12048 
 
https://issues.apache.org/jira/browse/BEAM-12050 


After some more digging, I realized that it is because of the combination of the 
following facts:

- TestStream is a primitive transform, so runners are supposed to translate it 
directly; the most intuitive implementation for each runner is to parse the payload 
and decode TestStream.Event [1] on the runner process to be handed over to 
subsequent stages.
- When TestStream is initialized with Integers, i.e. with VarIntCoder, since 
VarIntCoder is NOT a registered ModelCoder [2], it will be treated as a custom 
coder during conversion to the protobuf pipeline [3] and will be replaced with the 
byte array coder [4] when the runner sends data to the SDK worker.
- Therefore an error occurs because the decoded TestStream.Event has Integer as 
its value but the remote input receiver is expecting a byte array, causing 
java.lang.ClassCastException: java.lang.Integer cannot be cast to [B

In addition, I tried to update all these failed tests to use Long instead of 
Integer, and all tests pass since VarLongCoder is a known coder. I do 
understand that the runner process does not have user artifacts staged, so it can 
only use coders in the Beam model when communicating with the SDK worker process. 
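
For reference, a minimal sketch of the two variants side by side, using the standard 
TestStream builder:

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.TestStream;

public class TestStreamCoderSketch {
  // VarIntCoder is not a model coder, so the runner-decoded Integer events end up
  // facing an SDK harness that expects length-prefixed raw bytes.
  static final TestStream<Integer> INT_STREAM =
      TestStream.create(VarIntCoder.of())
          .addElements(1, 2, 3)
          .advanceWatermarkToInfinity();

  // VarLongCoder is a known (model) coder, so the runner and SDK harness agree.
  static final TestStream<Long> LONG_STREAM =
      TestStream.create(VarLongCoder.of())
          .addElements(1L, 2L, 3L)
          .advanceWatermarkToInfinity();
}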

A couple of questions on this:

1. Is it expected that VarIntCoder is not a known coder?
2. Is TestStream always supposed to translate the payload as raw bytes so 
that the runner process can always send it to the SDK worker with the default 
byte array coder and ask the SDK worker to decode accordingly?
3. If yes to 2), does it mean TestStream needs to be translated in a 
completely different way in portable mode than in classic mode, since in classic 
mode the translator can directly translate the payload to its final format?

Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java#L52
 

 
[2] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java#L65
 

 
[3] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java#L99
 

 
[4] 
https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/wire/WireCoders.java#L93
 

 

Re: Identify primitive/native transforms in portable pipeline

2021-08-02 Thread Ke Wu
IIUC, TrivialNativeTransformExpander is introduced to prune composite nodes 
with known URNs so that their subtransforms and environment id are eliminated, 
essentially making them leaf nodes. I believe it is needed because, when 
traversing a QueryablePipeline, only leaf nodes are returned, as the 
expectation is that only leaf nodes need to be translated directly. So 
essentially, TrivialNativeTransformExpander is used to make a composite 
transform a leaf node in order to give runners the capability to translate it at 
runtime.

I love the idea of relying on transform URNs alone to determine primitive 
transforms to make things clearer. In order to do that, I suppose it is 
better for us to provide a unified approach to registering translators instead of 
each runner having its own way of doing so. [1][2][3]

What are your thoughts?

Best,
Ke


[1] Flink:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144
 
<https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L144>
 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122
 
<https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java#L122>
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145
 
<https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L145>

[2] Spark:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99
 
<https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L99>
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100
 
<https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkStreamingPortablePipelineTranslator.java#L100>

[3] Samza:
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173
 
<https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java#L173>
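
A rough sketch of what the two registration steps look like, following the pattern in 
the runner links above; treat the exact class and method names as illustrative of the 
pattern rather than exact:

import com.google.auto.service.AutoService;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;

// Step 1 (not shown): register a translator for the composite's URN in the runner's
// urn -> PTransformTranslator map.
// Step 2: additionally mark the URN as "native" so it is treated as a leaf node.
@AutoService(NativeTransforms.IsNativeTransform.class)
public class MyCompositeIsNative implements NativeTransforms.IsNativeTransform {
  // Illustrative URN; a real runner would use its own composite transform URN.
  private static final String URN = "my_runner:my_composite:v1";

  @Override
  public boolean test(RunnerApi.PTransform pTransform) {
    return URN.equals(PTransformTranslation.urnForTransformOrNull(pTransform));
  }
}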
 

> On Jul 29, 2021, at 1:58 PM, Robert Bradshaw  wrote:
> 
> OK, I have to admit that I have no idea what the purpose of
> TrivialNativeTransformExpander is--maybe something else in the
> infrastructure can't handle translating pipelines that have
> subtransforms or something like that? It seems to me the map of urn ->
> PTransformTranslator should be sufficient to do the translation
> (simply ignoring subtransforms for any known URN when walking the
> tree). In this case the whole question and infrastructure around "is
> this primitive" could simply go away.
> 
> On Wed, Jul 28, 2021 at 8:47 PM Ke Wu  wrote:
>> 
>> Hi Robert,
>> 
>> Thanks for the reply, the motivation for this is, I noticed when we need to 
>> translate a composite transform, there are two steps involved:
>> 
>> 1. Register the composite transform urn with a dedicated translator. [1]
>> 2. Register the composite transform urn with @AutoService of NativeTransforms 
>> [2]
>> 
>> I was wondering whether step 2 could be eliminated since, after a composite 
>> transform urn is registered with a translator, its environment id and 
>> subtransforms will be removed from the pipeline components by 
>> TrivialNativeTransformExpander [3]
>> 
>> If environment id is not sufficient to distinguish a primitive transform, I 
>> suppose we need to keep step 2, unless we update QueryablePipeline to take 
>> known urns as an input.
>> 
>> Does this make sense to you?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
>> [2] 
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
>> [3] 
>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
>> 
>&g

Re: Command line to run DatastoreIO integration tests for java

2021-07-29 Thread Ke Wu
“Run SQL PostCommit” is essentially running “./gradlew :sqlPostCommit” [1]

In your case, where you would like to run DataStoreReadWriteIT only, it can be 
simplified [2] to:

./gradlew :sdks:java:extensions:sql:postCommit

Best,
Ke

[1] 
https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PostCommit_SQL.groovy#L41
 
<https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PostCommit_SQL.groovy#L41>
 
[2] https://github.com/apache/beam/blob/master/build.gradle.kts#L190 
<https://github.com/apache/beam/blob/master/build.gradle.kts#L190> 


> On Jul 29, 2021, at 9:58 AM, Alex Amato  wrote:
> 
> I was hoping for the command line to run it. So that the test could be 
> tweaked to inject an error, and ensure the error handling code works as 
> expected
> 
> On Wed, Jul 28, 2021 at 8:34 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Commenting "Run SQL PostCommit" on the PR would trigger the post commit 
> integration tests for SQL, which I suppose includes DataStoreReadWriteIT
> 
> Let me know whether or not this is sufficient.
> 
> Best,
> Ke
> 
>> On Jul 28, 2021, at 12:20 PM, Alex Amato > <mailto:ajam...@google.com>> wrote:
>> 
>> Is it possible to run a Datastore IO integration test to test this PR?
>> 
>> https://github.com/apache/beam/pull/15183/files 
>> <https://github.com/apache/beam/pull/15183/files>
>> 
>> Probably this test can be run somehow, though I don't know the gradle 
>> command to run it
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
>>  
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java>
>> 
>> Does anyone know how to run this test?
> 



Re: Identify primitive/native transforms in portable pipeline

2021-07-28 Thread Ke Wu
Hi Robert,

Thanks for the reply. The motivation for this is that I noticed, when we need to 
translate a composite transform, there are two steps involved:

1. Register the composite transform urn with a dedicated translator. [1]
2. Register the composite transform urn with @AutoService of NativeTransforms [2]

I was wondering whether step 2 could be eliminated since, after a composite 
transform urn is registered with a translator, its environment id and 
subtransforms will be removed from the pipeline components by 
TrivialNativeTransformExpander [3]

If environment id is not sufficient to distinguish a primitive transform, I 
suppose we need to keep step 2, unless we update QueryablePipeline to take 
known urns as an input.

Does this make sense to you?

Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158
 
<https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L158>
 
[2] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
 
<https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254>
 
[3] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59
 
<https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L59>
 


> On Jul 28, 2021, at 5:52 PM, Robert Bradshaw  wrote:
> 
> A composite transform is a transform that either only returns its
> inputs or has subtransforms. A primitive, or leaf, transform would be
> the complement of that. Checking environment ids is not sufficient
> (e.g. the Flatten primitive doesn't need an environment, but the ParDo
> does).
> 
> Perhaps it's worth taking a step back and trying to understand why you
> need to distinguish between primitive and nonprimitive transforms at
> all. When translating a pipeline,
> 
> (1) If you know the URN, you can implement the transform directly,
> (2) If you don't know the URN, look to see if its outputs are a subset
> of the inputs (in which case there's nothing to do) or it has
> subtransforms (in which case you can just recurse to compute the
> outputs).
> (3) If neither (1) nor (2) hold, then you throw an error. This is an
> unknown primitive.
> 
> 
> On Tue, Jul 27, 2021 at 8:30 PM Ke Wu  wrote:
>> 
>> Hello All,
>> 
>> When I am looking at translating composite transforms in a portable pipeline, I 
>> realized that TrivialNativeTransformExpander[1] is being used to identify 
>> native transforms by transform urn, and it removes sub-transform and 
>> environment id in the corresponding transform node. However, 
>> QueryablePipeline seems to identify primitive transforms in a different 
>> approach [2], which requires us to register runner native transforms again 
>> [3][4] in addition to the transform translators.
>> 
>> An idea came to me that we should be able to identify a primitive/native 
>> transform by looking at its environment according to the protobuf model [5],
>> 
>> // Environment where the current PTransform should be executed in.
>> //
>> // Transforms that are required to be implemented by a runner must omit this.
>> // All other transforms are required to specify this.
>> string environment_id = 7;
>> 
>> 
>> therefore, I updated the logic:
>> 
>>   private static boolean isPrimitiveTransform(PTransform transform) {
>>     String urn = PTransformTranslation.urnForTransformOrNull(transform);
>> -   return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
>> +   return transform.getEnvironmentId().isEmpty();
>>   }
>> 
>> 
>> However, tests started to fail on SQL cases where I found that external 
>> transforms seem to have an empty environment id as well [6], which does not 
>> seem to conform to the protobuf model.
>> 
>> My questions here are:
>> 
>> 1. Is NativeTransforms required to register a primitive/native transform in 
>> addition to registering it with translators?
>> 2. Is an empty environment_id a good enough indicator to identify a 
>> native/primitive transform?
>> 3. Is an external transform supposed to have an empty or non-empty environment_id?
>> 
>> Best,
>> Ke
>> 
>> 
>> [1] 
>> https://github.com/apache/beam/blob/master/runners/core-construct

Re: Command line to run DatastoreIO integration tests for java

2021-07-28 Thread Ke Wu
Commenting "Run SQL PostCommit" on the PR would trigger the post commit 
integration tests for SQL, which I suppose includes DataStoreReadWriteIT.

Let me know whether or not this is sufficient.

Best,
Ke

> On Jul 28, 2021, at 12:20 PM, Alex Amato  wrote:
> 
> Is it possible to run a Datastore IO integration test to test this PR?
> 
> https://github.com/apache/beam/pull/15183/files 
> 
> 
> Probably this test can be run somehow, though I don't know the gradle command 
> to run it
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
>  
> 
> 
> Does anyone know how to run this test?



Identify primitive/native transforms in portable pipeline

2021-07-27 Thread Ke Wu
Hello All,

When I am looking at translating composite transforms in a portable pipeline, I 
realized that TrivialNativeTransformExpander [1] is being used to identify 
native transforms by transform urn, and it removes the sub-transforms and 
environment id in the corresponding transform node. However, QueryablePipeline 
seems to identify primitive transforms with a different approach [2], which 
requires us to register runner native transforms again [3][4] in addition to 
the transform translators.

An idea came to me that we should be able to identify a primitive/native 
transform by looking at its environment according to the protobuf model [5], 

// Environment where the current PTransform should be executed in.
//
// Transforms that are required to be implemented by a runner must omit this.
// All other transforms are required to specify this.
string environment_id = 7;

therefore, I updated the logic:

   private static boolean isPrimitiveTransform(PTransform transform) {
     String urn = PTransformTranslation.urnForTransformOrNull(transform);
-    return PRIMITIVE_URNS.contains(urn) || NativeTransforms.isNative(transform);
+    return transform.getEnvironmentId().isEmpty();
   }

However, tests started to fail on SQL cases, where I found that external 
transforms seem to have an empty environment id as well [6], which does not seem 
to conform to the protobuf model.

My questions here are:

1. Is NativeTransforms required to register a primitive/native transform in 
addition to registering it with translators?
2. Is an empty environment_id a good enough indicator to identify a 
native/primitive transform?
3. Is an external transform supposed to have an empty or non-empty environment_id?

Best,
Ke


[1] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/TrivialNativeTransformExpander.java#L44
 

 
[2] 
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java#L186
 

 
[3] 
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java#L254
 

 
[4] 
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkBatchPortablePipelineTranslator.java#L412
 

 
[5] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L194
 

 
[6] 
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py#L392
 

 



Re: Translates composite transform in portable pipeline

2021-07-27 Thread Ke Wu
Thank you Robert for the response. 

After some more digging, I realized that the Flink/Spark runners are translating 
composite transforms with:

// Don't let the fuser fuse any subcomponents of native transforms.
Pipeline trimmedPipeline =
    TrivialNativeTransformExpander.forKnownUrns(
        pipelineWithSdfExpanded, translator.knownUrns());

which will mark a composite transform whose urn is in the translator's list as 
primitive, i.e., a leaf node, so the later traversal will be able to return it to 
be translated.
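
Putting it together, the traversal over the trimmed pipeline then sees those 
composites as leaves (this is the same loop as in my original message quoted below):

QueryablePipeline p =
    QueryablePipeline.forTransforms(
        trimmedPipeline.getRootTransformIdsList(), trimmedPipeline.getComponents());

for (PipelineNode.PTransformNode transform : p.getTopologicallyOrderedTransforms()) {
  // Composite transforms with known urns now show up here and can be translated.
}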

I believe this is a step that is currently missing in the Samza runner, which is 
why composite transform translation is not working properly. 

Since I am working on Samza runner, I will post a PR to fix it.

Best,
Ke
 

> On Jul 26, 2021, at 5:45 PM, Robert Bradshaw  wrote:
> 
> You can think of composite transforms like subroutines--they're useful
> concepts for representing the logical structure of the pipeline, but
> for the purposes of execution it is just as valid to inline them all
> as a single monolithic function/pipeline composed of nothing but
> primitive calls. Flink/Spark/Samza have no native notion of composite
> transforms, so this is what they do. If you can preserve the more rich
> structure that has advantages (e.g. for monitoring, debugging, rolling
> up counters and messages, visualizing the pipeline).
> 
> There is one other important case for composites that runners may want
> to take advantage of: runners may recognize higher-level transforms
> and substitute their own (equivalent, of course) implementations. The
> prototypical example of this is combiner lifting, where CombinePerKey
> is naively implemented as GroupByKey + CombineAllValuesDoFn, but most
> runners have more sophisticated ways of handling associative,
> commutative CombineFn aggregations (See
> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_260
> )
> 
> - Robert
> 
> On Mon, Jul 26, 2021 at 5:27 PM Ke Wu  wrote:
>> 
>> Hello All,
>> 
>> I noticed that Flink/Spark/Samza runners are translating portable pipeline 
>> in the similar manner:
>> 
>> QueryablePipeline p =
>>QueryablePipeline.forTransforms(
>>pipeline.getRootTransformIdsList(), pipeline.getComponents());
>> 
>> for (PipelineNode.PTransformNode transform : 
>> p.getTopologicallyOrderedTransforms()) {
>>  // Translation logic
>> }
>> 
>> However, IIUC, this only iterates through leaf nodes of the pipeline, i.e. 
>> composite transforms are NOT being translated at all.
>> 
>> Is this the expected behavior for runner to implement translation logic for 
>> portable pipeline? If Yes, what are the suggestions if certain runners need 
>> to translate composite transforms?
>> 
>> Best,
>> Ke



Translates composite transform in portable pipeline

2021-07-26 Thread Ke Wu
Hello All,

I noticed that the Flink/Spark/Samza runners are translating portable pipelines in 
a similar manner:
> QueryablePipeline p =
> QueryablePipeline.forTransforms(
> pipeline.getRootTransformIdsList(), pipeline.getComponents());
> 
> for (PipelineNode.PTransformNode transform : 
> p.getTopologicallyOrderedTransforms()) {
>   // Translation logic
> }
However, IIUC, this only iterates through leaf nodes of the pipeline, i.e. 
composite transforms are NOT being translated at all. 

Is this the expected behavior for runners implementing translation logic for 
portable pipelines? If yes, what are the suggestions if certain runners need to 
translate composite transforms?

Best,
Ke

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-03 Thread Ke Wu
There is also a talk [1] which introduces dynamically scaling a stream processing 
job at LinkedIn with the Samza runner as well

[1] https://www.usenix.org/conference/hotcloud20/presentation/singh 
<https://www.usenix.org/conference/hotcloud20/presentation/singh> 

> On Jun 3, 2021, at 1:59 PM, Ke Wu  wrote:
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> It is a configuration [1] passed along with job submission, then Job 
> Coordinator, similar to Job Manager in Flink, asks for Yarn Resource Manager 
> to allocate containers, or Kubernetes API server to allocate Pods. 
> 
> configure the job to use the PROCESS environment.
> 
> Agreed that a custom image with fat jar inside + PROCESS environment works 
> too, we prefer EXTERNAL environment because it gives us isolation between the 
> runner and sdk worker, where the runner container can be running completely 
> based on a framework image.
> 
>  
> 
> [1] 
> https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html#job-container-count
>  
> <https://samza.apache.org/learn/documentation/latest/jobs/configuration-table.html#job-container-count>
>  
> 
> On Thu, Jun 3, 2021 at 12:45 PM Kyle Weaver  <mailto:kcwea...@google.com>> wrote:
> However, not all runners follow the pattern where a predefined number of 
> workers are brought up before job submission, for example, for Samza runner, 
> the number of workers needed for a job is determined after the job submission 
> happens, in which case, in the Samza worker Pod, which is similar to “Task 
> Manager Pod” in Flink, is brought up together after job submission and the 
> runner container in this POD need to connect to worker pool service at much 
> earlier time.
>  
> Makes sense. In that case, the best alternative to worker pools is probably 
> to create a custom Samza/Flink worker container image that includes whatever 
> dependencies necessary to run the Beam user code, and then configure the job 
> to use the PROCESS environment.
> 
> Also, are there any resources I can use to find out more about how horizontal 
> scaling works in Samza?
> 
> On Wed, Jun 2, 2021 at 6:39 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Very good point. We are actually talking about the same high level approach 
> where Task Manager Pod has two containers inside running, one is task manager 
> container while the other is worker pool service container.
> 
> I believe the disconnect probably lies in how a job is being 
> deployed/started. In the GCP Flink operator example, it is completely true 
> that the likelihood where the worker pool service is not available when the 
> task manager container needs to connect to it is extremely low. It is because 
> the worker pool service is being brought up together when the Flink cluster 
> is being brought up, which is before the job submission even happens.
> 
> However, not all runners follow the pattern where a predefined number of 
> workers are brought up before job submission, for example, for Samza runner, 
> the number of workers needed for a job is determined after the job submission 
> happens, in which case, in the Samza worker Pod, which is similar to “Task 
> Manager Pod” in Flink, is brought up together after job submission and the 
> runner container in this POD need to connect to worker pool service at much 
> earlier time.
> 
> In addition, if I understand correctly, Flink is planning to add support for 
> dynamically adding new task managers after job submission [1], in which case, 
> the task manager container and worker pool service container in the same Task 
> Manager Pod could be started together and the task manager container need to 
> connect to the worker pool service container sooner. 
> 
> Hope this clarifies things better. Let me know if you have more questions.
> 
> Best,
> Ke
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10407 
> <https://issues.apache.org/jira/browse/FLINK-10407> 
> 
>> On Jun 2, 2021, at 4:27 PM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> Therefore, if we bring up the external worker pool container together with 
>> the runner container, which is one the supported approach by Flink Runner on 
>> K8s
>> 
>> Exactly which approach are you talking about in the doc? I feel like there 
>> could be some misunderstanding here. Here is the configuration I'm talking 
>> about: 
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>>  
>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
Very good point. We are actually talking about the same high-level approach, 
where the Task Manager Pod has two containers running inside: one is the task 
manager container while the other is the worker pool service container.

I believe the disconnect probably lies in how a job is being deployed/started. 
In the GCP Flink operator example, it is completely true that the likelihood 
that the worker pool service is not available when the task manager container 
needs to connect to it is extremely low. It is because the worker pool service 
is brought up together with the Flink cluster, which happens before the job 
submission even takes place.

However, not all runners follow the pattern where a predefined number of 
workers are brought up before job submission. For example, for the Samza runner, 
the number of workers needed for a job is determined after the job submission 
happens, in which case the Samza worker Pod, which is similar to the “Task 
Manager Pod” in Flink, is brought up after job submission and the runner 
container in this Pod needs to connect to the worker pool service at a much 
earlier time.

In addition, if I understand correctly, Flink is planning to add support for 
dynamically adding new task managers after job submission [1], in which case the 
task manager container and the worker pool service container in the same Task 
Manager Pod would be started together and the task manager container would need 
to connect to the worker pool service container sooner. 

Hope this clarifies things better. Let me know if you have more questions.

Best,
Ke

[1] https://issues.apache.org/jira/browse/FLINK-10407 
 

> On Jun 2, 2021, at 4:27 PM, Kyle Weaver  wrote:
> 
> Therefore, if we bring up the external worker pool container together with 
> the runner container, which is one the supported approach by Flink Runner on 
> K8s
> 
> Exactly which approach are you talking about in the doc? I feel like there 
> could be some misunderstanding here. Here is the configuration I'm talking 
> about: 
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/without_job_server/beam_flink_cluster.yaml
>  
> 
> 
> Basically this config is describing a Flink task manager with a Beam worker 
> pool sidecar. The user invokes it with:
> 
> kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
> 
> It doesn't matter which container is started first, the task manager 
> container or the worker pool sidecar, because no communication between the 
> two containers is necessary at this time.
> 
> The instructions are to start the cluster first and wait until it is ready to 
> submit a job, e.g.:
> 
> kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
> 
> The task manager only sends the worker pool requests once it's running a job. 
> So for things to go wrong in the way you describe:
> 
> 1. The user submits a job, then starts a Flink cluster -- reversing the order 
> of steps in the instructions.
> 2. The worker pool sidecar takes way longer to start up than the task manager 
> container for some reason.
> 3. The Flink cluster accepts and starts running the job before the worker 
> pool sidecar is ready -- I'm not familiar enough with k8s lifecycle 
> management or the Flink operator implementation to be sure if this is even 
> possible.
> 
> I've never seen this happen. But, of course there are plenty of unforeseen 
> ways things can go wrong. So I'm not opposed to improving our error handling 
> here more generally.
> 



Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
I do agree that it usually takes longer for the runner to get to the point of 
connecting than for the external worker to become available; I suppose that is 
probably why the external worker pool service is designed the current way.

However, I am not 100% confident saying it won’t happen in practice, because the 
design does seem to have the potential to result in such a failure. The question 
is: do we want to rely on the assumption that the worker pool usually starts 
faster than the runner connects to it, or should we add some preventative 
measures to mitigate it if it ever happens? The answer may be that it is OK to 
depend on the assumption; in addition, if there are some data points showing how 
often the worker is ready first, that would give us more confidence. 

What are your thoughts?

Best,
Ke 

> On Jun 2, 2021, at 12:37 PM, Kyle Weaver  wrote:
> 
> As far as I'm aware there's nothing strictly guaranteeing the worker pool has 
> been started. But in practice it takes a while for the job to start up - the 
> pipeline needs to be constructed, sent to the job server, translated, and 
> then the runner needs to start the job, etc. before the external environment 
> request is sent. So I'm not sure whether or not the problem you describe 
> would ever actually happen.
> 
> On Wed, Jun 2, 2021 at 11:01 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hi Kyle,
> 
> Thanks for reviewing https://github.com/apache/beam/pull/14923 
> <https://github.com/apache/beam/pull/14923>. I would like to follow up with 
> the deadline & waitForReady on ExternalEnvironment here.
> 
> In Kubernetes, if my understanding is correct, there is no ordering support 
> when bringing up containers (init container does not work for external mode 
> because it requires init container to complete). Therefore, if we bring up 
> the external worker pool container together with the runner container, which 
> is one of the supported approaches for the Flink Runner on K8s [1], then it is possible 
> that external worker pool service may not be available when runner tries to 
> connect. 
> 
> What are the recommended way to guarantee the availability of external worker 
> pool service before connecting? We would like to adopt the pattern to support 
> Samza runner on K8s as well.
> 
> Best,
> Ke
> 
> 
> [1] [Public] Beam Flink K8s: 
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>  
> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898>
>  
> 
>> On May 27, 2021, at 6:11 PM, Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> 
>> Good to know. We are working on running java portable pipeline for Samza 
>> runner and I believe we could take on the task to enhance the java workflow 
>> to support timeout/retry etc on gRPC calls. 
>> 
>> Created BEAM-12419 <https://issues.apache.org/jira/browse/BEAM-12419> to 
>> track the work.
>> 
>> Best,
>> Ke
>> 
>>> On May 27, 2021, at 4:30 PM, Kyle Weaver >> <mailto:kcwea...@google.com>> wrote:
>>> 
>>> I don't think there's any specific reason we don't set a timeout, I'm 
>>> guessing it was just never worth the effort of implementing. If it's stuck 
>>> it should be pretty obvious from the logs: "Still waiting for startup of 
>>> environment from {} for worker id {}"
>>> 
>>> On Thu, May 27, 2021 at 4:04 PM Ke Wu >> <mailto:ke.wu...@gmail.com>> wrote:
>>> Hi Kyle,
>>> 
>>> Thank you for the prompt response and apologize for the late reply. 
>>> 
>>> [1] seems to be only available in python portable_runner but not java 
>>> PortableRunner, is it intended or we could add similar changes in java as 
>>> well?
>>> 
>>> [2] makes sense to block since the wait/retry is handled in the previous 
>>> prepare(), however, is there any specific reason why we do not want to 
>>> support timeout in start worker request?
>>> 
>>> Best,
>>> Ke
>>> 
>>>> On May 14, 2021, at 11:25 AM, Kyle Weaver >>> <mailto:kcwea...@google.com>> wrote:
>>>> 
>>>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>>>> retry/wait logic. In some places we have a configurable timeout (which 
>>>> defaults to 60s) [1], while in other places we block [2][3].
>>>> 
>>>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>>>> <https://issues.apache.org/jira/browse/BEAM-7933>
>>>> [2] 
>>>> https://github.com/apache/beam/blob/51541a595b09751

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-06-02 Thread Ke Wu
Hi Kyle,

Thanks for reviewing https://github.com/apache/beam/pull/14923 
<https://github.com/apache/beam/pull/14923>. I would like to follow up with the 
deadline & waitForReady on ExternalEnvironment here.

In Kubernetes, if my understanding is correct, there is no ordering support 
when bringing up containers (an init container does not work for external mode 
because it requires the init container to complete). Therefore, if we bring up the 
external worker pool container together with the runner container, which is one of 
the supported approaches for the Flink Runner on K8s [1], then it is possible that 
the external worker pool service may not be available when the runner tries to connect. 

What is the recommended way to guarantee the availability of the external worker 
pool service before connecting? We would like to adopt the same pattern to support 
the Samza runner on K8s as well.

Best,
Ke


[1] [Public] Beam Flink K8s: 
https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
 
<https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898>
 

> On May 27, 2021, at 6:11 PM, Ke Wu  wrote:
> 
> Good to know. We are working on running java portable pipeline for Samza 
> runner and I believe we could take on the task to enhance the java workflow 
> to support timeout/retry etc on gRPC calls. 
> 
> Created BEAM-12419 <https://issues.apache.org/jira/browse/BEAM-12419> to 
> track the work.
> 
> Best,
> Ke
> 
>> On May 27, 2021, at 4:30 PM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> I don't think there's any specific reason we don't set a timeout, I'm 
>> guessing it was just never worth the effort of implementing. If it's stuck 
>> it should be pretty obvious from the logs: "Still waiting for startup of 
>> environment from {} for worker id {}"
>> 
>> On Thu, May 27, 2021 at 4:04 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Hi Kyle,
>> 
>> Thank you for the prompt response and apologize for the late reply. 
>> 
>> [1] seems to be only available in python portable_runner but not java 
>> PortableRunner, is it intended or we could add similar changes in java as 
>> well?
>> 
>> [2] makes sense to block since the wait/retry is handled in the previous 
>> prepare(), however, is there any specific reason why we do not want to 
>> support timeout in start worker request?
>> 
>> Best,
>> Ke
>> 
>>> On May 14, 2021, at 11:25 AM, Kyle Weaver >> <mailto:kcwea...@google.com>> wrote:
>>> 
>>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>>> retry/wait logic. In some places we have a configurable timeout (which 
>>> defaults to 60s) [1], while in other places we block [2][3].
>>> 
>>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>>> <https://issues.apache.org/jira/browse/BEAM-7933>
>>> [2] 
>>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>>  
>>> <https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242>
>>> [3] 
>>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>>  
>>> <https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115>
>>> On Fri, May 14, 2021 at 10:51 AM Ke Wu >> <mailto:ke.wu...@gmail.com>> wrote:
>>> Hello All,
>>> 
>>> I came across this question when I am reading Beam on Flink on Kubernetes 
>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.x9qy4wlfgc1g>
>>>  and flink-on-k8s-operator 
>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/0310df76d6e2128cd5d2bc51fae4e842d370c463>
>>>  and realized that there seems no retry/wait logic built in PortableRunner 
>>> nor ExternalEnvironmentFactory, (correct me if I am wrong) which creates 
>>> implications that:
>>> 
>>> 1. Job Server needs to be ready to accept request before SDK Client could 
>>> submit request.
>>> 2. External Worker Pool Service needs to be ready to accept start/stop 
>>> worker request before r

Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-27 Thread Ke Wu
Good to know. We are working on running java portable pipelines for the Samza 
runner, and I believe we could take on the task of enhancing the java workflow to 
support timeout/retry etc. on gRPC calls. 

Created BEAM-12419 <https://issues.apache.org/jira/browse/BEAM-12419> to track 
the work.
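
As a rough sketch of the kind of guard we have in mind on the java side (the 
channel/stub wiring below is illustrative; externalServiceAddress and 
startWorkerRequest are placeholders, not the current ExternalEnvironmentFactory 
code):

// Queue the StartWorker RPC until the worker pool server is reachable, but
// give up after a deadline instead of hanging forever.
ManagedChannel channel =
    ManagedChannelBuilder.forTarget(externalServiceAddress).usePlaintext().build();
BeamFnApi.StartWorkerResponse response =
    BeamFnExternalWorkerPoolGrpc.newBlockingStub(channel)
        .withWaitForReady()
        .withDeadlineAfter(60, TimeUnit.SECONDS)
        .startWorker(startWorkerRequest);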

Best,
Ke

> On May 27, 2021, at 4:30 PM, Kyle Weaver  wrote:
> 
> I don't think there's any specific reason we don't set a timeout, I'm 
> guessing it was just never worth the effort of implementing. If it's stuck it 
> should be pretty obvious from the logs: "Still waiting for startup of 
> environment from {} for worker id {}"
> 
> On Thu, May 27, 2021 at 4:04 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hi Kyle,
> 
> Thank you for the prompt response and apologize for the late reply. 
> 
> [1] seems to be only available in python portable_runner but not java 
> PortableRunner, is it intended or we could add similar changes in java as 
> well?
> 
> [2] makes sense to block since the wait/retry is handled in the previous 
> prepare(), however, is there any specific reason why we do not want to 
> support timeout in start worker request?
> 
> Best,
> Ke
> 
>> On May 14, 2021, at 11:25 AM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
>> retry/wait logic. In some places we have a configurable timeout (which 
>> defaults to 60s) [1], while in other places we block [2][3].
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-7933 
>> <https://issues.apache.org/jira/browse/BEAM-7933>
>> [2] 
>> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>>  
>> <https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242>
>> [3] 
>> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>>  
>> <https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115>
>> On Fri, May 14, 2021 at 10:51 AM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Hello All,
>> 
>> I came across this question when I am reading Beam on Flink on Kubernetes 
>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.x9qy4wlfgc1g>
>>  and flink-on-k8s-operator 
>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/0310df76d6e2128cd5d2bc51fae4e842d370c463>
>>  and realized that there seems no retry/wait logic built in PortableRunner 
>> nor ExternalEnvironmentFactory, (correct me if I am wrong) which creates 
>> implications that:
>> 
>> 1. Job Server needs to be ready to accept request before SDK Client could 
>> submit request.
>> 2. External Worker Pool Service needs to be ready to accept start/stop 
>> worker request before runner starts to request.
>> 
>> This may bring some challenges on k8s since Flink opt to use multi 
>> containers pattern when bringing up a beam portable pipeline, in addition, I 
>> don’t find any special lifecycle management in place to guarantee the order, 
>> e.g. External Worker Pool Service container to start and ready before the 
>> task manager container to start making requests. 
>> 
>> I am wondering if I missed anything to guarantee the readiness of the 
>> dependent service or we are relying on that dependent containers are much 
>> lighter weigh so it should, in most time, be ready before the other 
>> container start to make requests. 
>> 
>> Best,
>> Ke
>> 
> 



Re: [DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-27 Thread Ke Wu
Hi Kyle,

Thank you for the prompt response, and apologies for the late reply. 

[1] seems to be only available in the python portable_runner but not the java 
PortableRunner; is this intended, or could we add similar changes in java as well?

[2] makes sense to block since the wait/retry is handled in the previous 
prepare(); however, is there any specific reason why we do not want to support a 
timeout in the start worker request?

Best,
Ke

> On May 14, 2021, at 11:25 AM, Kyle Weaver  wrote:
> 
> 1. and 2. are both facilitated by GRPC, which takes care of most of the 
> retry/wait logic. In some places we have a configurable timeout (which 
> defaults to 60s) [1], while in other places we block [2][3].
> 
> [1] https://issues.apache.org/jira/browse/BEAM-7933 
> <https://issues.apache.org/jira/browse/BEAM-7933>
> [2] 
> https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242
>  
> <https://github.com/apache/beam/blob/51541a595b09751dd3dde2c50caf2a968ac01b68/sdks/python/apache_beam/runners/portability/portable_runner.py#L238-L242>
> [3] 
> https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115
>  
> <https://github.com/apache/beam/blob/9601bdef8870bc6acc7895c06252e43ec040bd8c/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java#L115>
> On Fri, May 14, 2021 at 10:51 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello All,
> 
> I came across this question when I am reading Beam on Flink on Kubernetes 
> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.x9qy4wlfgc1g>
>  and flink-on-k8s-operator 
> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/0310df76d6e2128cd5d2bc51fae4e842d370c463>
>  and realized that there seems no retry/wait logic built in PortableRunner 
> nor ExternalEnvironmentFactory, (correct me if I am wrong) which creates 
> implications that:
> 
> 1. Job Server needs to be ready to accept request before SDK Client could 
> submit request.
> 2. External Worker Pool Service needs to be ready to accept start/stop worker 
> request before runner starts to request.
> 
> This may bring some challenges on k8s since Flink opt to use multi containers 
> pattern when bringing up a beam portable pipeline, in addition, I don’t find 
> any special lifecycle management in place to guarantee the order, e.g. 
> External Worker Pool Service container to start and ready before the task 
> manager container to start making requests. 
> 
> I am wondering if I missed anything to guarantee the readiness of the 
> dependent service or we are relying on that dependent containers are much 
> lighter weigh so it should, in most time, be ready before the other container 
> start to make requests. 
> 
> Best,
> Ke
> 



[DISCUSS] Client SDK/Job Server/Worker Pool Lifecycle Management on Kubernetes

2021-05-14 Thread Ke Wu
Hello All,

I came across this question when I am reading Beam on Flink on Kubernetes 
and flink-on-k8s-operator and realized that there seems to be no retry/wait 
logic built into PortableRunner or ExternalEnvironmentFactory (correct me if I 
am wrong), which creates the implications that:

1. The Job Server needs to be ready to accept requests before the SDK Client can 
submit a request.
2. The External Worker Pool Service needs to be ready to accept start/stop worker 
requests before the runner starts making them.

This may bring some challenges on k8s since Flink opts to use a multi-container 
pattern when bringing up a beam portable pipeline; in addition, I don’t find 
any special lifecycle management in place to guarantee the order, e.g. that the 
External Worker Pool Service container starts and is ready before the task 
manager container starts making requests. 

I am wondering if I missed anything that guarantees the readiness of the dependent 
service, or whether we are relying on the fact that dependent containers are much 
lighter weight, so they should, most of the time, be ready before the other 
container starts to make requests. 

Best,
Ke



Re: Customizable Artifacts to Stage in Java Portable Runner

2021-04-29 Thread Ke Wu
Got you. We are definitely interested in a java worker pool to support the Samza 
runner use case, and I think we could help implement it if no one is currently 
working on it.

Compared with what python offers, what I see missing are:

1. Main class/method to start ExternalWorkerService independently 
2. Worker pool mode support in JDK Docker container in boot.go

Is there anything else I missed?
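
For item 1, a rough sketch of the kind of standalone entry point we have in mind 
(the ExternalWorkerService constructor and start() signatures below are 
assumptions based on how LOOPBACK uses it, not a verified API):

public class JavaWorkerPoolMain {
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Assumed API: serve the BeamFnExternalWorkerPool endpoints and block forever.
    try (AutoCloseable server = new ExternalWorkerService(options).start()) {
      Thread.currentThread().join();
    }
  }
}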

Best,
Ke

> On Apr 29, 2021, at 12:54 PM, Kyle Weaver  wrote:
> 
> Thanks for the info. In order to use supported remote file systems, does it 
> mean it needs to be passed in as FILE_ARTIFACT_URN since neither 
> ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1” nor 
> ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN = 
> "beam:artifact:role:staging_to:v1” seems to be supported in getArtifact()?
> 
> Yes.
> 
> By the way, it seems the Python implementation of artifact_service does 
> handle URLs [1] - though it might not support them at every level of the 
> stack [2].
>  
> 
> On the other side, under circumstances, such as EXTERNAL environment type 
> with ExternalWorkerService, where artifacts are already available, what is 
> the expected usage to disable artifact staging phase in portable pipeline?
> 
> I think you can just set --filesToStage to empty.
>  
> 
> In addition, I noticed that the python counterpart 
> worker_pool_main#BeamFnExternalWorkerPoolServicer 
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
>  does invoke artifact staging service to get artifacts from artifact endpoint 
> specified in StartWorkerRequest but not in the java ExternalWorkerService. Is 
> this discrepancy expected since java worker pool process does not likely want 
> to start the worker with different classpath/classloader?
> 
> 
> It looks like ExternalWorkerService is only used for LOOPBACK mode in Java, 
> so I assume artifact staging/retrieval would be redundant. Whereas in Python, 
> the worker pool you linked to is started independently of job submission. But 
> there's no inherent reason it has to be that way. For example, someday we may 
> want to implement a Java worker pool [3].
> 
> [1] 
> https://github.com/apache/beam/blob/e0136ffc176d157d0928e7d501bca4daca3160a8/sdks/python/apache_beam/runners/portability/artifact_service.py#L81-L85
>  
> <https://github.com/apache/beam/blob/e0136ffc176d157d0928e7d501bca4daca3160a8/sdks/python/apache_beam/runners/portability/artifact_service.py#L81-L85>
> [2] https://issues.apache.org/jira/browse/BEAM-11275 
> <https://issues.apache.org/jira/browse/BEAM-11275>
> [3] https://issues.apache.org/jira/browse/BEAM-8137 
> <https://issues.apache.org/jira/browse/BEAM-8137>
> On Wed, Apr 28, 2021 at 6:36 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thank you Kyle for the prompt response.
> 
> > Yeah, that looks like a bug.
> 
> Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to 
> track the issue.
> 
> > Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> > Blobstore, HDFS). But arbitrary URLs are not supported.
> 
> Thanks for the info. In order to use supported remote file systems, does it 
> mean it needs to be passed in as FILE_ARTIFACT_URN since neither 
> ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1” nor 
> ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN = 
> "beam:artifact:role:staging_to:v1” seems to be supported in getArtifact()?
> 
> On the other side, under circumstances, such as EXTERNAL environment type 
> with ExternalWorkerService, where artifacts are already available, what is 
> the expected usage to disable artifact staging phase in portable pipeline?
> 
> In addition, I noticed that the python counterpart 
> worker_pool_main#BeamFnExternalWorkerPoolServicer 
> <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
>  does invoke artifact staging service to get artifacts from artifact endpoint 
> specified in StartWorkerRequest but not in the java ExternalWorkerService. Is 
> this discrepancy expected since java worker pool process does not likely want 
> to start the worker with different classpath/classloader?
> 
> Best,
> Ke
> 
> 
>> On Apr 28, 2021, at 5:55 PM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> > I am expecting FileStagingOptions#setFilesToStage in 
>> > PortablePipelineOptions 
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>> >  is the way to customize artifac

Re: Customizable Artifacts to Stage in Java Portable Runner

2021-04-28 Thread Ke Wu
Thank you Kyle for the prompt response.

> Yeah, that looks like a bug.

Created BEAM-12251 <https://issues.apache.org/jira/browse/BEAM-12251> to track 
the issue.

> Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> Blobstore, HDFS). But arbitrary URLs are not supported.

Thanks for the info. In order to use supported remote file systems, does it 
mean it needs to be passed in as FILE_ARTIFACT_URN since neither 
ArtifactRetrievalService#URL_ARTIFACT_URN = "beam:artifact:type:url:v1” nor 
ArtifactRetrievalService#STAGING_TO_ARTIFACT_URN = 
"beam:artifact:role:staging_to:v1” seems to be supported in getArtifact()?

On the other side, under circumstances, such as EXTERNAL environment type with 
ExternalWorkerService, where artifacts are already available, what is the 
expected usage to disable artifact staging phase in portable pipeline?

In addition, I noticed that the python counterpart 
worker_pool_main#BeamFnExternalWorkerPoolServicer 
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L56>
 does invoke artifact staging service to get artifacts from artifact endpoint 
specified in StartWorkerRequest but not in the java ExternalWorkerService. Is 
this discrepancy expected since java worker pool process does not likely want 
to start the worker with different classpath/classloader?

Best,
Ke


> On Apr 28, 2021, at 5:55 PM, Kyle Weaver  wrote:
> 
> > I am expecting FileStagingOptions#setFilesToStage in 
> > PortablePipelineOptions 
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
> >  is the way to customize artifacts to be staged and resolved in portable 
> > pipeline, however, it looks like that PortableRunner 
> > <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
> >  does not add preconfigured files to `filesToStageBuilder` which is used in 
> > the final options to prepare the job. Is this the expected behavior or 
> > maybe a bug?
> 
> Yeah, that looks like a bug.
> 
> > In addition, do we support specifying an URL in 
> > PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can 
> > retrieve artifacts from a remote address instead of default from JobServer, 
> > which got artifacts from SDK Client. I am asking because I noticed
> 
> Files can use any of Beam's supported remote file systems (GCS, S3, Azure 
> Blobstore, HDFS). But arbitrary URLs are not supported.
> 
> On Wed, Apr 28, 2021 at 5:44 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello All,
> 
> I am expecting FileStagingOptions#setFilesToStage in PortablePipelineOptions 
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L28>
>  is the way to customize artifacts to be staged and resolved in portable 
> pipeline, however, it looks like that PortableRunner 
> <https://github.com/apache/beam/blob/master/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java#L129>
>  does not add preconfigured files to `filesToStageBuilder` which is used in 
> the final options to prepare the job. Is this the expected behavior or maybe 
> a bug?
> 
> In addition, do we support specifying an URL in 
> PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can 
> retrieve artifacts from a remote address instead of default from JobServer, 
> which got artifacts from SDK Client. I am asking because I noticed
> 
> public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact) 
> throws IOException {
>   switch (artifact.getTypeUrn()) {
> case FILE_ARTIFACT_URN:
>   RunnerApi.ArtifactFilePayload payload =
>   RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
>   return Channels.newInputStream(
>   FileSystems.open(
>   FileSystems.matchNewResource(payload.getPath(), false /* is 
> directory */)));
> case EMBEDDED_ARTIFACT_URN:
>   return 
> RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
>   .getData()
>   .newInput();
> default:
>   throw new UnsupportedOperationException(
>   "Unexpected artifact type: " + artifact.getTypeUrn());
>   }
> }
> Which indicates that only File and Embed artifacts seem to be supported now.
> 
> Best,
> Ke



Customizable Artifacts to Stage in Java Portable Runner

2021-04-28 Thread Ke Wu
Hello All,

I am expecting FileStagingOptions#setFilesToStage in PortablePipelineOptions 
to be the way to customize artifacts to be staged and resolved in a portable 
pipeline; however, it looks like PortableRunner does not add the preconfigured 
files to `filesToStageBuilder`, which is used in the final options to prepare the 
job. Is this the expected behavior or maybe a bug?

In addition, do we support specifying a URL in 
PortablePipelineOptions#filesToStage so that ArtifactRetrievalService can 
retrieve artifacts from a remote address instead of, by default, from the 
JobServer, which gets the artifacts from the SDK Client? I am asking because I noticed:

public static InputStream getArtifact(RunnerApi.ArtifactInformation artifact) throws IOException {
  switch (artifact.getTypeUrn()) {
    case FILE_ARTIFACT_URN:
      RunnerApi.ArtifactFilePayload payload =
          RunnerApi.ArtifactFilePayload.parseFrom(artifact.getTypePayload());
      return Channels.newInputStream(
          FileSystems.open(
              FileSystems.matchNewResource(payload.getPath(), false /* is directory */)));
    case EMBEDDED_ARTIFACT_URN:
      return RunnerApi.EmbeddedFilePayload.parseFrom(artifact.getTypePayload())
          .getData()
          .newInput();
    default:
      throw new UnsupportedOperationException(
          "Unexpected artifact type: " + artifact.getTypeUrn());
  }
}
This indicates that only File and Embedded artifacts seem to be supported now.
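
For what it's worth, the FILE_ARTIFACT_URN branch above already goes through 
FileSystems, so a file artifact whose path lives on one of Beam's supported 
filesystems should be resolvable there. An illustrative example (the bucket name 
is made up, and this assumes the corresponding filesystem registrar is on the 
classpath):

RunnerApi.ArtifactInformation artifact =
    RunnerApi.ArtifactInformation.newBuilder()
        .setTypeUrn(ArtifactRetrievalService.FILE_ARTIFACT_URN)
        .setTypePayload(
            RunnerApi.ArtifactFilePayload.newBuilder()
                .setPath("gs://some-bucket/my-udfs.jar")
                .build()
                .toByteString())
        .build();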

Best,
Ke

Re: Portable Java Pipeline Support

2021-04-26 Thread Ke Wu
That makes sense. 

For the Samza Runner, we are looking to leverage java portable mode to achieve 
“split deployment”, where the runner is packaged independently without user code 
and user code only exists in the submission/worker process. I believe this is 
supported by portable mode, and therefore we would prefer to use LOOPBACK (for 
testing) and DOCKER (for production) mode.
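
Concretely, the kind of invocation we have in mind for local testing once 
LOOPBACK works for java (the job endpoint and port here are just an example, not 
a Samza default):

--runner=PortableRunner --jobEndpoint=localhost:11440 --defaultEnvironmentType=LOOPBACK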

Is there a way to get BEAM-12227 
<https://issues.apache.org/jira/browse/BEAM-12227> prioritized, or is the fastest 
way to patch it ourselves?

Best,
Ke


> On Apr 26, 2021, at 10:17 AM, Kyle Weaver  wrote:
> 
> The reason is the Flink and Spark runners are written in Java. So when the 
> runner needs to execute user code written in Java, an EMBEDDED environment 
> can be started in the runner. Whereas the runner cannot natively execute 
> Python code, so it needs to call out to an external process. In the case of 
> LOOPBACK, that external process is started by the Python client process that 
> submitted the job in the first place.
> 
> On Mon, Apr 26, 2021 at 9:57 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thank you Kyle, I have created BEAM-12227 
> <https://issues.apache.org/jira/browse/BEAM-12227> to track the unimplemented 
> exception.
> 
> Is there any specific reason that Java tests are using EMBEDDED mode while 
> python usually in LOOPBACK mode?
> 
> Best,
> Ke
> 
>> On Apr 23, 2021, at 4:01 PM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> I couldn't find any existing ticket for this issue (you may be the first to 
>> discover it). Feel free to create one with your findings. (FWIW I did find a 
>> ticket for documenting portable Java pipelines [1]).
>> 
>> For the Flink and Spark runners, we run most of our Java tests using 
>> EMBEDDED mode. For portable Samza, you will likely want to use a similar 
>> setup [2].
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-11062 
>> <https://issues.apache.org/jira/browse/BEAM-11062>
>> [2] 
>> https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>>  
>> <https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186>
>> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Thank you, Kyle, for the detailed answer. 
>> 
>> Do we have a ticket track fix the LOOPBACK mode? LOOPBACK mode will be 
>> essential, especially for local testing as Samza Runner adopts portable mode 
>> and we are intended to run it with Java pipeline a lot.
>> 
>> In addition, I noticed that this issue does not happen every time LOOPBACK 
>> is used, for example:
>> Pipeline p = Pipeline.create(options);
>> 
>> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L), KV.of("2", 
>> 3L), KV.of("3", 9L)))
>> .apply(Sum.longsPerKey())
>> .apply(MapElements.via(new PrintFn()));
>> 
>> p.run().waitUntilFinish();
>> Where PrintFn simply prints the result:
>> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>>   @Override
>>   public String apply(KV<String, Long> input) {
>> LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>> return input.getKey() + ": " + input.getValue();
>>   }
>> }
>> 
>> This simple pipeline did work in Java LOOPBACK mode.
>> 
>> Best,
>> Ke
>> 
>>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver >> <mailto:kcwea...@google.com>> wrote:
>>> 
>>> Yes, we can expect to run java pipelines in portable mode. I'm guessing the 
>>> method unimplemented exception is a bug, and we haven't caught it because 
>>> (as far as I know) we don't test the Java loopback worker.
>>> 
>>> As an alternative, you can try building the Java docker environment with 
>>> "./gradlew :sdks:java:container:java8:docker" and then use 
>>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't 
>>> be able to access the host filesystem [1].
>>> 
>>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the 
>>> embedded environment assumes the dependencies are already present on the 
>>> runner, which will not be the case unless you modify the job server to 
>>> depend on the examples module.
>>> 
>>> [1] https://issues.apache.org/jira/browse/BEAM-5440 
>>> <https://issues.apache.org/jira/browse/BE

Re: Portable Java Pipeline Support

2021-04-26 Thread Ke Wu
Thank you Kyle, I have created BEAM-12227 
<https://issues.apache.org/jira/browse/BEAM-12227> to track the unimplemented 
exception.

Is there any specific reason that Java tests are using EMBEDDED mode while 
python usually runs in LOOPBACK mode?

Best,
Ke

> On Apr 23, 2021, at 4:01 PM, Kyle Weaver  wrote:
> 
> I couldn't find any existing ticket for this issue (you may be the first to 
> discover it). Feel free to create one with your findings. (FWIW I did find a 
> ticket for documenting portable Java pipelines [1]).
> 
> For the Flink and Spark runners, we run most of our Java tests using EMBEDDED 
> mode. For portable Samza, you will likely want to use a similar setup [2].
> 
> [1] https://issues.apache.org/jira/browse/BEAM-11062 
> <https://issues.apache.org/jira/browse/BEAM-11062>
> [2] 
> https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>  
> <https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186>
> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thank you, Kyle, for the detailed answer. 
> 
> Do we have a ticket tracking a fix for the LOOPBACK mode? LOOPBACK mode will be 
> essential, especially for local testing, as Samza Runner adopts portable mode 
> and we intend to run it with Java pipelines a lot.
> 
> In addition, I noticed that this issue does not happen every time LOOPBACK is 
> used, for example:
> Pipeline p = Pipeline.create(options);
> 
> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L), KV.of("2", 
> 3L), KV.of("3", 9L)))
> .apply(Sum.longsPerKey())
> .apply(MapElements.via(new PrintFn()));
> 
> p.run().waitUntilFinish();
> Where PrintFn simply prints the result:
> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>   @Override
>   public String apply(KV<String, Long> input) {
> LOG.info("Key {}: value {}", input.getKey(), input.getValue());
> return input.getKey() + ": " + input.getValue();
>   }
> }
> 
> This simple pipeline did work in Java LOOPBACK mode.
> 
> Best,
> Ke
> 
>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver > <mailto:kcwea...@google.com>> wrote:
>> 
>> Yes, we can expect to run java pipelines in portable mode. I'm guessing the 
>> method unimplemented exception is a bug, and we haven't caught it because 
>> (as far as I know) we don't test the Java loopback worker.
>> 
>> As an alternative, you can try building the Java docker environment with 
>> "./gradlew :sdks:java:container:java8:docker" and then use 
>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't 
>> be able to access the host filesystem [1].
>> 
>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded 
>> environment assumes the dependencies are already present on the runner, 
>> which will not be the case unless you modify the job server to depend on the 
>> examples module.
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-5440 
>> <https://issues.apache.org/jira/browse/BEAM-5440>
>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Hi All,
>> 
>> I am working on adding portability support for the Samza Runner and have been 
>> playing around with the support in the Flink and Spark runners recently. 
>> 
>> One thing I noticed is the lack of documentation on how to run a Java 
>> pipeline in portable mode. Almost all documentation focuses on how to run a 
>> Python pipeline, which is understandable. I believe a Java pipeline can be 
>> executed in portable mode as well, so I ran some experiments, but the results 
>> were not what I expected and I would like to know whether they are expected:
>> 
>> 
>> 1. Add portability module to example so PipelineOptionsFactory can recognize 
>> PortableRunner:
>> $ git diff
>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>> index 62f15ec24b..c9069d3f4f 100644
>> --- a/examples/java/build.gradle
>> +++ b/examples/java/build.gradle
>> @@ -59,6 +59,7 @@ dependencies {
>>compile project(":sdks:java:extensions:google-cloud-platform-core")
>>compile project(":sdks:java:io:google-cloud-platform")
>>compile project(":sdks:java:io:kafka")
>> +  compile project(":runners:portability:java")
>>compile project(":sdks:java:extensions:ml")
>>compile library.java.avro
>&g
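
For reference, here is a minimal Java sketch (not from the thread; the option names match the flags used in the WordCount experiment quoted below) of how the environment type discussed above is selected on PortablePipelineOptions when submitting a Java pipeline to a job server. "LOOPBACK", "EMBEDDED" and "DOCKER" are the environment types Kyle mentions; which one works depends on the runner setup.

```
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;

public class PortableOptionsExample {
  public static void main(String[] args) {
    // Minimal sketch: point a Java pipeline at a local job server and pick the
    // SDK harness environment type.
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
    options.setJobEndpoint("localhost:8099");       // job server endpoint
    options.setDefaultEnvironmentType("EMBEDDED");  // or "LOOPBACK" / "DOCKER"
    options.setRunner(PortableRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline and call pipeline.run().waitUntilFinish() ...
  }
}
```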

Re: Portable Java Pipeline Support

2021-04-23 Thread Ke Wu
Thank you, Kyle, for the detailed answer. 

Do we have a ticket tracking a fix for the LOOPBACK mode? LOOPBACK mode will be 
essential, especially for local testing, as Samza Runner adopts portable mode 
and we intend to run it with Java pipelines a lot.

In addition, I noticed that this issue does not happen every time LOOPBACK is 
used, for example:
Pipeline p = Pipeline.create(options);

p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L), KV.of("2", 
3L), KV.of("3", 9L)))
.apply(Sum.longsPerKey())
.apply(MapElements.via(new PrintFn()));

p.run().waitUntilFinish();
Where PrintFn simply prints the result:
public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
  @Override
  public String apply(KV<String, Long> input) {
LOG.info("Key {}: value {}", input.getKey(), input.getValue());
return input.getKey() + ": " + input.getValue();
  }
}

This simple pipeline did work in Java LOOPBACK mode.

Best,
Ke

> On Apr 23, 2021, at 1:16 PM, Kyle Weaver  wrote:
> 
> Yes, we can expect to run java pipelines in portable mode. I'm guessing the 
> method unimplemented exception is a bug, and we haven't caught it because (as 
> far as I know) we don't test the Java loopback worker.
> 
> As an alternative, you can try building the Java docker environment with 
> "./gradlew :sdks:java:container:java8:docker" and then use 
> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't 
> be able to access the host filesystem [1].
> 
> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded 
> environment assumes the dependencies are already present on the runner, which 
> will not be the case unless you modify the job server to depend on the 
> examples module.
> 
> [1] https://issues.apache.org/jira/browse/BEAM-5440 
> <https://issues.apache.org/jira/browse/BEAM-5440>
> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hi All,
> 
> I am working on adding portability support for the Samza Runner and have been 
> playing around with the support in the Flink and Spark runners recently. 
> 
> One thing I noticed is the lack of documentation on how to run a Java 
> pipeline in portable mode. Almost all documentation focuses on how to run a 
> Python pipeline, which is understandable. I believe a Java pipeline can be 
> executed in portable mode as well, so I ran some experiments, but the results 
> were not what I expected and I would like to know whether they are expected:
> 
> 
> 1. Add portability module to example so PipelineOptionsFactory can recognize 
> PortableRunner:
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>compile project(":sdks:java:extensions:google-cloud-platform-core")
>compile project(":sdks:java:io:google-cloud-platform")
>compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>compile project(":sdks:java:extensions:ml")
>compile library.java.avro
>compile library.java.bigdataoss_util
> 
> 2. Bring up the Job Server: 
> 
>   Spark: ./gradlew :runners:spark:3:job-server:runShadow
>   Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
> 
> 3. Execute WordCount example:
> 
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=README.md --output=/tmp/output 
> --runner=PortableRunner --jobEndpoint=localhost:8099 
> --defaultEnvironmentType=LOOPBACK"
> 
> 
> Neither the Flink nor the Spark runner worked for WordCount because of 
> 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
> UNIMPLEMENTED: Method 
> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is 
> unimplemented
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
> at 
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
> at 
> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
> at 
> org.apache.beam.runners

Portable Java Pipeline Support

2021-04-23 Thread Ke Wu
Hi All,

I am working on adding portability support for the Samza Runner and have been 
playing around with the support in the Flink and Spark runners recently. 

One thing I noticed is the lack of documentation on how to run a Java pipeline 
in portable mode. Almost all documentation focuses on how to run a Python 
pipeline, which is understandable. I believe a Java pipeline can be executed in 
portable mode as well, so I ran some experiments, but the results were not what 
I expected and I would like to know whether they are expected:


1. Add portability module to example so PipelineOptionsFactory can recognize 
PortableRunner:
$ git diff
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 62f15ec24b..c9069d3f4f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   compile project(":sdks:java:extensions:google-cloud-platform-core")
   compile project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:kafka")
+  compile project(":runners:portability:java")
   compile project(":sdks:java:extensions:ml")
   compile library.java.avro
   compile library.java.bigdataoss_util

2. Bring up the Job Server: 

Spark: ./gradlew :runners:spark:3:job-server:runShadow
Flink: ./gradlew :runners:flink:1.12:job-server:runShadow

3. Execute WordCount example:

./gradlew execute -DmainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner 
--jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"


Neither the Flink nor the Spark runner worked for WordCount because of 

org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
UNIMPLEMENTED: Method 
org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is 
unimplemented
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
at 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
at 
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
at 
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
at 

Unexpected empty state address window set for ACTIVE window

2021-03-10 Thread Ke Wu
Hello,

We recently encountered a weird exception:

Caused by: java.lang.IllegalStateException: Unexpected empty state address 
window set for ACTIVE window 
[2021-02-10T17:25:10.715Z..2021-02-10T20:25:10.715Z)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:588)
 ~[beam-vendor-guava-26_0-jre-0.1.jar:?]
at 
org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants(MergingActiveWindowSet.java:329)
 ~[beam-runners-core-java-2.20.7.9.jar:?]
at 
org.apache.beam.runners.core.MergingActiveWindowSet.persist(MergingActiveWindowSet.java:88)
 ~[beam-runners-core-java-2.20.7.9.jar:?]
at 
org.apache.beam.runners.core.ReduceFnRunner.persist(ReduceFnRunner.java:380) 
~[beam-runners-core-java-2.20.7.9.jar:?]
at 
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:138)
 ~[beam-runners-core-java-2.20.7.9.jar:?]

which suggests there is an empty window in ReduceFnRunner#activeWindows

After some more digging, we found there are two possibilities that could result 
in such a state:

1. ReduceFnRunner#onTimers is clearing the entry value, causing an empty value 
entry in ReduceFnRunner#activeWindows

2. ReduceFnRunner#activeWindows is initialized with an empty value in 
MergingActiveWindowSet#emptyIfNull, then an empty iterable of windowed values is 
passed to the subsequent ReduceFnRunner#processElements, where 
cleanupTemporaryWindows() is not invoked at all before ReduceFnRunner#persist

The questions are: can #1 and/or #2 actually happen? If yes, under what 
circumstances?

Best,
Ke

Re: Implication on PTransform name

2021-02-17 Thread Ke Wu
Thank you Reuven for the answers. I see that the answers are mostly runner 
dependent. Does this mean that Beam, as an SDK, does not specify the intended 
behavior and delegates to each runner to interpret the expected behavior?

Thanks,
Ke

> On Feb 17, 2021, at 11:48 AM, Reuven Lax  wrote:
> 
> 
> 
> On Wed, Feb 17, 2021 at 11:25 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello everyone,
> 
> Is there any documentation on the implications of the name param when applying a 
> PTransform to a PCollection? Some questions I have in mind are:
> 
> Is the name required to be unique across the pipeline?
> No - only needs to be unique within the current scope. 
> For stateful PTransforms such as stateful ParDo, Combine and Window, if no 
> name is provided, is state preserved during upgrades?
> Runner dependent, but generally the name is required to perform upgrades.
>  
> If a stable name is provided,  is state preserved during upgrades?
> Runner dependent, but often yes. 
> If a stable name is provided and changed intentionally during upgrade, is 
> state expected to be cleared?
> Again runner dependent, but for the dataflow runner you currently can pass in 
> a name mapping that maps the old name to the new one. 
> 
> Thanks,
> Ke
> 
> 
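
To make the naming discussion above concrete, here is a minimal Java sketch (not from the thread; it assumes an existing options object) of supplying explicit, stable names when applying transforms. These are the names runners generally key on when matching state across upgrades, per Reuven's answers.

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

// Sketch: the string passed to apply() is the transform's name, which only has
// to be unique within the enclosing scope. Keeping the name of a stateful step
// such as "SumPerUser" stable across releases is what lets runners that support
// upgrades carry its state over to the new pipeline.
Pipeline p = Pipeline.create(options);
p.apply("ReadEvents", Create.of(KV.of("user1", 1L), KV.of("user2", 2L)))
 .apply("SumPerUser", Sum.longsPerKey());
```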



Implication on PTransform name

2021-02-17 Thread Ke Wu
Hello everyone,

Is there any documentation on the implications of the name param when applying a 
PTransform to a PCollection? Some questions I have in mind are:

Is the name required to be unique across the pipeline?
For stateful PTransforms such as stateful ParDo, Combine and Window, if no name 
is provided, is state preserved during upgrades?
If a stable name is provided, is state preserved during upgrades?
If a stable name is provided and changed intentionally during upgrade, is state 
expected to be cleared?

Thanks,
Ke




Re: Issue when FnApiDoFnRunner executes Read.BoundedSourceAsSDFWrapperFn

2021-02-04 Thread Ke Wu
Thank you Boyuan for the explanation! This explains why it did not work since 
Samza does not wire in SamzaPipelineRunner when executing in portable mode yet. 

I will create a ticket to update Samza runner.

Best,
Ke

> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang  wrote:
> 
> Hi Ke,
> 
>  is it expected that Create.of will be expanded to a SDF
> In Java SDK, Create.of will be expanded into CreateSource, which will be 
> wrapped into SDF implementation.
> 
>  with regular pardo:v1 urn?
> No, the runner should run SplittableParDoExpander[1] to expand SDF into 
> SPLITTABLE_PAIR_WITH_RESTRICTION_URN, 
> SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN and 
> SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.  
> 
> I do see that SamzaPipelineRunner running the expansion[2]. Can you double 
> check whether your job invokes that code path?
> [1]  
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
>  
> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java>
> [2] 
> https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
>  
> <https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47>
>  
> 
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello Beamers,
> 
> I am trying out a simple pipeline to be executed on PortableRunner:
> 
> 
> PortablePipelineOptions options = 
> PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setJobEndpoint(some_url);
> options.setDefaultEnvironmentType("LOOPBACK");
> options.setRunner(PortableRunner.class);
> 
> Pipeline pipeline = Pipeline.create(options);
> 
> pipeline
> .apply(Create.of("1", "2", "3"))
> .apply(…print to console...);
> 
> pipeline.run()
> ```
> 
> This pipeline works with runners such as SamzaRunner, however, when in 
> portable mode, it does not work. 
> 
> I did some debugging and it turns out that it failed because when 
> Read.BoundedSourceAsSDFWrapperFn's processElement() runs, the corresponding 
> RestrictionTracker is null. This seems to be caused by the expanded SDF transform 
> having a urn of "beam:transform:pardo:v1", in which case FnApiDoFnRunner is 
> created with 
> 
> ```
> mainInputConsumer = this::processElementForParDo; 
> ```
> 
> which does not create a tracker at all. I do see that the other processing 
> methods such as 
> processElementForSplitRestriction()
> processElementForWindowObservingSplitRestriction()
> processElementForTruncateRestriction() 
> 
> etc. create trackers properly before invoking the DoFn; however, they require 
> a different urn for the transform.
> 
> My questions here are: did I miss anything? Is it expected that Create.of 
> will be expanded to an SDF with the regular pardo:v1 urn? If yes, then what is 
> the expected behavior when FnApiDoFnRunner invokes 
> Read.BoundedSourceAsSDFWrapperFn?
> 
> Best,
> Ke
>  
> 
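
For anyone hitting the same issue, here is a rough Java sketch of the expansion step Boyuan describes, modeled on the SamzaPipelineRunner code linked as [2] above. It assumes the runner already holds the pipeline proto (pipelineProto below); treat it as an illustration rather than the authoritative implementation.

```
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.ProtoOverrides;
import org.apache.beam.runners.core.construction.graph.SplittableParDoExpander;

// Sketch: before execution, rewrite plain beam:transform:pardo:v1 nodes that
// carry SDF payloads into the pair-with-restriction / split-and-size /
// process-sized-elements transforms, so that FnApiDoFnRunner sets up a
// RestrictionTracker for Read.BoundedSourceAsSDFWrapperFn.
RunnerApi.Pipeline withSdfExpanded =
    ProtoOverrides.updateTransform(
        PTransformTranslation.PAR_DO_TRANSFORM_URN,
        pipelineProto, // RunnerApi.Pipeline received by the runner (assumed variable)
        SplittableParDoExpander.createSizedReplacement());
```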



Issue when FnApiDoFnRunner executes Read.BoundedSourceAsSDFWrapperFn

2021-02-04 Thread Ke Wu
Hello Beamers,

I am trying out a simple pipeline to be executed on PortableRunner:


PortablePipelineOptions options = 
PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
options.setJobEndpoint(some_url);
options.setDefaultEnvironmentType("LOOPBACK");
options.setRunner(PortableRunner.class);

Pipeline pipeline = Pipeline.create(options);

pipeline
.apply(Create.of("1", "2", "3"))
.apply(…print to console...);

pipeline.run()
```

This pipeline works with runners such as SamzaRunner, however, when in portable 
mode, it does not work. 

I did some debugging and it turns out that it failed because when 
Read.BoundedSourceAsSDFWrapperFn's processElement() runs, the corresponding 
RestrictionTracker is null. This seems to be caused by the expanded SDF transform 
having a urn of "beam:transform:pardo:v1", in which case FnApiDoFnRunner is 
created with 

```
mainInputConsumer = this::processElementForParDo; 
```

which does not create a tracker at all. I do see that the other processing 
methods such as 
processElementForSplitRestriction()
processElementForWindowObservingSplitRestriction()
processElementForTruncateRestriction() 

etc. create trackers properly before invoking the DoFn; however, they require a 
different urn for the transform.

My questions here are: did I miss anything? Is it expected that Create.of will 
be expanded to an SDF with the regular pardo:v1 urn? If yes, then what is the 
expected behavior when FnApiDoFnRunner invokes Read.BoundedSourceAsSDFWrapperFn?

Best,
Ke
 



Re: Cross language pipeline example

2020-11-12 Thread Ke Wu
Thank you everyone for the examples! Looking forward to seeing the updated 
documents as well!

Best,
Ke

> On Nov 12, 2020, at 1:17 PM, Chamikara Jayalath  wrote:
> 
> Seems like a good place to promote this PR that adds documentation for 
> cross-language transforms :)
> https://github.com/apache/beam/pull/13317 
> <https://github.com/apache/beam/pull/13317>
> 
> This covers the following for both Java and Python SDKs.
> * Creating new cross-language transforms - primary audience will be transform 
> authors who wish to make existing Java/Python transforms available to other 
> SDKs.
> * Using cross-language transforms - primary audience will be pipeline authors 
> that wish to use existing cross-language transforms with or without language 
> specific wrappers.
> 
> Also this introduces the term "Multi-Language Pipelines" to denote pipelines 
> that use cross-language transforms (and hence utilize more than one SDK 
> language).
> 
> Thanks +Dave Wrede <mailto:dwr...@google.com> for working on this.
> 
> - Cham
> 
> On Thu, Nov 12, 2020 at 4:56 AM Ismaël Mejía  <mailto:ieme...@gmail.com>> wrote:
> I was not aware of these examples Brian, thanks for sharing. Maybe we should
> make these examples more discoverable on the website or as part of Beam's
> programming guide.
> 
> It would be nice to have an example of the opposite too, calling a Python
> transform from Java.
> 
> Additionally Java users who want to integrate python might be lost because
> External is NOT part of Beam's Java SDK (the transform is hidden inside of a
> different module core-construction-java), so it does not even appear in the
> website SDK javadoc.
> https://issues.apache.org/jira/browse/BEAM-8546 
> <https://issues.apache.org/jira/browse/BEAM-8546>
> 
> 
> On Wed, Nov 11, 2020 at 8:41 PM Brian Hulette  <mailto:bhule...@google.com>> wrote:
> >
> > Hi Ke,
> >
> > A cross-language pipeline looks a lot like a pipeline written natively in 
> > one of the Beam SDKs, the difference is that some of the transforms in the 
> > pipeline may be "external transforms" that actually have implementations in 
> > a different language. There are a few examples in the beam repo that use 
> > Java transforms from Python pipelines:
> > - kafkataxi [1]: Uses Java's KafkaIO from Python
> > - wordcount_xlang_sql [2] and sql_taxi [3]: Use Java's SqlTransform from 
> > Python
> >
> > To create your own cross-language pipeline, you'll need to decide which SDK 
> > you want to use primarily, and then create an expansion service to expose 
> > the transforms you want to use from the other SDK (if one doesn't exist 
> > already).
> >
> > [1] 
> > https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi
> >  
> > <https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/kafkataxi>
> > [2] 
> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
> >  
> > <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang_sql.py>
> > [3] 
> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py
> >  
> > <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py>
> >
> > On Wed, Nov 11, 2020 at 11:07 AM Ke Wu  > <mailto:ke.wu...@gmail.com>> wrote:
> >>
> >> Hello,
> >>
> >> Is there an example demonstrating what a cross-language pipeline looks like? 
> >> e.g. a pipeline that is composed of Java and Python code/transforms.
> >>
> >> Best,
> >> Ke
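
As a rough illustration of the Java side Ismaël mentions (BEAM-8546), here is a sketch of calling a transform served by an expansion service from a Java pipeline via the External transform in core-construction-java. The urn, payload, endpoint and the input PCollection are placeholders, and the exact factory signature and generics should be checked against the Beam version in use.

```
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.sdk.values.PCollection;

// Sketch (placeholder urn/payload/endpoint; "input" is an existing
// PCollection<String>): the expansion service at localhost:8097 expands the
// transform identified by the urn, and the returned sub-graph is stitched into
// this pipeline.
PCollection<String> expanded =
    input.apply(
        "ExternalTransform",
        External.of("beam:transforms:example:some_urn", new byte[0], "localhost:8097"));
```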



Cross language pipeline example

2020-11-11 Thread Ke Wu
Hello,

Is there an example demonstrating what a cross-language pipeline looks like? e.g. 
a pipeline that is composed of Java and Python code/transforms.

Best,
Ke

Re: Upgrade instruction from TimerDataCoder to TimerDataCoderV2

2020-11-06 Thread Ke Wu
Thank you Jeff for the quick reply. Does this mean that if a stateful job using 
the old coder wants to start using timer family id, it needs to discard all of 
its state first before it can switch to the V2 coder to get timer family id 
support?

Best,
Ke

> On Nov 6, 2020, at 11:27 AM, Jeff Klukas  wrote:
> 
> Ke - You are correct that generally data encoded with a previous coder 
> version cannot be read with an updated coder. The formats have to match 
> exactly.
> 
> As far as I'm aware, it's necessary to flush a job and start with fresh state 
> in order to upgrade coders.
> 
> On Fri, Nov 6, 2020 at 2:13 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello,
> 
> I found that TimerDataCoderV2 was created to include the timer family id and 
> output timestamp fields in TimerData. In addition, the new fields are 
> encoded between the old fields, which I suppose means the V2 coder cannot 
> decode data that was encoded by the V1 coder, and vice versa. My ask here is: 
> how should we properly upgrade without losing existing state persisted in a store?
> 
> Best,
> Ke
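
As a small illustration of Jeff's point (deliberately not using TimerData's internals), two coders with different byte layouts cannot read each other's output, which is why persisted state has to be flushed or migrated when the coder format changes. A hedged sketch using standard SDK coders:

```
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.CoderUtils;

// Sketch: a value round-trips only when the exact same coder (and coder
// version) is used for encoding and decoding, which is the situation with
// TimerDataCoder vs. TimerDataCoderV2.
static void demonstrateCoderCompatibility() throws CoderException {
  byte[] bytes = CoderUtils.encodeToByteArray(VarLongCoder.of(), 42L);
  long roundTripped = CoderUtils.decodeFromByteArray(VarLongCoder.of(), bytes); // 42 again
  // Decoding the same bytes with a coder that uses a different byte layout
  // (e.g. BigEndianLongCoder) would fail or return garbage.
}
```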



Upgrade instruction from TimerDataCoder to TimerDataCoderV2

2020-11-06 Thread Ke Wu
Hello,

I found that TimerDataCoderV2 was created to include the timer family id and 
output timestamp fields in TimerData. In addition, the new fields are encoded 
between the old fields, which I suppose means the V2 coder cannot decode data 
that was encoded by the V1 coder, and vice versa. My ask here is: how should we 
properly upgrade without losing existing state persisted in a store?

Best,
Ke

Re: Contributor permissions for Beam Jira

2020-11-06 Thread Ke Wu
Thank you, Alexey!

> On Nov 6, 2020, at 5:58 AM, Alexey Romanenko  wrote:
> 
> Done, I added you to contributors list.
> 
> Welcome! 
> 
> Please, take a look on Beam Contribution Guide if not yet =)
> https://beam.apache.org/contribute/
> 
> Alexey
> 
>> On 5 Nov 2020, at 20:10, Ke Wu  wrote:
>> 
>> Absolutely, my jira username is kw2542
>> 
>> Thanks,
>> Ke
>> 
>>> On Nov 5, 2020, at 2:47 AM, Alexey Romanenko  
>>> wrote:
>>> 
>>> Hi,
>>> 
>>> Sure. Could you provide your Jira username, please?
>>> 
>>>> On 5 Nov 2020, at 00:48, Ke Wu  wrote:
>>>> 
>>>> Hello, 
>>>> 
>>>> I am working at Samza team at LinkedIn and I would like to contribute to 
>>>> Samza runner in Beam. Could I please have permission to add/assign tickets 
>>>> on the Beam Jira?
>>>> 
>>>> Best,
>>>> Ke
>>> 
>> 
> 



Re: Contributor permissions for Beam Jira

2020-11-05 Thread Ke Wu
Absolutely, my jira username is kw2542

Thanks,
Ke

> On Nov 5, 2020, at 2:47 AM, Alexey Romanenko  wrote:
> 
> Hi,
> 
> Sure. Could you provide your Jira username, please?
> 
>> On 5 Nov 2020, at 00:48, Ke Wu  wrote:
>> 
>> Hello, 
>> 
>> I am working at Samza team at LinkedIn and I would like to contribute to 
>> Samza runner in Beam. Could I please have permission to add/assign tickets 
>> on the Beam Jira?
>> 
>> Best,
>> Ke
> 



Contributor permissions for Beam Jira

2020-11-04 Thread Ke Wu
Hello, 

I am working at Samza team at LinkedIn and I would like to contribute to Samza 
runner in Beam. Could I please have permission to add/assign tickets on the 
Beam Jira?

Best,
Ke

Re: @StateId uniqueness across DoFn(s)

2020-08-25 Thread Ke Wu
Thank you all for the reply. One last question: I noticed that 
ParDoTest$StateTests > testValueStateSameId 
<https://github.com/apache/beam/blob/c7e8c6bdb819b0d32f8727036a09c227529d01d0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2118>
 seems to be testing exactly this case; however, the first ParDo intentionally 
changes the key of the inputs, so the subsequent ParDo would never share the 
same state cell anyway. Is this expected, or do we actually want to test that 
the same state id in different DoFn(s) is completely separate?

> On Aug 21, 2020, at 4:50 PM, Robert Bradshaw  wrote:
> 
> We should be using PTransform Labels (aka Names), not ids, for naming
> state. This is why the names must line up when doing, for example, a
> Dataflow update operation with Stateful DoFns.
> 
> (And, yes, if the user does not specify the transform name, and it is
> autogenerated differently, this will be an error. This is why we throw
> exceptions in the SDK if a name is re-used rather than just appending
> a counter or similar.)
> 
> 
> On Fri, Aug 21, 2020 at 4:12 PM Ke Wu  wrote:
>> 
>> If the user does not explicitly specify a transform name, in which case an 
>> autogenerated name will be used when generating the unique id, does it mean 
>> the id could change when the pipeline changes, such as when adding extra 
>> transforms?
>> 
>> On Aug 21, 2020, at 11:43 AM, Luke Cwik  wrote:
>> 
>> The DoFn is associated with a PTransform and in the pipeline proto there is 
>> a unique id associated with each PTransform. You can use that to generate a 
>> composite key (ptransformid, stateid) which will be unique within the 
>> pipeline.
>> 
>> On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:
>>> 
>>> Thank you Reuven for the confirmation. Do you know what is the recommended 
>>> way for underlying runners to distinguish same state id in different 
>>> DoFn(s)?
>>> 
>>> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>>> 
>>> StateId is scoped to the DoFn. You can use the same string in different 
>>> DoFns for completely different states.
>>> 
>>> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>>>> 
>>>> Hello everyone,
>>>> 
>>>> After reading through Stateful processing with Apache Beam and 
>>>> DoFn.StateId, I understand that each state id must be unique and must be 
>>>> the same type at least in the same DoFn, however, it does not explicitly 
>>>> mention whether or not it is expected and supported that the same state id 
>>>> to be declared in different DoFn(s). If Yes, is the state supposed to be a 
>>>> shared state or is supposed to completed separate, therefore it could even 
>>>> be different types. If No, it seems that the validation in Beam SDK only 
>>>> validates uniqueness in the same DoFn.
>>>> 
>>>> Thanks,
>>>> Ke
>>> 
>>> 
>> 



Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
If the user does not explicitly specify a transform name, in which case an 
autogenerated name will be used when generating the unique id, does it mean the 
id could change when the pipeline changes, such as when adding extra transforms?

> On Aug 21, 2020, at 11:43 AM, Luke Cwik  wrote:
> 
> The DoFn is associated with a PTransform and in the pipeline proto there is a 
> unique id associated with each PTransform. You can use that to generate a 
> composite key (ptransformid, stateid) which will be unique within the 
> pipeline.
> 
> On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Thank you Reuven for the confirmation. Do you know what is the recommended 
> way for underlying runners to distinguish same state id in different DoFn(s)?
> 
>> On Aug 21, 2020, at 10:27 AM, Reuven Lax > <mailto:re...@google.com>> wrote:
>> 
>> StateId is scoped to the DoFn. You can use the same string in different 
>> DoFns for completely different states.
>> 
>> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu > <mailto:ke.wu...@gmail.com>> wrote:
>> Hello everyone,
>> 
>> After reading through Stateful processing with Apache Beam 
>> <https://beam.apache.org/blog/stateful-processing/> and DoFn.StateId 
>> <https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/DoFn.StateId.html>,
>>  I understand that each state id must be unique and must be of the same type, 
>> at least within the same DoFn; however, it does not explicitly mention whether 
>> it is expected and supported for the same state id to be declared in 
>> different DoFn(s). If yes, is the state supposed to be a shared state, or is 
>> it supposed to be completely separate, in which case it could even be of 
>> different types? If no, it seems that the validation in the Beam SDK only 
>> checks uniqueness within the same DoFn.
>> 
>> Thanks,
>> Ke
> 



Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Thank you Reuven for the confirmation. Do you know what is the recommended way 
for underlying runners to distinguish same state id in different DoFn(s)?

> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
> 
> StateId is scoped to the DoFn. You can use the same string in different DoFns 
> for completely different states.
> 
> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> Hello everyone,
> 
> After reading through Stateful processing with Apache Beam 
> <https://beam.apache.org/blog/stateful-processing/> and DoFn.StateId 
> <https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/DoFn.StateId.html>,
>  I understand that each state id must be unique and must be of the same type, 
> at least within the same DoFn; however, it does not explicitly mention whether 
> it is expected and supported for the same state id to be declared in 
> different DoFn(s). If yes, is the state supposed to be a shared state, or is 
> it supposed to be completely separate, in which case it could even be of 
> different types? If no, it seems that the validation in the Beam SDK only 
> checks uniqueness within the same DoFn.
> 
> Thanks,
> Ke
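
To illustrate the scoping Reuven describes, here is a minimal Java sketch (hypothetical DoFns, not from the thread) where the same @StateId string in two different DoFns refers to two completely independent state cells:

```
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: both DoFns declare @StateId("count"), but because state is scoped to
// the (transform, key, window), each stateful ParDo gets its own "count" cell.
static class CountClicksFn extends DoFn<KV<String, Integer>, Integer> {
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void process(@StateId("count") ValueState<Integer> count, OutputReceiver<Integer> out) {
    int current = count.read() == null ? 0 : count.read();
    count.write(current + 1);
    out.output(current + 1);
  }
}

static class CountViewsFn extends DoFn<KV<String, Integer>, Integer> {
  @StateId("count") // same id string, but entirely separate state from CountClicksFn
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

  @ProcessElement
  public void process(@StateId("count") ValueState<Integer> count, OutputReceiver<Integer> out) {
    int current = count.read() == null ? 0 : count.read();
    count.write(current + 1);
    out.output(current + 1);
  }
}
```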



@StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Hello everyone,

After reading through Stateful processing with Apache Beam 
<https://beam.apache.org/blog/stateful-processing/> and DoFn.StateId 
<https://beam.apache.org/releases/javadoc/2.23.0/org/apache/beam/sdk/transforms/DoFn.StateId.html>,
 I understand that each state id must be unique and must be of the same type, at 
least within the same DoFn; however, it does not explicitly mention whether it is 
expected and supported for the same state id to be declared in different 
DoFn(s). If yes, is the state supposed to be a shared state, or is it supposed to 
be completely separate, in which case it could even be of different types? If no, 
it seems that the validation in the Beam SDK only checks uniqueness within the 
same DoFn.

Thanks,
Ke

Re: Percentile metrics in Beam

2020-08-18 Thread Ke Wu
> We didn't decide on making a private API, but rather an API available to user 
> code for populating metrics with specific labels and specific URNs. The same 
> API could pretty much be used for the user USER_HISTOGRAM, with a default URN 
> chosen.
> That's how I see it in my head at the moment.
> 
> 
> On Fri, Aug 14, 2020 at 8:52 PM Robert Bradshaw  <mailto:rober...@google.com>> wrote:
> On Fri, Aug 14, 2020 at 7:35 PM Alex Amato  <mailto:ajam...@google.com>> wrote:
> >
> > I am only tackling the specific metrics covered in (for the python SDK 
> > first, then Java). To collect latency of IO API RPCS, and store it in a 
> > histogram.
> > https://s.apache.org/beam-gcp-debuggability 
> > <https://s.apache.org/beam-gcp-debuggability>
> >
> > User histogram metrics are unfunded, as far as I know. But you should be 
> > able to extend what I do for that project to the user metric use case. I 
> > agree, it won't be much more work to support that. I designed the histogram 
> > with the user histogram case in mind.
> 
> From the portability point of view, all metrics generated in users
> code (and SDK-side IOs are "user code") are user metrics. But
> regardless of how things are named, once we have histogram metrics
> crossing the FnAPI boundary all the infrastructure will be in place.
> (At least the plan as I understand it shouldn't use private APIs
> accessible only by the various IOs but not other SDK-level code.)
> 
> > On Fri, Aug 14, 2020 at 5:47 PM Robert Bradshaw  > <mailto:rober...@google.com>> wrote:
> >>
> >> Once histograms are implemented in the SDK(s) (Alex, you're tackling
> >> this, right?) it shoudn't be much work to update the Samza worker code
> >> to publish these via the Samza runner APIs (in parallel with Alex's
> >> work to do the same on Dataflow).
> >>
> >> On Fri, Aug 14, 2020 at 5:35 PM Alex Amato  >> <mailto:ajam...@google.com>> wrote:
> >> >
> >> > Noone has any plans currently to work on adding a generic histogram 
> >> > metric, at the moment.
> >> >
> >> > But I will be actively working on adding it for a specific set of 
> >> > metrics in the next quarter or so
> >> > https://s.apache.org/beam-gcp-debuggability 
> >> > <https://s.apache.org/beam-gcp-debuggability>
> >> >
> >> > After that work, one could take a look at my PRs for reference to create 
> >> > new metrics using the same histogram. One may wish to implement the 
> >> > UserHistogram use case and use that in the Samza Runner
> >> >
> >> >
> >> >
> >> >
> >> > On Fri, Aug 14, 2020 at 5:25 PM Ke Wu  >> > <mailto:ke.wu...@gmail.com>> wrote:
> >> >>
> >> >> Thank you Robert and Alex. I am not running a Beam job in Google Cloud 
> >> >> but with the Samza Runner, so I am wondering if there is any ETA for adding 
> >> >> Histogram metrics to the Metrics class so they can be mapped to the 
> >> >> SamzaHistogram metric for the actual emitting.
> >> >>
> >> >> Best,
> >> >> Ke
> >> >>
> >> >> On Aug 14, 2020, at 4:44 PM, Alex Amato  >> >> <mailto:ajam...@google.com>> wrote:
> >> >>
> >> >> One of the plans to use the histogram data is to send it to Google 
> >> >> Monitoring to compute estimates of percentiles. This is done using the 
> >> >> bucket counts and bucket boundaries.
> >> >>
> >> >> Here is a description of roughly how it's calculated.
> >> >> https://stackoverflow.com/questions/59635115/gcp-console-how-are-percentile-charts-calculated
> >> >>  
> >> >> <https://stackoverflow.com/questions/59635115/gcp-console-how-are-percentile-charts-calculated>
> >> >> This is a non exact estimate. But plotting the estimated percentiles 
> >> >> over time is often easier to understand and sufficient.
> >> >> (An alternative is a heatmap chart representing histograms over time. 
> >> >> I.e. a histogram for each window of time).
> >> >>
> >> >>
> >> >> On Fri, Aug 14, 2020 at 4:16 PM Robert Bradshaw  >> >> <mailto:rober...@google.com>> wrote:
> >> >>>
> >> >>> You may be interested in the propose histogram metrics:
> >> >>> https://docs.google.com/document/d/1kiNG2BAR-51pRdBCK4-XFmc0WuIkSuB

Re: Percentile metrics in Beam

2020-08-14 Thread Ke Wu
Thank you Robert and Alex. I am not running a Beam job in Google Cloud but with 
the Samza Runner, so I am wondering if there is any ETA for adding Histogram 
metrics to the Metrics class so they can be mapped to the SamzaHistogram 
<http://samza.apache.org/learn/documentation/versioned/api/javadocs/org/apache/samza/metrics/SamzaHistogram.html>
 metric for the actual emitting. 

Best,
Ke

> On Aug 14, 2020, at 4:44 PM, Alex Amato  wrote:
> 
> One of the plans to use the histogram data is to send it to Google Monitoring 
> to compute estimates of percentiles. This is done using the bucket counts and 
> bucket boundaries.
> 
> Here is a description of roughly how it's calculated.
> https://stackoverflow.com/questions/59635115/gcp-console-how-are-percentile-charts-calculated
>  
> <https://stackoverflow.com/questions/59635115/gcp-console-how-are-percentile-charts-calculated>
> This is a non exact estimate. But plotting the estimated percentiles over 
> time is often easier to understand and sufficient.
> (An alternative is a heatmap chart representing histograms over time. I.e. a 
> histogram for each window of time).
> 
> 
> On Fri, Aug 14, 2020 at 4:16 PM Robert Bradshaw  <mailto:rober...@google.com>> wrote:
> You may be interested in the propose histogram metrics:
> https://docs.google.com/document/d/1kiNG2BAR-51pRdBCK4-XFmc0WuIkSuBzeb__Zv8owbU/edit
>  
> <https://docs.google.com/document/d/1kiNG2BAR-51pRdBCK4-XFmc0WuIkSuBzeb__Zv8owbU/edit>
> 
> I think it'd be reasonable to add percentiles as its own metric type
> as well. The tricky bit (though there are lots of resources on this)
> is that one would have to publish more than just the percentiles from
> each worker to be able to compute the final percentiles across all
> workers.
> 
> On Fri, Aug 14, 2020 at 4:05 PM Ke Wu  <mailto:ke.wu...@gmail.com>> wrote:
> >
> > Hi everyone,
> >
> > I am looking to add percentile metrics (p50, p90 etc) to my beam job but I 
> > only find Counter, Gauge and Distribution metrics. I understand that I can 
> > calculate percentile metrics in my job itself and use Gauge to emit, 
> > however this is not an easy approach. On the other hand, Distribution 
> > metrics sounds like the one to go to according to its documentation: "A 
> > metric that reports information about the distribution of reported 
> > values.”, however it seems that it is intended for SUM, COUNT, MIN, MAX.
> >
> > The question(s) are:
> >
> > 1. is Distribution metric only intended for sum, count, min, max?
> > 2. If Yes, can the documentation be updated to be more specific?
> > 3. Can we add percentiles metric support, such as Histogram, with 
> > configurable list of percentiles to emit?
> >
> > Best,
> > Ke
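
To make the bucket-based approach above concrete, here is a small illustrative Java sketch (not Beam or Google Monitoring code) of estimating a percentile from histogram bucket counts by interpolating linearly within the bucket that contains the target rank, roughly as described in the linked answer:

```
// Illustrative only: estimate the p-th percentile (0 < p < 1) from bucket
// counts and bucket boundaries, where bucket i spans
// [boundaries[i], boundaries[i + 1]) and boundaries has counts.length + 1 entries.
static double estimatePercentile(double p, long[] counts, double[] boundaries) {
  long total = 0;
  for (long c : counts) {
    total += c;
  }
  double targetRank = p * total;
  long seen = 0;
  for (int i = 0; i < counts.length; i++) {
    if (counts[i] > 0 && seen + counts[i] >= targetRank) {
      // Linear interpolation within the containing bucket.
      double fraction = (targetRank - seen) / counts[i];
      return boundaries[i] + fraction * (boundaries[i + 1] - boundaries[i]);
    }
    seen += counts[i];
  }
  return boundaries[boundaries.length - 1]; // target rank beyond the last bucket
}
```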



Percentile metrics in Beam

2020-08-14 Thread Ke Wu
Hi everyone,

I am looking to add percentile metrics (p50, p90 etc.) to my Beam job, but I only 
find Counter, Gauge and Distribution metrics. I understand that I can calculate 
percentile metrics in my job itself and use Gauge to emit them, however this is 
not an easy approach. On the other hand, the Distribution metric sounds like the 
one to go with according to its documentation: "A metric that reports information 
about the distribution of reported values.", however it seems that it is intended 
for SUM, COUNT, MIN, MAX. 

The question(s) are:

1. Is the Distribution metric only intended for sum, count, min, max? 
2. If yes, can the documentation be updated to be more specific?
3. Can we add percentile metric support, such as Histogram, with a configurable 
list of percentiles to emit?

Best,
Ke
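
For completeness, here is a minimal sketch of what is available today: the Distribution metric mentioned above, which reports sum, count, min and max (so a mean can be derived, but not percentiles). The DoFn and metric names below are hypothetical.

```
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: a Distribution tracks sum, count, min and max of the reported values.
// Percentiles (p50, p90, ...) cannot be derived from those four aggregates,
// which is what motivates the Histogram discussion above.
static class RecordLatencyFn extends DoFn<Long, Long> {
  private final Distribution latencyMs =
      Metrics.distribution(RecordLatencyFn.class, "processing_latency_ms");

  @ProcessElement
  public void process(@Element Long latencyMillis, OutputReceiver<Long> out) {
    latencyMs.update(latencyMillis);
    out.output(latencyMillis);
  }
}
```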