Hi All,

I am working on adding portability support to the Samza runner and have recently been experimenting with the portability support in the Flink and Spark runners.

One thing I noticed is the lack of documentation on how to run a Java pipeline in portable mode. Almost all of the documentation focuses on how to run a Python pipeline, which is understandable. I believe a Java pipeline can be executed in portable mode as well, so I ran some experiments. The results were not what I expected, and I would like to know whether this behavior is expected:

1. Add the portability module to the examples so that PipelineOptionsFactory can recognize PortableRunner:

    $ git diff
    diff --git a/examples/java/build.gradle b/examples/java/build.gradle
    index 62f15ec24b..c9069d3f4f 100644
    --- a/examples/java/build.gradle
    +++ b/examples/java/build.gradle
    @@ -59,6 +59,7 @@ dependencies {
       compile project(":sdks:java:extensions:google-cloud-platform-core")
       compile project(":sdks:java:io:google-cloud-platform")
       compile project(":sdks:java:io:kafka")
    +  compile project(":runners:portability:java")
       compile project(":sdks:java:extensions:ml")
       compile library.java.avro
       compile library.java.bigdataoss_util

2. Bring up the Job Server:

    Spark: ./gradlew :runners:spark:3:job-server:runShadow
    Flink: ./gradlew :runners:flink:1.12:job-server:runShadow

3. Execute the WordCount example:

    ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner --jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"

Neither the Flink nor the Spark runner worked for WordCount because of the following exception:

    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
        at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
        at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)

Here are a couple of questions:

1. Can we expect to run a Java pipeline in portable mode?
2. If yes, is the above exception expected, or did I do something wrong?
3. Is there any pending work to bring Java portability support on par with Python?

Best,
Ke