The reason is that the Flink and Spark runners are written in Java. When the
runner needs to execute user code written in Java, it can start an EMBEDDED
environment inside its own process. The runner cannot natively execute
Python code, however, so it has to call out to an external process. In the
case of LOOPBACK, that external process is started by the Python client
process that submitted the job in the first place.
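
To make that rule concrete, here is a minimal sketch. PortableArgs and its
methods are hypothetical names used only for illustration, not Beam APIs; the
only parts taken from this thread are the three pipeline flags and the
environment type names. It assumes the runner is a JVM process, as Flink and
Spark are.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Apache Beam.
final class PortableArgs {
  // Environment types discussed in this thread.
  enum EnvironmentType { EMBEDDED, LOOPBACK, DOCKER }

  // EMBEDDED is only an option when the user code runs on the JVM, because
  // the runner is itself a Java process. Any other SDK language needs an
  // external worker, and LOOPBACK is the one typically used for local tests.
  static EnvironmentType localTestEnvironment(String sdkLanguage) {
    return "java".equalsIgnoreCase(sdkLanguage)
        ? EnvironmentType.EMBEDDED
        : EnvironmentType.LOOPBACK;
  }

  // Builds the Java pipeline flags quoted in this thread.
  static String[] pipelineArgs(String jobEndpoint, EnvironmentType type) {
    List<String> args = new ArrayList<>();
    args.add("--runner=PortableRunner");
    args.add("--jobEndpoint=" + jobEndpoint);
    args.add("--defaultEnvironmentType=" + type.name());
    return args.toArray(new String[0]);
  }

  public static void main(String[] unused) {
    EnvironmentType type = localTestEnvironment("java");
    System.out.println(String.join(" ", pipelineArgs("localhost:8099", type)));
  }
}
```

Note that EMBEDDED still assumes the pipeline's dependencies are already on
the runner's classpath, so this only mirrors how the tests are set up.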

On Mon, Apr 26, 2021 at 9:57 AM Ke Wu <ke.wu...@gmail.com> wrote:

> Thank you Kyle, I have created BEAM-12227
> <https://issues.apache.org/jira/browse/BEAM-12227> to track the
> unimplemented exception.
>
> Is there any specific reason that Java tests use EMBEDDED mode while
> Python tests usually use LOOPBACK mode?
>
> Best,
> Ke
>
> On Apr 23, 2021, at 4:01 PM, Kyle Weaver <kcwea...@google.com> wrote:
>
> I couldn't find any existing ticket for this issue (you may be the first
> to discover it). Feel free to create one with your findings. (FWIW I did
> find a ticket for documenting portable Java pipelines [1]).
>
> For the Flink and Spark runners, we run most of our Java tests using
> EMBEDDED mode. For portable Samza, you will likely want to use a similar
> setup [2].
>
> [1] https://issues.apache.org/jira/browse/BEAM-11062
> [2]
> https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186
>
> On Fri, Apr 23, 2021 at 3:25 PM Ke Wu <ke.wu...@gmail.com> wrote:
>
>> Thank you, Kyle, for the detailed answer.
>>
>> Do we have a ticket to track fixing LOOPBACK mode? LOOPBACK mode will be
>> essential, especially for local testing, as the Samza Runner adopts portable
>> mode and we intend to run it with Java pipelines a lot.
>>
>> In addition, I noticed that this issue does not happen every time
>> LOOPBACK is used, for example:
>>
>> Pipeline p = Pipeline.create(options);
>>
>> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
>>         KV.of("2", 3L), KV.of("3", 9L)))
>>     .apply(Sum.longsPerKey())
>>     .apply(MapElements.via(new PrintFn()));
>>
>> p.run().waitUntilFinish();
>>
>> Where PrintFn simply prints the result:
>>
>> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>>   @Override
>>   public String apply(KV<String, Long> input) {
>>     LOG.info("Key {}: value {}", input.getKey(), input.getValue());
>>     return input.getKey() + ": " + input.getValue();
>>   }
>> }
>>
>>
>> This simple pipeline did work in Java LOOPBACK mode.
>>
>> Best,
>> Ke
>>
>> On Apr 23, 2021, at 1:16 PM, Kyle Weaver <kcwea...@google.com> wrote:
>>
>> Yes, we can expect to run Java pipelines in portable mode. I'm guessing
>> the "method unimplemented" exception is a bug, and we haven't caught it
>> because (as far as I know) we don't test the Java loopback worker.
>>
>> As an alternative, you can try building the Java docker environment with
>> "./gradlew :sdks:java:container:java8:docker" and then use
>> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
>> be able to access the host filesystem [1].
>>
>> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
>> embedded environment assumes the dependencies are already present on the
>> runner, which will not be the case unless you modify the job server to
>> depend on the examples module.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-5440
>>
>> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am working on adding portability support to the Samza Runner and have
>>> been experimenting with the support in the Flink and Spark runners recently.
>>>
>>> One thing I noticed is the lack of documentation on how to run a Java
>>> pipeline in portable mode. Almost all of the documentation focuses on how
>>> to run a Python pipeline, which is understandable. I believe a Java pipeline
>>> can be executed in portable mode as well, so I ran some experiments, but the
>>> results were not what I expected, and I would like to know whether they are
>>> expected:
>>>
>>>
>>> 1. Add the portability module to the examples so PipelineOptionsFactory can
>>> recognize PortableRunner:
>>>
>>> $ git diff
>>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>>> index 62f15ec24b..c9069d3f4f 100644
>>> --- a/examples/java/build.gradle
>>> +++ b/examples/java/build.gradle
>>> @@ -59,6 +59,7 @@ dependencies {
>>>    compile project(":sdks:java:extensions:google-cloud-platform-core")
>>>    compile project(":sdks:java:io:google-cloud-platform")
>>>    compile project(":sdks:java:io:kafka")
>>> +  compile project(":runners:portability:java")
>>>    compile project(":sdks:java:extensions:ml")
>>>    compile library.java.avro
>>>    compile library.java.bigdataoss_util
>>>
>>>
>>> 2. Bring up the Job Server:
>>>
>>> Spark: ./gradlew :runners:spark:3:job-server:runShadow
>>> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>>>
>>> 3. Execute WordCount example:
>>>
>>> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
>>> -Dexec.args="--inputFile=README.md --output=/tmp/output
>>> --runner=PortableRunner --jobEndpoint=localhost:8099
>>> --defaultEnvironmentType=LOOPBACK"
>>>
>>>
>>>
>>> Neither the Flink nor the Spark runner worked for WordCount because of
>>>
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
>>> UNIMPLEMENTED: Method
>>> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker
>>> is unimplemented
>>>         at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>>>         at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>>>         at
>>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>>>         at
>>> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>>>         at
>>> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
>>>         at
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.access$300(ReferenceCountingExecutableStageContextFactory.java:48)
>>>         at
>>> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.close(ReferenceCountingExecutableStageContextFactory.java:208)
>>>
>>> Here are a couple of questions:
>>>
>>> 1. Can we expect to run a Java pipeline in portable mode?
>>> 2. If yes, is the above exception expected or did I do something wrong?
>>> 3. Is there any pending work to make Java portability support on par
>>> with Python?
>>>
>>> Best,
>>> Ke
>>>
>>
>>
>
