That makes sense. For the Samza runner, we are looking to leverage Java portable mode to achieve a “split deployment”. In that setup the runner is packaged independently without user code, and user code exists only in the submission/worker process. I believe portable mode supports this, so we would prefer to use LOOPBACK mode for testing and DOCKER mode for production.
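With LOOPBACK planned for testing and DOCKER for production, the submission side differs mainly in pipeline options. The sketch below is a hypothetical helper (`PortableArgs` is not a Beam class) that assembles the flags used in this thread: `--runner=PortableRunner`, `--jobEndpoint` and `--defaultEnvironmentType`. It also uses `--defaultEnvironmentConfig`, which the thread does not mention. The sketch assumes that for DOCKER this option names the SDK container image. The image name in `main` is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Beam) that builds the command-line
 * arguments for submitting a Java pipeline to a portable job server.
 */
final class PortableArgs {
  /** Environment types discussed in this thread. */
  enum EnvType { LOOPBACK, DOCKER, EMBEDDED }

  private PortableArgs() {}

  /**
   * @param jobEndpoint host:port of the job server, e.g. "localhost:8099"
   * @param env         where the runner should execute Java user code
   * @param envConfig   value for --defaultEnvironmentConfig; assumed here to be
   *                    a container image for DOCKER. Pass null to use the default.
   */
  static List<String> build(String jobEndpoint, EnvType env, String envConfig) {
    List<String> args = new ArrayList<>();
    args.add("--runner=PortableRunner");
    args.add("--jobEndpoint=" + jobEndpoint);
    args.add("--defaultEnvironmentType=" + env.name());
    if (envConfig != null) {
      // In this sketch only DOCKER accepts a config value.
      if (env != EnvType.DOCKER) {
        throw new IllegalArgumentException("envConfig is only used with DOCKER");
      }
      args.add("--defaultEnvironmentConfig=" + envConfig);
    }
    return args;
  }

  public static void main(String[] unused) {
    // Local testing: the submitting process hosts the SDK worker.
    System.out.println(String.join(" ", build("localhost:8099", EnvType.LOOPBACK, null)));
    // Production: SDK workers run in a container (placeholder image name).
    System.out.println(String.join(" ",
        build("localhost:8099", EnvType.DOCKER, "my-registry/beam-java-sdk:custom")));
  }
}
```

The returned list could be turned into an array with `toArray(new String[0])` and passed to `PipelineOptionsFactory.fromArgs(...)`. LOOPBACK and DOCKER keep user code out of the job server. EMBEDDED, as Kyle notes later in the thread, expects the dependencies to already be present on the runner.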
Is there a way to get BEAM-12227 <https://issues.apache.org/jira/browse/BEAM-12227> prioritized, or is patching it ourselves the fastest way?

Best,
Ke

> On Apr 26, 2021, at 10:17 AM, Kyle Weaver <kcwea...@google.com> wrote:
>
> The reason is that the Flink and Spark runners are written in Java, so when the runner needs to execute user code written in Java, an EMBEDDED environment can be started in the runner. The runner cannot natively execute Python code, however, so it needs to call out to an external process. In the case of LOOPBACK, that external process is started by the Python client process that submitted the job in the first place.
>
> On Mon, Apr 26, 2021 at 9:57 AM Ke Wu <ke.wu...@gmail.com> wrote:
> Thank you, Kyle. I have created BEAM-12227 <https://issues.apache.org/jira/browse/BEAM-12227> to track the unimplemented exception.
>
> Is there any specific reason that Java tests use EMBEDDED mode while Python tests usually use LOOPBACK mode?
>
> Best,
> Ke
>
>> On Apr 23, 2021, at 4:01 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> I couldn't find any existing ticket for this issue (you may be the first to discover it). Feel free to create one with your findings. (FWIW, I did find a ticket for documenting portable Java pipelines [1].)
>>
>> For the Flink and Spark runners, we run most of our Java tests using EMBEDDED mode. For portable Samza, you will likely want to use a similar setup [2].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11062
>> [2] https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>>
>> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> Thank you, Kyle, for the detailed answer.
>>
>> Is there a ticket tracking a fix for LOOPBACK mode? LOOPBACK mode will be essential, especially for local testing, because the Samza runner is adopting portable mode and we intend to run it with Java pipelines a lot.
>>
>> In addition, I noticed that this issue does not happen every time LOOPBACK is used. For example:
>>
>>   Pipeline p = Pipeline.create(options);
>>
>>   p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L), KV.of("2", 3L), KV.of("3", 9L)))
>>       .apply(Sum.longsPerKey())
>>       .apply(MapElements.via(new PrintFn()));
>>
>>   p.run().waitUntilFinish();
>>
>> where PrintFn simply logs the result:
>>
>>   public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>>     @Override
>>     public String apply(KV<String, Long> input) {
>>       LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>>       return input.getKey() + ": " + input.getValue();
>>     }
>>   }
>>
>> This simple pipeline did work in Java LOOPBACK mode.
>>
>> Best,
>> Ke
>>
>>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>> Yes, we can expect to run Java pipelines in portable mode. I'm guessing the "method unimplemented" exception is a bug, and we haven't caught it because (as far as I know) we don't test the Java loopback worker.
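The error text in this thread matches how grpc-java handles RPCs that a server does not override. The generated `*ImplBase` class answers them with `UNIMPLEMENTED` and a "Method <name> is unimplemented" description. The model below assumes that the Java worker pool started for LOOPBACK implements StartWorker but not StopWorker. It uses only the standard library, and every class name is a hypothetical stand-in rather than a real Beam or gRPC type. The model reproduces the message and shows one possible shape of a BEAM-12227 patch: override `stopWorker` so that the call is acknowledged. Whether a no-op acknowledgement is correct for the real Java harness is an assumption.

```java
/** Stand-in for the UNIMPLEMENTED status a gRPC server returns. */
class UnimplementedException extends RuntimeException {
  UnimplementedException(String fullMethodName) {
    super("UNIMPLEMENTED: Method " + fullMethodName + " is unimplemented");
  }
}

/** Models a generated service base class: every RPC fails unless overridden. */
abstract class WorkerPoolBase {
  static final String SERVICE = "org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool";

  String startWorker(String workerId) {
    throw new UnimplementedException(SERVICE + "/StartWorker");
  }

  String stopWorker(String workerId) {
    throw new UnimplementedException(SERVICE + "/StopWorker");
  }
}

/** A worker pool that only handles StartWorker (the assumed current behavior). */
class StartOnlyWorkerPool extends WorkerPoolBase {
  @Override
  String startWorker(String workerId) {
    return "started " + workerId;
  }
}

/** Hypothetical patch: also acknowledge StopWorker instead of failing. */
class PatchedWorkerPool extends StartOnlyWorkerPool {
  @Override
  String stopWorker(String workerId) {
    return "stopped " + workerId;
  }
}

final class WorkerPoolDemo {
  public static void main(String[] args) {
    WorkerPoolBase pool = new StartOnlyWorkerPool();
    System.out.println(pool.startWorker("worker-1"));
    try {
      pool.stopWorker("worker-1"); // fails like the reported stack trace
    } catch (UnimplementedException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(new PatchedWorkerPool().stopWorker("worker-1"));
  }
}
```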
>>>
>>> As an alternative, you can try building the Java Docker environment with "./gradlew :sdks:java:container:java8:docker" and then using "--defaultEnvironmentType=DOCKER" in your pipeline. Note, however, that you won't be able to access the host filesystem [1].
>>>
>>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded environment assumes the dependencies are already present on the runner. That will not be the case unless you modify the job server to depend on the examples module.
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>>
>>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>> Hi All,
>>>
>>> I am working on adding portability support to the Samza runner and have recently been experimenting with the existing support in the Flink and Spark runners.
>>>
>>> One thing I noticed is the lack of documentation on how to run a Java pipeline in portable mode. Almost all of the documentation focuses on running Python pipelines, which is understandable. I believe a Java pipeline can be executed in portable mode as well, so I ran some experiments. The results were not what I expected, and I would like to know whether they are expected:
>>>
>>> 1. Add the portability module to the examples so that PipelineOptionsFactory can recognize PortableRunner:
>>>
>>>   $ git diff
>>>   diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>>>   index 62f15ec24b..c9069d3f4f 100644
>>>   --- a/examples/java/build.gradle
>>>   +++ b/examples/java/build.gradle
>>>   @@ -59,6 +59,7 @@ dependencies {
>>>      compile project(":sdks:java:extensions:google-cloud-platform-core")
>>>      compile project(":sdks:java:io:google-cloud-platform")
>>>      compile project(":sdks:java:io:kafka")
>>>   +  compile project(":runners:portability:java")
>>>      compile project(":sdks:java:extensions:ml")
>>>      compile library.java.avro
>>>      compile library.java.bigdataoss_util
>>>
>>> 2. Bring up the job server:
>>>
>>>   Spark: ./gradlew :runners:spark:3:job-server:runShadow
>>>   Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>>>
>>> 3. Execute the WordCount example:
>>>
>>>   ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount -Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner --jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"
>>>
>>> Neither the Flink runner nor the Spark runner worked for WordCount, because of:
>>>
>>>   org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is unimplemented
>>>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>>>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>>>       at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>>>       at org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>>>       at org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>>>       at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>>>       at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>>>       at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>>>
>>> Here are a couple of questions:
>>>
>>> 1. Can we expect to run a Java pipeline in portable mode?
>>> 2. If yes, is the above exception expected, or did I do something wrong?
>>> 3. Is there any pending work to bring Java portability support on par with Python?
>>>
>>> Best,
>>> Ke
>>
>
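Reading the stack trace bottom-up shows that the exception is raised during teardown. The reference-counted stage context is released, and the environment cache is invalidated. Then the last reference to the SDK harness client is dropped, and closing the external environment issues the StopWorker call. The stdlib-only model below shows that shape: work completes, and a close-time failure surfaces only from the final release. `RefCountedClient` is a hypothetical stand-in, not Beam's `WrappedSdkHarnessClient`, and the model is deliberately single-threaded.

```java
/**
 * Hypothetical, single-threaded model of a shared SDK harness client whose
 * environment is closed (e.g. via StopWorker) when the last user releases it.
 */
final class RefCountedClient {
  private final Runnable closeEnvironment; // stands in for sending StopWorker
  private int refs;
  private boolean closed;

  RefCountedClient(Runnable closeEnvironment) {
    this.closeEnvironment = closeEnvironment;
  }

  /** Registers one more user (e.g. an executable stage) of the shared client. */
  RefCountedClient ref() {
    if (closed) {
      throw new IllegalStateException("client already closed");
    }
    refs++;
    return this;
  }

  /** Drops one user; the last unref() closes the environment. */
  void unref() {
    if (refs == 0) {
      throw new IllegalStateException("unref() without matching ref()");
    }
    refs--;
    if (refs == 0) {
      closed = true;
      // An exception thrown here (such as an UNIMPLEMENTED StopWorker response)
      // propagates out of this final unref(), i.e. during teardown.
      closeEnvironment.run();
    }
  }

  boolean isClosed() {
    return closed;
  }

  public static void main(String[] args) {
    RefCountedClient client =
        new RefCountedClient(() -> System.out.println("closing environment: StopWorker"));
    client.ref().ref(); // two stages share one SDK harness client
    client.unref();     // first stage done: environment stays up
    client.unref();     // last stage done: environment is closed
    System.out.println("closed = " + client.isClosed());
  }
}
```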