Thank you, Kyle. I have created BEAM-12227
<https://issues.apache.org/jira/browse/BEAM-12227> to track the unimplemented
StopWorker exception.
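For anyone following BEAM-12227, here is a minimal, self-contained model of the failing call path in plain Java. All names (WorkerPool, StartOnlyWorkerPool, RemoteEnvironment, StopWorkerDemo) are hypothetical and are not Beam classes, and gRPC is replaced by a plain exception. It assumes, as the stack trace suggests, that the runner-side environment asks the external worker pool to stop its worker when the environment is closed, and that the Java LOOPBACK worker pool answers StopWorker with UNIMPLEMENTED.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the external worker pool contract used in LOOPBACK mode.
// Illustrative names only; these are not Beam classes.
interface WorkerPool {
  String startWorker(String workerId);

  // A pool that never overrides this reports the call as unimplemented,
  // mirroring the gRPC UNIMPLEMENTED status in the reported stack trace.
  default void stopWorker(String workerId) {
    throw new UnsupportedOperationException(
        "UNIMPLEMENTED: Method BeamFnExternalWorkerPool/StopWorker is unimplemented");
  }
}

// Stands in for a worker pool that only handles StartWorker.
class StartOnlyWorkerPool implements WorkerPool {
  final List<String> started = new ArrayList<>();

  @Override
  public String startWorker(String workerId) {
    started.add(workerId);
    return workerId;
  }
}

// Stands in for the runner-side environment whose close() asks the pool to
// stop the worker, as ExternalEnvironmentFactory$1.close does in the trace.
class RemoteEnvironment implements AutoCloseable {
  private final WorkerPool pool;
  private final String workerId;

  RemoteEnvironment(WorkerPool pool, String workerId) {
    this.pool = pool;
    this.workerId = pool.startWorker(workerId);
  }

  @Override
  public void close() {
    pool.stopWorker(workerId); // the failure happens here, at teardown
  }
}

class StopWorkerDemo {
  // Returns the teardown error message, or null if teardown succeeded.
  static String runAndClose(WorkerPool pool) {
    try (RemoteEnvironment env = new RemoteEnvironment(pool, "worker-1")) {
      // Bundle processing would happen here and succeed.
    } catch (UnsupportedOperationException e) {
      return e.getMessage();
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(runAndClose(new StartOnlyWorkerPool()));
  }
}
```

Because the error is raised from close(), it surfaces while cached environments are torn down rather than while bundles are processed, which matches the DefaultJobBundleFactory.close and LocalCache.invalidateAll frames in the trace.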

Is there any specific reason that the Java tests use EMBEDDED mode while the
Python tests usually use LOOPBACK mode?
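To make the comparison concrete, here is a sketch of a hypothetical helper (PortableArgs is not a Beam API) that assembles the PortableRunner flags used in this thread for each --defaultEnvironmentType value, with comments summarizing where the SDK harness runs in each mode. The job endpoint localhost:8099 is taken from the WordCount command further down.

```java
import java.util.List;

// Hypothetical helper, not part of Beam: builds the PortableRunner flags
// discussed in this thread for a given environment type.
class PortableArgs {
  enum EnvironmentType {
    // The SDK harness runs in the launching (client) process; the runner
    // calls back into a worker pool started there.
    LOOPBACK,
    // The runner starts the SDK harness in a Docker container, e.g. one built
    // with ./gradlew :sdks:java:container:java8:docker; no host filesystem access.
    DOCKER,
    // The SDK harness runs inside the runner / job server process, so the
    // pipeline's code and dependencies must already be on its classpath.
    EMBEDDED
  }

  static List<String> args(String jobEndpoint, EnvironmentType env) {
    return List.of(
        "--runner=PortableRunner",
        "--jobEndpoint=" + jobEndpoint,
        "--defaultEnvironmentType=" + env.name());
  }

  public static void main(String[] unused) {
    for (EnvironmentType env : EnvironmentType.values()) {
      System.out.println(String.join(" ", args("localhost:8099", env)));
    }
  }
}
```

The resulting strings could be passed to PipelineOptionsFactory.fromArgs(...) instead of going through -Dexec.args.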

Best,
Ke

> On Apr 23, 2021, at 4:01 PM, Kyle Weaver <kcwea...@google.com> wrote:
> 
> I couldn't find any existing ticket for this issue (you may be the first to 
> discover it). Feel free to create one with your findings. (FWIW I did find a 
> ticket for documenting portable Java pipelines [1]).
> 
> For the Flink and Spark runners, we run most of our Java tests using EMBEDDED 
> mode. For portable Samza, you will likely want to use a similar setup [2].
> 
> [1] https://issues.apache.org/jira/browse/BEAM-11062
> [2] https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu <ke.wu...@gmail.com> wrote:
> Thank you, Kyle, for the detailed answer. 
> 
> Do we have a ticket to track fixing LOOPBACK mode? LOOPBACK mode will be
> essential, especially for local testing, as the Samza Runner adopts portable
> mode and we intend to run it with Java pipelines a lot.
> 
> In addition, I noticed that this issue does not occur every time LOOPBACK is
> used. For example:
> Pipeline p = Pipeline.create(options);
> 
> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
>         KV.of("2", 3L), KV.of("3", 9L)))
>     .apply(Sum.longsPerKey())
>     .apply(MapElements.via(new PrintFn()));
> 
> p.run().waitUntilFinish();
> where PrintFn simply logs and returns each result:
> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>   @Override
>   public String apply(KV<String, Long> input) {
>     LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>     return input.getKey() + ": " + input.getValue();
>   }
> }
> 
> This simple pipeline did work in Java LOOPBACK mode.
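For reference, the expected output of the pipeline above can be worked out without Beam. This plain-Java sketch (the class SumPerKeyCheck is hypothetical) reproduces the Sum.longsPerKey() arithmetic on the same Create.of(...) input and formats each pair the way PrintFn does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java check of what Sum.longsPerKey() should emit for the input above.
// It does not use Beam; it only reproduces the per-key arithmetic.
class SumPerKeyCheck {
  static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> input) {
    // TreeMap only makes the output order deterministic here;
    // Beam itself guarantees no ordering of the results.
    Map<String, Long> sums = new TreeMap<>();
    for (Map.Entry<String, Long> kv : input) {
      sums.merge(kv.getKey(), kv.getValue(), Long::sum);
    }
    return sums;
  }

  // Formats each result the way PrintFn.apply does: "key: value".
  static List<String> format(Map<String, Long> sums) {
    List<String> lines = new ArrayList<>();
    for (Map.Entry<String, Long> e : sums.entrySet()) {
      lines.add(e.getKey() + ": " + e.getValue());
    }
    return lines;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> input = List.of(
        Map.entry("1", 1L), Map.entry("1", 2L),
        Map.entry("2", 2L), Map.entry("2", 3L),
        Map.entry("3", 9L));
    // prints "1: 3", "2: 5", "3: 9", one per line
    format(sumPerKey(input)).forEach(System.out::println);
  }
}
```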
> 
> Best,
> Ke
> 
>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
>> 
>> Yes, we can expect to run Java pipelines in portable mode. I'm guessing the
>> "method unimplemented" exception is a bug, and we haven't caught it because
>> (as far as I know) we don't test the Java loopback worker.
>> 
>> As an alternative, you can try building the Java Docker environment with
>> "./gradlew :sdks:java:container:java8:docker" and then using
>> "--defaultEnvironmentType=DOCKER" in your pipeline. Note, however, that you
>> won't be able to access the host filesystem [1].
>> 
>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded 
>> environment assumes the dependencies are already present on the runner, 
>> which will not be the case unless you modify the job server to depend on the 
>> examples module.
>> 
>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
>> Hi All,
>> 
>> I am working on adding portability support to the Samza Runner and have
>> recently been experimenting with the portability support in the Flink and
>> Spark runners.
>> 
>> One thing I noticed is the lack of documentation on how to run a Java
>> pipeline in portable mode. Almost all of the documentation focuses on running
>> Python pipelines, which is understandable. I believe a Java pipeline can be
>> executed in portable mode as well, so I ran some experiments, but the results
>> were not what I expected, and I would like to know whether they are expected:
>> 
>> 
>> 1. Add the portability module to the examples so that PipelineOptionsFactory
>> can recognize PortableRunner:
>> $ git diff
>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>> index 62f15ec24b..c9069d3f4f 100644
>> --- a/examples/java/build.gradle
>> +++ b/examples/java/build.gradle
>> @@ -59,6 +59,7 @@ dependencies {
>>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>>    compile project(":sdks:java:io:google-cloud-platform")
>>    compile project(":sdks:java:io:kafka")
>> +  compile project(":runners:portability:java")
>>    compile project(":sdks:java:extensions:ml")
>>    compile library.java.avro
>>    compile library.java.bigdataoss_util
>> 
>> 2. Bring up the Job Server: 
>> 
>>      Spark: ./gradlew :runners:spark:3:job-server:runShadow
>>      Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>> 
>> 3. Execute the WordCount example:
>> 
>> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount 
>> -Dexec.args="--inputFile=README.md --output=/tmp/output 
>> --runner=PortableRunner --jobEndpoint=localhost:8099 
>> --defaultEnvironmentType=LOOPBACK"
>> 
>> 
>> Neither the Flink nor the Spark runner worked for WordCount; both failed with:
>> 
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
>>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>>         at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>>         at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>>         at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>> 
>> Here are a couple of questions:
>> 
>> 1. Can we expect to run a Java pipeline in portable mode?
>> 2. If yes, is the above exception expected, or did I do something wrong?
>> 3. Is there any pending work to bring Java portability support on par with
>> Python?
>> 
>> Best,
>> Ke
> 
