Thank you, Kyle, for the detailed answer. 

Do we have a ticket to track fixing LOOPBACK mode? LOOPBACK mode will be 
essential, especially for local testing, as the Samza Runner adopts portable 
mode and we intend to run Java pipelines with it a lot.

In addition, I noticed that this issue does not occur every time LOOPBACK is 
used. For example:
Pipeline p = Pipeline.create(options);

p.apply(Create.of(
        KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
        KV.of("2", 3L), KV.of("3", 9L)))
    .apply(Sum.longsPerKey())
    .apply(MapElements.via(new PrintFn()));

p.run().waitUntilFinish();
Where PrintFn simply prints the result:
public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
  @Override
  public String apply(KV<String, Long> input) {
    LOG.info("Key {}: value {}", input.getKey(), input.getValue());
    return input.getKey() + ": " + input.getValue();
  }
}

This simple pipeline did work in Java LOOPBACK mode.
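To make the log output of this pipeline easy to check, here is a plain-Java sketch (no Beam dependency; `ExpectedSums`, `kv` and `exampleInput` are hypothetical names) of the per-key sums that Sum.longsPerKey() should produce for the same input. The TreeMap is only there to make the printed order deterministic; Beam itself does not guarantee output order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (not Beam) of what Sum.longsPerKey() computes
// for the Create.of(...) input used in the LOOPBACK test pipeline.
public class ExpectedSums {

  // Hypothetical helper standing in for Beam's KV.of(key, value).
  static Map.Entry<String, Long> kv(String key, long value) {
    return new SimpleEntry<>(key, value);
  }

  // Same elements as the Create.of(...) transform in the pipeline.
  static List<Map.Entry<String, Long>> exampleInput() {
    return Arrays.asList(kv("1", 1L), kv("1", 2L), kv("2", 2L), kv("2", 3L), kv("3", 9L));
  }

  // Groups elements by key and adds up their values.
  static Map<String, Long> sumPerKey(List<Map.Entry<String, Long>> input) {
    Map<String, Long> sums = new TreeMap<>();
    for (Map.Entry<String, Long> element : input) {
      sums.merge(element.getKey(), element.getValue(), Long::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    // Uses the same "key: value" format that PrintFn returns.
    sumPerKey(exampleInput()).forEach((k, v) -> System.out.println(k + ": " + v));
  }
}
```

Running it prints `1: 3`, `2: 5` and `3: 9`, which are the values PrintFn should log when the pipeline succeeds.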

Best,
Ke

> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
> 
> Yes, we can expect to run Java pipelines in portable mode. I'm guessing the 
> "method unimplemented" exception is a bug, and we haven't caught it because 
> (as far as I know) we don't test the Java loopback worker.
> 
> As an alternative, you can try building the Java docker environment with 
> "./gradlew :sdks:java:container:java8:docker" and then use 
> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't 
> be able to access the host filesystem [1].
> 
> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded 
> environment assumes the dependencies are already present on the runner, which 
> will not be the case unless you modify the job server to depend on the 
> examples module.
> 
> [1] https://issues.apache.org/jira/browse/BEAM-5440
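The three environment types discussed above differ only in the `--defaultEnvironmentType` flag passed alongside `--runner=PortableRunner` and `--jobEndpoint`. A minimal sketch, assuming the job server listens on localhost:8099 as in the original command; `PortableArgs` and `portableArgs` are hypothetical names, and the helper deliberately accepts only the three types mentioned in this thread. In a real pipeline the resulting array would be handed to `PipelineOptionsFactory.fromArgs(...)`.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Beam) that assembles the portable-runner
// flags used in this thread.
public class PortableArgs {

  // Only the environment types discussed in this thread:
  // LOOPBACK: SDK workers run inside the process that submitted the pipeline.
  // DOCKER:   needs the image built by ./gradlew :sdks:java:container:java8:docker;
  //           the container cannot see the host filesystem (BEAM-5440).
  // EMBEDDED: pipeline dependencies must already be on the job server classpath.
  static final List<String> ENVIRONMENT_TYPES =
      Arrays.asList("LOOPBACK", "DOCKER", "EMBEDDED");

  static String[] portableArgs(String jobEndpoint, String environmentType) {
    if (!ENVIRONMENT_TYPES.contains(environmentType)) {
      throw new IllegalArgumentException("Unsupported environment type: " + environmentType);
    }
    return new String[] {
      "--runner=PortableRunner",
      "--jobEndpoint=" + jobEndpoint,
      "--defaultEnvironmentType=" + environmentType
    };
  }

  public static void main(String[] args) {
    // Print one argument line per environment type for a local job server.
    for (String env : ENVIRONMENT_TYPES) {
      System.out.println(String.join(" ", portableArgs("localhost:8099", env)));
    }
  }
}
```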
> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
> Hi All,
> 
> I am working on adding portability support to the Samza Runner and have been 
> playing around with the support in the Flink and Spark runners recently.
> 
> One thing I noticed is the lack of documentation on how to run a Java 
> pipeline in portable mode. Almost all documentation focuses on how to run a 
> Python pipeline, which is understandable. I believe a Java pipeline can be 
> executed in portable mode as well, so I did some experiments, but the results 
> were not what I anticipated, and I would like to know whether they are expected:
> 
> 
> 1. Add the portability module to the examples so that PipelineOptionsFactory 
> can recognize PortableRunner:
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>    compile project(":sdks:java:io:google-cloud-platform")
>    compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>    compile project(":sdks:java:extensions:ml")
>    compile library.java.avro
>    compile library.java.bigdataoss_util
> 
> 2. Bring up the Job Server: 
> 
>       Spark: ./gradlew :runners:spark:3:job-server:runShadow
>       Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
> 
> 3. Execute WordCount example:
> 
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount \
> -Dexec.args="--inputFile=README.md --output=/tmp/output \
> --runner=PortableRunner --jobEndpoint=localhost:8099 \
> --defaultEnvironmentType=LOOPBACK"
> 
> 
> Neither the Flink nor the Spark runner worked for WordCount because of:
> 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>         at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>         at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>         at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
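Reading the trace bottom-up, the failure happens while the runner is releasing a stage context: the environment cache is cleared, the SDK harness client is closed, and ExternalEnvironmentFactory's close calls StopWorker on the external worker pool, which answers UNIMPLEMENTED. The following is a simplified, hypothetical model of that lifecycle (none of these classes are Beam's; an UnsupportedOperationException stands in for the gRPC UNIMPLEMENTED status). It shows why bundle processing can succeed and the error only surfaces when the environment is torn down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the external worker pool service and the
// runner-side environment handle; these are NOT Beam's actual classes.
public class WorkerPoolLifecycle {

  interface WorkerPool {
    void startWorker(String workerId);

    void stopWorker(String workerId);
  }

  // Models a loopback worker pool that implements StartWorker but not StopWorker.
  static class LoopbackPool implements WorkerPool {
    final List<String> started = new ArrayList<>();

    @Override
    public void startWorker(String workerId) {
      started.add(workerId);
    }

    @Override
    public void stopWorker(String workerId) {
      // Stands in for the UNIMPLEMENTED status returned in the trace.
      throw new UnsupportedOperationException("UNIMPLEMENTED: StopWorker is unimplemented");
    }
  }

  // Models the runner's environment handle: the worker is started when the
  // environment is created and stopped when the environment is released.
  static class RemoteEnvironment implements AutoCloseable {
    private final WorkerPool pool;
    private final String workerId;

    RemoteEnvironment(WorkerPool pool, String workerId) {
      this.pool = pool;
      this.workerId = workerId;
      pool.startWorker(workerId);
    }

    String process(String element) {
      return "processed:" + element;
    }

    @Override
    public void close() {
      pool.stopWorker(workerId);
    }
  }

  public static void main(String[] args) {
    RemoteEnvironment env = new RemoteEnvironment(new LoopbackPool(), "worker-1");
    // Processing succeeds...
    System.out.println(env.process("hello"));
    // ...and the error only appears when the environment is released.
    try {
      env.close();
    } catch (UnsupportedOperationException e) {
      System.out.println("close failed: " + e.getMessage());
    }
  }
}
```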
> 
> Here are a couple of questions:
> 
> 1. Can we expect to run a Java pipeline in portable mode?
> 2. If yes, is the above exception expected, or did I do something wrong?
> 3. Is there any pending work to bring Java portability support on par with 
> Python?
> 
> Best,
> Ke
