Re: Portable Java Pipeline Support

2021-04-26 Thread Kyle Weaver
> For the Samza Runner, we are looking to leverage Java portable mode to
> achieve “split deployment”, where the runner is packaged independently of
> user code and user code exists only in the submission/worker process. I
> believe this is supported by portable mode, and therefore we would prefer
> to use LOOPBACK (for testing) and DOCKER (for production) mode.

Makes sense.

> Is there a way to get BEAM-12227 prioritized, or is the fastest way to
> patch it ourselves?

Probably to patch it yourselves. I'd be happy to provide a review if you
need it though.
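For context, the exception tracked in BEAM-12227 (quoted in full later in this thread) is an UNIMPLEMENTED status for the external worker pool's StopWorker call. The sketch below is a toy model with made-up names, not Beam's actual service code; it only illustrates the difference between leaving a teardown handler unimplemented and stubbing it out:

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a worker-pool service; all names here are illustrative.
class ToyWorkerPool {
  private final Map<String, Boolean> workers = new HashMap<>();

  void startWorker(String id) {
    workers.put(id, true);
  }

  // Before a fix: the handler was never implemented, so callers see an error
  // (in gRPC terms, an UNIMPLEMENTED status) when the environment shuts down.
  void stopWorkerUnimplemented(String id) {
    throw new UnsupportedOperationException("UNIMPLEMENTED: StopWorker");
  }

  // After a fix: the handler actually tears the worker down.
  void stopWorker(String id) {
    workers.remove(id);
  }

  boolean isRunning(String id) {
    return Boolean.TRUE.equals(workers.get(id));
  }
}
```

Judging by the stack trace later in the thread, the error surfaces on the teardown path (environment close() calling stopWorker), which is why a pipeline can otherwise run to completion before failing.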

On Mon, Apr 26, 2021 at 10:47 AM Ke Wu  wrote:

> That makes sense.
>
> For the Samza Runner, we are looking to leverage Java portable mode to achieve
> “split deployment”, where the runner is packaged independently of user code and
> user code exists only in the submission/worker process. I believe this is
> supported by portable mode, and therefore we would prefer to use
> LOOPBACK (for testing) and DOCKER (for production) mode.
>
> Is there a way to get BEAM-12227 prioritized, or is the fastest way to
> patch it ourselves?
>
> Best,
> Ke
>
>
> On Apr 26, 2021, at 10:17 AM, Kyle Weaver  wrote:
>
> The reason is that the Flink and Spark runners are written in Java. So when
> the runner needs to execute user code written in Java, an EMBEDDED
> environment can be started inside the runner. The runner cannot natively
> execute Python code, however, so it needs to call out to an external
> process. In the case of LOOPBACK, that external process is started by the
> Python client process that submitted the job in the first place.
>
> On Mon, Apr 26, 2021 at 9:57 AM Ke Wu  wrote:
>
>> Thank you Kyle, I have created BEAM-12227 to track the unimplemented
>> exception.
>>
>> Is there any specific reason that Java tests use EMBEDDED mode while
>> Python tests usually use LOOPBACK mode?
>>
>> Best,
>> Ke
>>
>> On Apr 23, 2021, at 4:01 PM, Kyle Weaver  wrote:
>>
>> I couldn't find any existing ticket for this issue (you may be the first
>> to discover it). Feel free to create one with your findings. (FWIW I did
>> find a ticket for documenting portable Java pipelines [1]).
>>
>> For the Flink and Spark runners, we run most of our Java tests using
>> EMBEDDED mode. For portable Samza, you will likely want to use a similar
>> setup [2].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11062
>> [2]
>> https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>>
>> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu  wrote:
>>
>>> Thank you, Kyle, for the detailed answer.
>>>
>>> Do we have a ticket tracking a fix for LOOPBACK mode? LOOPBACK mode will
>>> be essential, especially for local testing, as the Samza Runner adopts
>>> portable mode and we intend to run Java pipelines with it a lot.
>>>
>>> In addition, I noticed that this issue does not happen every time
>>> LOOPBACK is used, for example:
>>>
>>> Pipeline p = Pipeline.create(options);
>>>
>>> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
>>>         KV.of("2", 3L), KV.of("3", 9L)))
>>>     .apply(Sum.longsPerKey())
>>>     .apply(MapElements.via(new PrintFn()));
>>>
>>> p.run().waitUntilFinish();
>>>
>>> Where PrintFn simply prints the result:
>>>
>>> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>>>   @Override
>>>   public String apply(KV<String, Long> input) {
>>>     LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>>>     return input.getKey() + ": " + input.getValue();
>>>   }
>>> }
>>>
>>>
>>> This simple pipeline did work in Java LOOPBACK mode.
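As a sanity check on the example above, the per-key sums for that Create data can be computed in plain Java. This is not Beam code, only a sketch of what Sum.longsPerKey() should produce for that input:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of summing values per key, mirroring the Create.of(...) data.
class SumPerKeySketch {
  static Map<String, Long> sum(Object[][] pairs) {
    Map<String, Long> out = new HashMap<>();
    for (Object[] kv : pairs) {
      // merge adds the value onto any existing sum for the key.
      out.merge((String) kv[0], (Long) kv[1], Long::sum);
    }
    return out;
  }
}
```

For the input above this yields 1 -> 3, 2 -> 5, 3 -> 9, which is what PrintFn logs when the pipeline runs correctly.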
>>>
>>> Best,
>>> Ke
>>>
>>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver  wrote:
>>>
>>> Yes, we can expect to run Java pipelines in portable mode. I'm guessing
>>> the method unimplemented exception is a bug, and we haven't caught it
>>> because (as far as I know) we don't test the Java loopback worker.
>>>
>>> As an alternative, you can try building the Java docker environment with
>>> "./gradlew :sdks:java:container:java8:docker" and then use
>>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
>>> be able to access the host filesystem [1].
>>>
>>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
>>> embedded environment assumes the dependencies are already present on the
>>> runner, which will not be the case unless you modify the job server to
>>> depend on the examples module.
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>>
>>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu  wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am working on adding portability support for the Samza Runner and have
>>>> been playing around with the support in the Flink and Spark runners
>>>> recently.
>>>>
>>>> One thing I noticed is the lack of documentation on how to run a Java
>>>> pipeline in portable mode. Almost all documentation focuses on how to run
>>>> a Python pipeline, which is understandable.
Re: Portable Java Pipeline Support

2021-04-23 Thread Kyle Weaver
Yes, we can expect to run Java pipelines in portable mode. I'm guessing the
method unimplemented exception is a bug, and we haven't caught it because
(as far as I know) we don't test the Java loopback worker.

As an alternative, you can try building the Java docker environment with
"./gradlew :sdks:java:container:java8:docker" and then use
"--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
be able to access the host filesystem [1].

Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
embedded environment assumes the dependencies are already present on the
runner, which will not be the case unless you modify the job server to
depend on the examples module.

[1] https://issues.apache.org/jira/browse/BEAM-5440
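Since all of these environment choices are passed as plain --key=value strings (e.g. --defaultEnvironmentType=DOCKER), here is a hypothetical, minimal parser in that style. It is purely illustrative of the flag format and is not Beam's PipelineOptionsFactory:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative --key=value flag parser; not Beam's actual options machinery.
class FlagParserSketch {
  static Map<String, String> parse(String[] args) {
    Map<String, String> opts = new HashMap<>();
    for (String arg : args) {
      int eq = arg.indexOf('=');
      // Accept only arguments shaped like --name=value.
      if (arg.startsWith("--") && eq > 2) {
        opts.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    return opts;
  }
}
```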

On Fri, Apr 23, 2021 at 11:24 AM Ke Wu  wrote:

> Hi All,
>
> I am working on adding portability support for the Samza Runner and have
> been playing around with the support in the Flink and Spark runners
> recently.
>
> One thing I noticed is the lack of documentation on how to run a Java
> pipeline in portable mode. Almost all documentation focuses on how to run a
> Python pipeline, which is understandable. I believe a Java pipeline can be
> executed in portable mode as well, so I ran some experiments; the results
> were not what I expected, and I would like to know whether they are:
>
>
> 1. Add the portability module to the examples project so
> PipelineOptionsFactory can recognize PortableRunner:
>
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>compile project(":sdks:java:extensions:google-cloud-platform-core")
>compile project(":sdks:java:io:google-cloud-platform")
>compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>compile project(":sdks:java:extensions:ml")
>compile library.java.avro
>compile library.java.bigdataoss_util
>
>
> 2. Bring up the Job Server:
>
> Spark: ./gradlew :runners:spark:3:job-server:runShadow
> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>
> 3. Execute WordCount example:
>
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
> -Dexec.args="--inputFile=README.md --output=/tmp/output
> --runner=PortableRunner --jobEndpoint=localhost:8099
> --defaultEnvironmentType=LOOPBACK"
>
>
>
> Neither the Flink nor the Spark runner worked for WordCount because of
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> UNIMPLEMENTED: Method
> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker
> is unimplemented
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
> at
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
> at
> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>