Hi All,

I am working on adding portability support to the Samza runner and have been 
playing around with the portability support in the Flink and Spark runners 
recently. 

One thing I noticed is the lack of documentation on how to run a Java pipeline 
in portable mode. Almost all of the documentation focuses on running Python 
pipelines, which is understandable. I believe a Java pipeline can be executed 
in portable mode as well, so I ran some experiments, but the results were not 
what I expected, and I would like to know whether they are expected:


1. Add the portability module to the examples so that PipelineOptionsFactory can 
recognize PortableRunner:
$ git diff
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 62f15ec24b..c9069d3f4f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   compile project(":sdks:java:extensions:google-cloud-platform-core")
   compile project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:kafka")
+  compile project(":runners:portability:java")
   compile project(":sdks:java:extensions:ml")
   compile library.java.avro
   compile library.java.bigdataoss_util

2. Bring up the Job Server: 

        Spark: ./gradlew :runners:spark:3:job-server:runShadow
        Flink: ./gradlew :runners:flink:1.12:job-server:runShadow

3. Execute the WordCount example:

        ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount \
          -Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner --jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"
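When reproducing step 3, one quick way to rule out a job server that never came 
up is to check that something is accepting connections at the --jobEndpoint 
address before submitting. Below is a minimal, stdlib-only Java sketch under 
that assumption. JobEndpointCheck and its methods are hypothetical helpers, not 
part of Beam, and an open port only shows that some process is listening, not 
that it is a healthy Beam job server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Beam: checks whether anything accepts TCP
// connections at a "host:port" endpoint such as the --jobEndpoint value.
// A successful connect only proves a port is open, not that it is a Beam job server.
public class JobEndpointCheck {

  // Splits "host:port" on the last colon; throws IllegalArgumentException
  // (NumberFormatException is a subclass) on malformed input.
  public static InetSocketAddress parseEndpoint(String endpoint) {
    int colon = endpoint.lastIndexOf(':');
    if (colon <= 0 || colon == endpoint.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + endpoint);
    }
    String host = endpoint.substring(0, colon);
    int port = Integer.parseInt(endpoint.substring(colon + 1));
    // Resolves the host; an unknown host yields an unresolved address,
    // which makes connect() fail in isListening.
    return new InetSocketAddress(host, port);
  }

  // Returns true if a TCP connection to the endpoint succeeds within timeoutMillis.
  public static boolean isListening(String endpoint, int timeoutMillis) {
    InetSocketAddress address = parseEndpoint(endpoint);
    try (Socket socket = new Socket()) {
      socket.connect(address, timeoutMillis);
      return true;
    } catch (IOException e) {
      // Connection refused, timed out, or host could not be resolved.
      return false;
    }
  }

  public static void main(String[] args) {
    // Defaults to the endpoint used in the WordCount command above.
    String endpoint = args.length > 0 ? args[0] : "localhost:8099";
    boolean up = isListening(endpoint, 2000);
    System.out.println(endpoint + (up ? " is accepting connections" : " is not reachable"));
  }
}
```

Usage, after compiling: java JobEndpointCheck localhost:8099. The check opens 
and immediately closes a plain TCP connection, so it does not exercise the Job 
API itself.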


Neither the Flink nor the Spark runner worked for WordCount; both failed with 
the following exception:

org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
        at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
        at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)

Here are a couple of questions:

1. Can we expect to be able to run a Java pipeline in portable mode?
2. If yes, is the above exception expected, or did I do something wrong?
3. Is there any pending work to bring Java portability support on par with 
Python?

Best,
Ke
