Yes, Java pipelines are expected to run in portable mode. I'm guessing the
UNIMPLEMENTED exception on StopWorker is a bug that we haven't caught
because (as far as I know) we don't test the Java loopback worker.

As an alternative, you can try building the Java docker environment with
"./gradlew :sdks:java:container:java8:docker" and then use
"--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
be able to access the host filesystem [1].

Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
embedded environment assumes the dependencies are already present on the
runner, which will not be the case unless you modify the job server to
depend on the examples module.
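
To make the two alternatives concrete, here is a rough sketch that reuses the
WordCount command from the quoted message and only swaps the environment type.
Every flag is copied from this thread; the ENV_TYPE and RUN variables are
hypothetical helpers for illustration. I have not verified that either
environment gets WordCount to succeed. In particular, with DOCKER the local
README.md and /tmp/output paths would be resolved inside the container rather
than on the host [1], so treat them as placeholders.

```shell
#!/bin/sh
# Sketch only: prints the commands by default; set RUN=1 to execute them
# from the root of a Beam source checkout with a job server already running.

# Environment type to try: DOCKER or EMBEDDED (both named in this thread).
# ENV_TYPE and RUN are hypothetical helper variables, not Beam settings.
ENV_TYPE="${ENV_TYPE:-DOCKER}"

# Container build command from this thread; only needed for DOCKER.
BUILD_CMD="./gradlew :sdks:java:container:java8:docker"

# Same arguments as the original WordCount run, with the environment swapped.
EXEC_ARGS="--inputFile=README.md --output=/tmp/output --runner=PortableRunner --jobEndpoint=localhost:8099 --defaultEnvironmentType=${ENV_TYPE}"
RUN_CMD="./gradlew execute -DmainClass=org.apache.beam.examples.WordCount -Dexec.args=\"${EXEC_ARGS}\""

if [ "${RUN:-0}" = "1" ]; then
  # Build the SDK container first when the DOCKER environment is selected.
  if [ "$ENV_TYPE" = "DOCKER" ]; then
    eval "$BUILD_CMD" || exit 1
  fi
  eval "$RUN_CMD"
else
  # Dry run: show what would be executed.
  if [ "$ENV_TYPE" = "DOCKER" ]; then
    echo "$BUILD_CMD"
  fi
  echo "$RUN_CMD"
fi
```

Running it with ENV_TYPE=EMBEDDED skips the container build, but as noted
above that only helps once the job server depends on the examples module.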

[1] https://issues.apache.org/jira/browse/BEAM-5440

On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:

> Hi All,
>
> I am working on adding portability support to the Samza runner and have
> been playing around with the support in the Flink and Spark runners recently.
>
> One thing I noticed is the lack of documentation on how to run a Java
> pipeline in portable mode. Almost all of the documentation focuses on how
> to run a Python pipeline, which is understandable. I believe a Java
> pipeline can be executed in portable mode as well, so I did some
> experiments. The results were not what I expected, and I would like to
> know whether they are expected:
>
>
> 1. Add the portability module to the examples so that PipelineOptionsFactory
> can recognize PortableRunner:
>
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>    compile project(":sdks:java:io:google-cloud-platform")
>    compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>    compile project(":sdks:java:extensions:ml")
>    compile library.java.avro
>    compile library.java.bigdataoss_util
>
>
> 2. Bring up the Job Server:
>
> Spark: ./gradlew :runners:spark:3:job-server:runShadow
> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>
> 3. Execute WordCount example:
>
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
> -Dexec.args="--inputFile=README.md --output=/tmp/output
> --runner=PortableRunner --jobEndpoint=localhost:8099
> --defaultEnvironmentType=LOOPBACK"
>
>
>
> Neither the Flink nor the Spark runner worked for WordCount because of
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>         at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>         at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>         at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>         at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>         at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>
> Here are a couple of questions:
>
> 1. Can we expect to run a Java pipeline in portable mode?
> 2. If yes, is the above exception expected, or did I do something wrong?
> 3. Is there any pending work to bring Java portability support on par with
> Python?
>
> Best,
> Ke
>
