Re: Portable Java Pipeline Support

2021-04-23 Thread Kyle Weaver
I couldn't find any existing ticket for this issue (you may be the first to
discover it). Feel free to create one with your findings. (FWIW I did find
a ticket for documenting portable Java pipelines [1]).

For the Flink and Spark runners, we run most of our Java tests using
EMBEDDED mode. For portable Samza, you will likely want to use a similar
setup [2].

[1] https://issues.apache.org/jira/browse/BEAM-11062
[2]
https://github.com/apache/beam/blob/247915c66f6206249c31e6160b8b605a013d9f04/runners/spark/job-server/spark_job_server.gradle#L186

On Fri, Apr 23, 2021 at 3:25 PM Ke Wu  wrote:

> Thank you, Kyle, for the detailed answer.
>
> Do we have a ticket to track fixing the LOOPBACK mode? LOOPBACK mode will be
> essential, especially for local testing, as the Samza runner adopts portable
> mode and we intend to run it with Java pipelines a lot.
>
> In addition, I noticed that this issue does not happen every time LOOPBACK
> is used, for example:
>
> Pipeline p = Pipeline.create(options);
>
> p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
>         KV.of("2", 3L), KV.of("3", 9L)))
>     .apply(Sum.longsPerKey())
>     .apply(MapElements.via(new PrintFn()));
>
> p.run().waitUntilFinish();
>
> Where PrintFn simply prints the result:
>
> public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
>   @Override
>   public String apply(KV<String, Long> input) {
> LOG.info("Key {}: value {}", input.getKey(), input.getValue());
> return input.getKey() + ": " + input.getValue();
>   }
> }
>
>
> This simple pipeline did work in Java LOOPBACK mode.
>
> Best,
> Ke
>
> On Apr 23, 2021, at 1:16 PM, Kyle Weaver  wrote:
>
> Yes, we can expect to run Java pipelines in portable mode. I'm guessing
> the method unimplemented exception is a bug, and we haven't caught it
> because (as far as I know) we don't test the Java loopback worker.
>
> As an alternative, you can try building the Java docker environment with
> "./gradlew :sdks:java:container:java8:docker" and then use
> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
> be able to access the host filesystem [1].
>
> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
> embedded environment assumes the dependencies are already present on the
> runner, which will not be the case unless you modify the job server to
> depend on the examples module.
>
> [1] https://issues.apache.org/jira/browse/BEAM-5440
>
> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu  wrote:
>
>> Hi All,
>>
>> I am working on adding portability support for the Samza runner and have
>> been playing around with the support in the Flink and Spark runners recently.
>>
>> One thing I noticed is the lack of documentation on how to run a Java
>> pipeline in portable mode. Almost all of the documentation focuses on running
>> a Python pipeline, which is understandable. I believe a Java pipeline can be
>> executed in portable mode as well, so I ran some experiments; the results
>> were unexpected, and I would like to know whether they should be:
>>
>>
>> 1. Add portability module to example so PipelineOptionsFactory can
>> recognize PortableRunner:
>>
>> $ git diff
>> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
>> index 62f15ec24b..c9069d3f4f 100644
>> --- a/examples/java/build.gradle
>> +++ b/examples/java/build.gradle
>> @@ -59,6 +59,7 @@ dependencies {
>>compile project(":sdks:java:extensions:google-cloud-platform-core")
>>compile project(":sdks:java:io:google-cloud-platform")
>>compile project(":sdks:java:io:kafka")
>> +  compile project(":runners:portability:java")
>>compile project(":sdks:java:extensions:ml")
>>compile library.java.avro
>>compile library.java.bigdataoss_util
>>
>>
>> 2. Bring up the Job Server:
>>
>> Spark: ./gradlew :runners:spark:3:job-server:runShadow
>> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>>
>> 3. Execute WordCount example:
>>
>> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
>> -Dexec.args="--inputFile=README.md --output=/tmp/output
>> --runner=PortableRunner --jobEndpoint=localhost:8099
>> --defaultEnvironmentType=LOOPBACK"
>>
>>
>>
>> Neither the Flink nor the Spark runner worked for WordCount because of
>>
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
>> UNIMPLEMENTED: Method
>> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker
>> is unimplemented
>> at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
>> at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
>> at
>> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
>> at
>> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
>> at
>> 

Flaky test issue report

2021-04-23 Thread Beam Jira Bot
This is your daily summary of Beam's current flaky tests. These are P1 issues 
because they have a major negative impact on the community and make it hard to 
determine the quality of the software.

BEAM-12200: SamzaStoreStateInternalsTest is flaky 
(https://issues.apache.org/jira/browse/BEAM-12200)
BEAM-12163: Python GHA PreCommits flake with grpc.FutureTimeoutError on SDK 
harness startup (https://issues.apache.org/jira/browse/BEAM-12163)
BEAM-12061: beam_PostCommit_SQL failing on 
KafkaTableProviderIT.testFakeNested 
(https://issues.apache.org/jira/browse/BEAM-12061)
BEAM-12020: :sdks:java:container:java8:docker failing missing licenses 
(https://issues.apache.org/jira/browse/BEAM-12020)
BEAM-12019: 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky (https://issues.apache.org/jira/browse/BEAM-12019)
BEAM-11792: Python precommit failed (flaked?) installing package  
(https://issues.apache.org/jira/browse/BEAM-11792)
BEAM-11733: [beam_PostCommit_Java] [testFhirIO_Import|export] flaky 
(https://issues.apache.org/jira/browse/BEAM-11733)
BEAM-11666: 
apache_beam.runners.interactive.recording_manager_test.RecordingManagerTest.test_basic_execution
 is flaky (https://issues.apache.org/jira/browse/BEAM-11666)
BEAM-11662: elasticsearch tests failing 
(https://issues.apache.org/jira/browse/BEAM-11662)
BEAM-11661: hdfsIntegrationTest flake: network not found (py38 postcommit) 
(https://issues.apache.org/jira/browse/BEAM-11661)
BEAM-11646: beam_PostCommit_XVR_Spark failing 
(https://issues.apache.org/jira/browse/BEAM-11646)
BEAM-11645: beam_PostCommit_XVR_Flink failing 
(https://issues.apache.org/jira/browse/BEAM-11645)
BEAM-11541: testTeardownCalledAfterExceptionInProcessElement flakes on 
direct runner. (https://issues.apache.org/jira/browse/BEAM-11541)
BEAM-11540: Linter sometimes flakes on apache_beam.dataframe.frames_test 
(https://issues.apache.org/jira/browse/BEAM-11540)
BEAM-11493: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyAndWindows
 (https://issues.apache.org/jira/browse/BEAM-11493)
BEAM-11492: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows
 (https://issues.apache.org/jira/browse/BEAM-11492)
BEAM-11491: Spark test failure: 
org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMultipleWindows
 (https://issues.apache.org/jira/browse/BEAM-11491)
BEAM-11490: Spark test failure: 
org.apache.beam.sdk.transforms.ReifyTimestampsTest.inValuesSucceeds 
(https://issues.apache.org/jira/browse/BEAM-11490)
BEAM-11489: Spark test failure: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-11489)
BEAM-11488: Spark test failure: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedCounterMetrics
 (https://issues.apache.org/jira/browse/BEAM-11488)
BEAM-11487: Spark test failure: 
org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsShouldApplyTimestamps
 (https://issues.apache.org/jira/browse/BEAM-11487)
BEAM-11486: Spark test failure: 
org.apache.beam.sdk.testing.PAssertTest.testSerializablePredicate 
(https://issues.apache.org/jira/browse/BEAM-11486)
BEAM-11485: Spark test failure: 
org.apache.beam.sdk.transforms.CombineFnsTest.testComposedCombineNullValues 
(https://issues.apache.org/jira/browse/BEAM-11485)
BEAM-11484: Spark test failure: 
org.apache.beam.runners.core.metrics.MetricsPusherTest.pushesUserMetrics 
(https://issues.apache.org/jira/browse/BEAM-11484)
BEAM-11483: Spark portable streaming PostCommit Test Improvements 
(https://issues.apache.org/jira/browse/BEAM-11483)
BEAM-10995: Java + Universal Local Runner: 
WindowingTest.testWindowPreservation fails 
(https://issues.apache.org/jira/browse/BEAM-10995)
BEAM-10987: stager_test.py::StagerTest::test_with_main_session flaky on 
windows py3.6,3.7 (https://issues.apache.org/jira/browse/BEAM-10987)
BEAM-10968: flaky test: 
org.apache.beam.sdk.metrics.MetricsTest$AttemptedMetricTests.testAttemptedDistributionMetrics
 (https://issues.apache.org/jira/browse/BEAM-10968)
BEAM-10955: Flink Java Runner test flake: Could not find Flink job  
(https://issues.apache.org/jira/browse/BEAM-10955)
BEAM-10923: Python requirements installation in docker container is flaky 
(https://issues.apache.org/jira/browse/BEAM-10923)
BEAM-10899: test_FhirIO_exportFhirResourcesGcs flake with OOM 
(https://issues.apache.org/jira/browse/BEAM-10899)
BEAM-10866: PortableRunnerTestWithSubprocesses.test_register_finalizations 
flaky on macOS (https://issues.apache.org/jira/browse/BEAM-10866)
BEAM-10763: Spotless flake (NullPointerException) 
(https://issues.apache.org/jira/browse/BEAM-10763)
BEAM-10590: BigQueryQueryToTableIT flaky: test_big_query_new_types 

P1 issues report

2021-04-23 Thread Beam Jira Bot
This is your daily summary of Beam's current P1 issues, not including flaky 
tests.

See https://beam.apache.org/contribute/jira-priorities/#p1-critical for the 
meaning and expectations around P1 issues.

BEAM-12205: Dataflow pipelines broken NoSuchMethodError 
DoFnInvoker.invokeSetup() (https://issues.apache.org/jira/browse/BEAM-12205)
BEAM-12195: Flink Runner 1.11 uses old Scala-Version 
(https://issues.apache.org/jira/browse/BEAM-12195)
BEAM-11959: Python Beam SDK Harness hangs when installing pip packages 
(https://issues.apache.org/jira/browse/BEAM-11959)
BEAM-11906: No trigger early repeatedly for session windows 
(https://issues.apache.org/jira/browse/BEAM-11906)
BEAM-11875: XmlIO.Read does not handle XML encoding per spec 
(https://issues.apache.org/jira/browse/BEAM-11875)
BEAM-11828: JmsIO is not acknowledging messages correctly 
(https://issues.apache.org/jira/browse/BEAM-11828)
BEAM-11755: Cross-language consistency (RequiresStableInputs) is quietly 
broken (at least on portable flink runner) 
(https://issues.apache.org/jira/browse/BEAM-11755)
BEAM-11578: `dataflow_metrics` (python) fails with TypeError (when int 
overflowing?) (https://issues.apache.org/jira/browse/BEAM-11578)
BEAM-11576: Go ValidatesRunner failure: TestFlattenDup on Dataflow Runner 
(https://issues.apache.org/jira/browse/BEAM-11576)
BEAM-11434: Expose Spanner admin/batch clients in Spanner Accessor 
(https://issues.apache.org/jira/browse/BEAM-11434)
BEAM-11227: Upgrade beam-vendor-grpc-1_26_0-0.3 to fix CVE-2020-27216 
(https://issues.apache.org/jira/browse/BEAM-11227)
BEAM-11148: Kafka commitOffsetsInFinalize OOM on Flink 
(https://issues.apache.org/jira/browse/BEAM-11148)
BEAM-11017: Timer with dataflow runner can be set multiple times (dataflow 
runner) (https://issues.apache.org/jira/browse/BEAM-11017)
BEAM-10861: Adds URNs and payloads to PubSub transforms 
(https://issues.apache.org/jira/browse/BEAM-10861)
BEAM-10617: python CombineGlobally().with_fanout() cause duplicate combine 
results for sliding windows (https://issues.apache.org/jira/browse/BEAM-10617)
BEAM-10573: CSV files are loaded several times if they are too large 
(https://issues.apache.org/jira/browse/BEAM-10573)
BEAM-10569: SpannerIO tests don't actually assert anything. 
(https://issues.apache.org/jira/browse/BEAM-10569)
BEAM-10288: Quickstart documents are out of date 
(https://issues.apache.org/jira/browse/BEAM-10288)
BEAM-10244: Populate requirements cache fails on poetry-based packages 
(https://issues.apache.org/jira/browse/BEAM-10244)
BEAM-10100: FileIO writeDynamic with AvroIO.sink not writing all data 
(https://issues.apache.org/jira/browse/BEAM-10100)
BEAM-9564: Remove insecure ssl options from MongoDBIO 
(https://issues.apache.org/jira/browse/BEAM-9564)
BEAM-9455: Environment-sensitive provisioning for Dataflow 
(https://issues.apache.org/jira/browse/BEAM-9455)
BEAM-9293: Python direct runner doesn't emit empty pane when it should 
(https://issues.apache.org/jira/browse/BEAM-9293)
BEAM-8986: SortValues may not work correct for numerical types 
(https://issues.apache.org/jira/browse/BEAM-8986)
BEAM-8985: SortValues should fail if SecondaryKey coder is not 
deterministic (https://issues.apache.org/jira/browse/BEAM-8985)
BEAM-8407: [SQL] Some Hive tests throw NullPointerException, but get marked 
as passing (Direct Runner) (https://issues.apache.org/jira/browse/BEAM-8407)
BEAM-7717: PubsubIO watermark tracking hovers near start of epoch 
(https://issues.apache.org/jira/browse/BEAM-7717)
BEAM-7716: PubsubIO returns empty message bodies for all messages read 
(https://issues.apache.org/jira/browse/BEAM-7716)
BEAM-7195: BigQuery - 404 errors for 'table not found' when using dynamic 
destinations - sometimes, new table fails to get created 
(https://issues.apache.org/jira/browse/BEAM-7195)
BEAM-6839: User reports protobuf ClassChangeError running against 2.6.0 or 
above (https://issues.apache.org/jira/browse/BEAM-6839)
BEAM-6466: KafkaIO doesn't commit offsets while being used as bounded 
source (https://issues.apache.org/jira/browse/BEAM-6466)


Re: Portable Java Pipeline Support

2021-04-23 Thread Ke Wu
Thank you, Kyle, for the detailed answer. 

Do we have a ticket to track fixing the LOOPBACK mode? LOOPBACK mode will be
essential, especially for local testing, as the Samza runner adopts portable
mode and we intend to run it with Java pipelines a lot.

In addition, I noticed that this issue does not happen every time LOOPBACK is 
used, for example:
Pipeline p = Pipeline.create(options);

p.apply(Create.of(KV.of("1", 1L), KV.of("1", 2L), KV.of("2", 2L),
        KV.of("2", 3L), KV.of("3", 9L)))
    .apply(Sum.longsPerKey())
    .apply(MapElements.via(new PrintFn()));

p.run().waitUntilFinish();
Where PrintFn simply prints the result:
public static class PrintFn extends SimpleFunction<KV<String, Long>, String> {
  @Override
  public String apply(KV<String, Long> input) {
LOG.info("Key {}: value {}", input.getKey(), input.getValue());
return input.getKey() + ": " + input.getValue();
  }
}

This simple pipeline did work in Java LOOPBACK mode.
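For readers unfamiliar with Sum.longsPerKey(), here is a plain-Java sketch (no Beam dependency, purely illustrative; the class and method names are made up) of what it computes for the inputs above, i.e. the values PrintFn logs:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PerKeySums {
    // Sum.longsPerKey() groups elements by key and sums the Long values per
    // key; this mirrors that for the five (key, value) pairs in Create.of(...).
    static Map<String, Long> sumPerKey(String[] keys, long[] values) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            sums.merge(keys[i], values[i], Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<String, Long> sums = sumPerKey(
            new String[] {"1", "1", "2", "2", "3"},
            new long[] {1L, 2L, 2L, 3L, 9L});
        // Same "key: value" format PrintFn returns.
        sums.forEach((k, v) -> System.out.println(k + ": " + v));
        // Prints: 1: 3, 2: 5, 3: 9
    }
}
```

(In the real pipeline, element order across keys is not guaranteed; LOOPBACK vs. DOCKER does not change the computed sums.)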

Best,
Ke

> On Apr 23, 2021, at 1:16 PM, Kyle Weaver  wrote:
> 
> Yes, we can expect to run Java pipelines in portable mode. I'm guessing the 
> method unimplemented exception is a bug, and we haven't caught it because (as 
> far as I know) we don't test the Java loopback worker.
> 
> As an alternative, you can try building the Java docker environment with 
> "./gradlew :sdks:java:container:java8:docker" and then use 
> "--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't 
> be able to access the host filesystem [1].
> 
> Another alternative is "--defaultEnvironmentType=EMBEDDED", but the embedded 
> environment assumes the dependencies are already present on the runner, which 
> will not be the case unless you modify the job server to depend on the 
> examples module.
> 
> [1] https://issues.apache.org/jira/browse/BEAM-5440 
> 
> On Fri, Apr 23, 2021 at 11:24 AM Ke Wu wrote:
> Hi All,
> 
> I am working on adding portability support for the Samza runner and have been 
> playing around with the support in the Flink and Spark runners recently. 
> 
> One thing I noticed is the lack of documentation on how to run a Java 
> pipeline in portable mode. Almost all of the documentation focuses on running 
> a Python pipeline, which is understandable. I believe a Java pipeline can be 
> executed in portable mode as well, so I ran some experiments; the results 
> were unexpected, and I would like to know whether they should be:
> 
> 
> 1. Add portability module to example so PipelineOptionsFactory can recognize 
> PortableRunner:
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>compile project(":sdks:java:extensions:google-cloud-platform-core")
>compile project(":sdks:java:io:google-cloud-platform")
>compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>compile project(":sdks:java:extensions:ml")
>compile library.java.avro
>compile library.java.bigdataoss_util
> 
> 2. Bring up the Job Server: 
> 
>   Spark: ./gradlew :runners:spark:3:job-server:runShadow
>   Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
> 
> 3. Execute WordCount example:
> 
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount 
> -Dexec.args="--inputFile=README.md --output=/tmp/output 
> --runner=PortableRunner --jobEndpoint=localhost:8099 
> --defaultEnvironmentType=LOOPBACK"
> 
> 
> Neither the Flink nor the Spark runner worked for WordCount because of 
> 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
> UNIMPLEMENTED: Method 
> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is 
> unimplemented
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
> at 
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
> at 
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
> at 
> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
> at 
> 

Re: Portable Java Pipeline Support

2021-04-23 Thread Kyle Weaver
Yes, we can expect to run Java pipelines in portable mode. I'm guessing the
method unimplemented exception is a bug, and we haven't caught it because
(as far as I know) we don't test the Java loopback worker.

As an alternative, you can try building the Java docker environment with
"./gradlew :sdks:java:container:java8:docker" and then use
"--defaultEnvironmentType=DOCKER" in your pipeline. But note that you won't
be able to access the host filesystem [1].

Another alternative is "--defaultEnvironmentType=EMBEDDED", but the
embedded environment assumes the dependencies are already present on the
runner, which will not be the case unless you modify the job server to
depend on the examples module.

[1] https://issues.apache.org/jira/browse/BEAM-5440
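Putting the DOCKER alternative together (an untested sketch that simply recombines commands already given in this thread; paths and the job endpoint match the LOOPBACK invocation quoted below):

```shell
# Build the Java SDK harness container image locally.
./gradlew :sdks:java:container:java8:docker

# Run WordCount against the job server, using the DOCKER environment
# instead of LOOPBACK. Note the container cannot see the host filesystem
# (BEAM-5440), so input/output paths must be reachable inside it.
./gradlew execute -DmainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--inputFile=README.md --output=/tmp/output \
  --runner=PortableRunner --jobEndpoint=localhost:8099 \
  --defaultEnvironmentType=DOCKER"
```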

On Fri, Apr 23, 2021 at 11:24 AM Ke Wu  wrote:

> Hi All,
>
> I am working on adding portability support for the Samza runner and have
> been playing around with the support in the Flink and Spark runners recently.
>
> One thing I noticed is the lack of documentation on how to run a Java
> pipeline in portable mode. Almost all of the documentation focuses on running
> a Python pipeline, which is understandable. I believe a Java pipeline can be
> executed in portable mode as well, so I ran some experiments; the results
> were unexpected, and I would like to know whether they should be:
>
>
> 1. Add portability module to example so PipelineOptionsFactory can
> recognize PortableRunner:
>
> $ git diff
> diff --git a/examples/java/build.gradle b/examples/java/build.gradle
> index 62f15ec24b..c9069d3f4f 100644
> --- a/examples/java/build.gradle
> +++ b/examples/java/build.gradle
> @@ -59,6 +59,7 @@ dependencies {
>compile project(":sdks:java:extensions:google-cloud-platform-core")
>compile project(":sdks:java:io:google-cloud-platform")
>compile project(":sdks:java:io:kafka")
> +  compile project(":runners:portability:java")
>compile project(":sdks:java:extensions:ml")
>compile library.java.avro
>compile library.java.bigdataoss_util
>
>
> 2. Bring up the Job Server:
>
> Spark: ./gradlew :runners:spark:3:job-server:runShadow
> Flink: ./gradlew :runners:flink:1.12:job-server:runShadow
>
> 3. Execute WordCount example:
>
> ./gradlew execute -DmainClass=org.apache.beam.examples.WordCount
> -Dexec.args="--inputFile=README.md --output=/tmp/output
> --runner=PortableRunner --jobEndpoint=localhost:8099
> --defaultEnvironmentType=LOOPBACK"
>
>
>
> Neither the Flink nor the Spark runner worked for WordCount because of
>
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> UNIMPLEMENTED: Method
> org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker
> is unimplemented
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
> at
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
> at
> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
> 

Portable Java Pipeline Support

2021-04-23 Thread Ke Wu
Hi All,

I am working on adding portability support for the Samza runner and have been 
playing around with the support in the Flink and Spark runners recently. 

One thing I noticed is the lack of documentation on how to run a Java pipeline 
in portable mode. Almost all of the documentation focuses on running a Python 
pipeline, which is understandable. I believe a Java pipeline can be executed in 
portable mode as well, so I ran some experiments; the results were unexpected, 
and I would like to know whether they should be:


1. Add portability module to example so PipelineOptionsFactory can recognize 
PortableRunner:
$ git diff
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 62f15ec24b..c9069d3f4f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -59,6 +59,7 @@ dependencies {
   compile project(":sdks:java:extensions:google-cloud-platform-core")
   compile project(":sdks:java:io:google-cloud-platform")
   compile project(":sdks:java:io:kafka")
+  compile project(":runners:portability:java")
   compile project(":sdks:java:extensions:ml")
   compile library.java.avro
   compile library.java.bigdataoss_util

2. Bring up the Job Server: 

Spark: ./gradlew :runners:spark:3:job-server:runShadow
Flink: ./gradlew :runners:flink:1.12:job-server:runShadow

3. Execute WordCount example:

./gradlew execute -DmainClass=org.apache.beam.examples.WordCount 
-Dexec.args="--inputFile=README.md --output=/tmp/output --runner=PortableRunner 
--jobEndpoint=localhost:8099 --defaultEnvironmentType=LOOPBACK"


Neither the Flink nor the Spark runner worked for WordCount because of 

org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: 
UNIMPLEMENTED: Method 
org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StopWorker is 
unimplemented
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:240)
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:221)
at 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:140)
at 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.stopWorker(BeamFnExternalWorkerPoolGrpc.java:247)
at 
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory$1.close(ExternalEnvironmentFactory.java:159)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.$closeResource(DefaultJobBundleFactory.java:642)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.close(DefaultJobBundleFactory.java:642)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.unref(DefaultJobBundleFactory.java:658)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$WrappedSdkHarnessClient.access$400(DefaultJobBundleFactory.java:589)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.lambda$createEnvironmentCaches$3(DefaultJobBundleFactory.java:212)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.processPendingNotifications(LocalCache.java:1809)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.runUnlockedCleanup(LocalCache.java:3462)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.postWriteCleanup(LocalCache.java:3438)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.clear(LocalCache.java:3215)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.clear(LocalCache.java:4270)
at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalManualCache.invalidateAll(LocalCache.java:4909)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.close(DefaultJobBundleFactory.java:319)
at 
org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.close(DefaultExecutableStageContext.java:43)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.closeActual(ReferenceCountingExecutableStageContextFactory.java:212)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.access$200(ReferenceCountingExecutableStageContextFactory.java:188)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.release(ReferenceCountingExecutableStageContextFactory.java:177)
at 
org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory.scheduleRelease(ReferenceCountingExecutableStageContextFactory.java:136)
at 

Re: Contributor Permission for Beam on Jira

2021-04-23 Thread Kenneth Knowles
Welcome! Love the Jira handle.

Kenn

On Fri, Apr 23, 2021 at 8:28 AM Jenny Xu  wrote:

> Thanks Alexey!
>
> Weiwen
>
> On Fri, Apr 23, 2021 at 11:24 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> Done.
>>
>> Welcome to Beam, Weiwen!
>>
>> ---
>> Alexey
>>
>> On 23 Apr 2021, at 17:16, Jenny Xu  wrote:
>>
>> Hi,
>>
>> This is Weiwen Xu. Could someone please add me as a contributor for
>> Beam's Jira issue tracker? My Jira ID is JenX.
>>
>> Thanks,
>> Weiwen
>>
>>
>>


Re: Issues and PR names and descriptions (or should we change the contribution guide)

2021-04-23 Thread Alexey Romanenko


> On 22 Apr 2021, at 20:18, Robert Bradshaw  wrote:
> 
> On Thu, Apr 22, 2021 at 9:33 AM Kenneth Knowles wrote:
> Heuristic CI that says "this commit history looks OK" might solve a lot of 
> the problem (I see that Robert already started on this).
> 
> And finally I want to repeat my agreement with Ismaël and Alexey that the root 
> problem is this: we need to actually care about the commit history and 
> communication of PR/commit titles and descriptions. We use tools to help us 
> to implement our intentions and to communicate them to newcomers, but I don't 
> think this will replace taking care of the repo.
> 
> Committers should care about taking care of the repo more than the average 
> contributor, but even there there is high variance. I think the issue is "oh, 
> I didn't think to squash vs. merge" rather than "who cares, I always press 
> merge anyway" in which case a timely reminder will go a long way. 

+100 (sorry), and if we could additionally have something like a warning before 
merge, that would be helpful too. 
I don’t think we really want to add more complexity to the development process, 
and a “light” version with just a warning will perhaps be enough.

> 
> Kenn
> 
> [1] 
> https://lists.apache.org/thread.html/4a65fb0b66935c9dc61568a3067538775edc3e685c6ac03dd3fa4725%40%3Cdev.beam.apache.org%3E
>  
> 
>  
> 
> As for me, I’d prefer that every committer paid more attention (if not yet) 
> on these “non code” things before reviewing/merging a PR.
> 
> [1] https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py 
> 
> 
> 
>> On 22 Apr 2021, at 01:28, Robert Bradshaw wrote:
>> 
>> I am also in the camp that it often makes sense to have more than 1 commit 
>> per PR, but rather than enforce a 1 commit per PR policy, I would say that if 
>> it's too much bother to look at the commit history to decide whether it 
>> should be squashed or merged (though I think it is almost always very obvious 
>> which is preferable for a given PR), go ahead and squash rather than merge by 
>> default. 
>> 
>> 
>> On Wed, Apr 21, 2021 at 2:23 PM Kenneth Knowles > > wrote:
>> This seems to come up a lot. Maybe we should change something?
>> 
>> Having worked on a number of projects and at companies with this policy, 
>> companies using non-distributed source control, and companies that just "use 
>> git like git", I know all these ways of life pretty well.
>> 
>> TL;DR my experience is:
>>  - when people care about the commit history and take care of it, then just 
>> "use git like git" results in faster development and clearer history, 
>> despite intermediate commits not being tested by Jenkins/Travis/GHA
>>  - when people see git as an inconvenience, view the history as an 
>> implementation detail, or think in linear history of PR merges and view the 
>> commits as an implementation detail, it becomes a mess
>> 
>> Empirically, this is what I expect from a 1 commit = 1 PR policy (and how I 
>> feel about each point):
>>  - fewer commits with bad messages (yay!)
>>  - simpler git graph if we squash + rebase (meh)
>>  - larger commits of related-but-independent changes (could be OK)
>>  - commits with bullet points in their description that bundle unrelated 
>> changes (sad face)
>>  - slowdown of development (neutral - slow can be good)
>>  - fewer "quality of life" improvements, since those would add lines of diff 
>> to a PR and are off topic; when they have to be done in a separate PR they 
>> don't get done and they don't get reviewed with the same priority (extra sad 
>> face)
>> 
>> I know I am in the minority. I tend to have a lot of PRs where there 
>> are 2-5 fairly independent commits. It is "to aid code review" but not in 
>> the way you might think: The best size for code review is pretty big, 
>> compared to the best size for commit. A commit is the unit of roll-forward, 
>> roll-back, cherry-pick, etc. Brian's point about commits not being 
>> independently tested is important: this is a tooling issue, but not that 
>> easy to change. Here is why I am not that worried about it: I believe 
>> strongly in a "rollback first" policy to restore greenness, but also that 
>> the rollback change itself must be verified to restore greenness. When a 
>> multi-commit PR fails, you can easily open a revert of the whole PR as well 
>> as reverts of individual suspect commits. The CI for these will finish 
>> around the same time, and if you manage a smaller revert, great! Imagine if 
>> to revert a PR you had to revert _every_ change between HEAD and that PR. It 
>> would restore to a known green state. Yet we don't do this, because we have 
>> technology that makes it unnecessary. Ultimately, single large commits with 
>> bullet points are just an 

Re: Contributor Permission for Beam on Jira

2021-04-23 Thread Jenny Xu
Thanks Alexey!

Weiwen

On Fri, Apr 23, 2021 at 11:24 AM Alexey Romanenko 
wrote:

> Done.
>
> Welcome to Beam, Weiwen!
>
> ---
> Alexey
>
> On 23 Apr 2021, at 17:16, Jenny Xu  wrote:
>
> Hi,
>
> This is Weiwen Xu. Could someone please add me as a contributor for Beam's
> Jira issue tracker? My Jira ID is JenX.
>
> Thanks,
> Weiwen
>
>
>


Re: Contributor Permission for Beam on Jira

2021-04-23 Thread Alexey Romanenko
Done.

Welcome to Beam, Weiwen!

---
Alexey

> On 23 Apr 2021, at 17:16, Jenny Xu  wrote:
> 
> Hi,
> 
> This is Weiwen Xu. Could someone please add me as a contributor for Beam's 
> Jira issue tracker? My Jira ID is JenX.
> 
> Thanks,
> Weiwen
> 



Contributor Permission for Beam on Jira

2021-04-23 Thread Jenny Xu
Hi,

This is Weiwen Xu. Could someone please add me as a contributor for Beam's
Jira issue tracker? My Jira ID is JenX.

Thanks,
Weiwen