This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e6d751f92e Add support for all Java-based portable runners to consume 
elements embedded in the control response. (#25463)
4e6d751f92e is described below

commit 4e6d751f92e0d20f1b7a0ce34997bd35d6401913
Author: Luke Cwik <lukec...@gmail.com>
AuthorDate: Thu Feb 16 14:06:02 2023 -0800

    Add support for all Java-based portable runners to consume elements 
embedded in the control response. (#25463)
    
    * Add support for all Java-based portable runners to consume elements 
embedded in the control response.
    
    Fixes #25462
---
 .../fnexecution/control/DefaultJobBundleFactory.java   |  3 +++
 .../runners/fnexecution/control/SdkHarnessClient.java  |  4 ++++
 .../environment/EmbeddedEnvironmentFactory.java        |  7 ++++++-
 .../beam/fn/harness/jmh/ProcessBundleBenchmark.java    | 16 ++++++++++++++--
 .../fn/harness/jmh/ProcessBundleBenchmarkTest.java     | 18 ++++++++++++++++--
 5 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 0810a804a4a..1ac50922a29 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -34,6 +34,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.ProvisionApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
 import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
@@ -683,6 +684,8 @@ public class DefaultJobBundleFactory implements 
JobBundleFactory {
     provisionInfo.setLoggingEndpoint(loggingServer.getApiServiceDescriptor());
     
provisionInfo.setArtifactEndpoint(retrievalServer.getApiServiceDescriptor());
     provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor());
+    provisionInfo.addRunnerCapabilities(
+        
BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING));
     GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
         GrpcFnServer.allocatePortAndCreateFor(
             StaticGrpcProvisionService.create(
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 1a2b1c03a0f..123128814f9 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -520,6 +520,10 @@ public class SdkHarnessClient implements AutoCloseable {
           // We don't have to worry about the completion stage.
           if (exception == null) {
             BeamFnApi.ProcessBundleResponse completedResponse = 
MoreFutures.get(response);
+            if (completedResponse.hasElements()) {
+              
beamFnDataInboundObserver.get().accept(completedResponse.getElements());
+            }
+
             outstandingRequests.arriveAndAwaitAdvance();
 
             progressHandler.onCompleted(completedResponse);
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
index 1cb62136edc..1dbf6b6c3ac 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java
@@ -29,6 +29,8 @@ import org.apache.beam.fn.harness.Caches;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
+import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
@@ -108,7 +110,10 @@ public class EmbeddedEnvironmentFactory implements 
EnvironmentFactory {
                 FnHarness.main(
                     workerId,
                     options,
-                    Collections.emptySet(), // Runner capabilities.
+                    Collections.singleton(
+                        BeamUrns.getUrn(
+                            StandardRunnerProtocols.Enum
+                                .CONTROL_RESPONSE_ELEMENTS_EMBEDDING)), // 
Runner capabilities.
                     loggingServer.getApiServiceDescriptor(),
                     controlServer.getApiServiceDescriptor(),
                     null,
diff --git 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
index df13fb2996e..3e368b041bd 100644
--- 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
+++ 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java
@@ -22,11 +22,12 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,8 @@ import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols;
+import org.apache.beam.runners.core.construction.BeamUrns;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.FusedPipeline;
@@ -92,6 +95,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
@@ -105,6 +109,9 @@ public class ProcessBundleBenchmark {
   /** Sets up the {@link ExecutionStateTracker} and an execution state. */
   @State(Scope.Benchmark)
   public static class SdkHarness {
+    @Param({"true", "false"})
+    public String elementsEmbedding = "false";
+
     final GrpcFnServer<FnApiControlClientPoolService> controlServer;
     final GrpcFnServer<GrpcDataService> dataServer;
     final GrpcFnServer<GrpcStateService> stateServer;
@@ -117,6 +124,11 @@ public class ProcessBundleBenchmark {
     final Future<?> sdkHarnessExecutorFuture;
 
     public SdkHarness() {
+      Set<String> runnerCapabilities = new HashSet<>();
+      if (Boolean.parseBoolean(elementsEmbedding)) {
+        runnerCapabilities.add(
+            
BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING));
+      }
       try {
         // Setup execution-time servers
         ThreadFactory threadFactory =
@@ -163,7 +175,7 @@ public class ProcessBundleBenchmark {
                     FnHarness.main(
                         WORKER_ID,
                         pipelineOptions,
-                        Collections.emptySet(), // Runner capabilities.
+                        runnerCapabilities,
                         loggingServer.getApiServiceDescriptor(),
                         controlServer.getApiServiceDescriptor(),
                         null,
diff --git 
a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java
 
b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java
index e5541f2b171..0624af2556c 100644
--- 
a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java
+++ 
b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java
@@ -17,18 +17,29 @@
  */
 package org.apache.beam.fn.harness.jmh;
 
+import java.util.Arrays;
+import java.util.Collection;
 import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.StatefulTransform;
 import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.TrivialTransform;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Tests for {@link ProcessBundleBenchmark}. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class ProcessBundleBenchmarkTest {
+
+  @Parameterized.Parameter public String elementsEmbedding;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] {{"false"}, {"true"}});
+  }
+
   @Test
   public void testTinyBundle() throws Exception {
     TrivialTransform transform = new TrivialTransform();
+    transform.elementsEmbedding = elementsEmbedding;
     new ProcessBundleBenchmark().testTinyBundle(transform);
     transform.tearDown();
   }
@@ -36,6 +47,7 @@ public class ProcessBundleBenchmarkTest {
   @Test
   public void testLargeBundle() throws Exception {
     TrivialTransform transform = new TrivialTransform();
+    transform.elementsEmbedding = elementsEmbedding;
     new ProcessBundleBenchmark().testLargeBundle(transform);
     transform.tearDown();
   }
@@ -43,6 +55,7 @@ public class ProcessBundleBenchmarkTest {
   @Test
   public void testStateWithoutCaching() throws Exception {
     StatefulTransform transform = new StatefulTransform();
+    transform.elementsEmbedding = elementsEmbedding;
     new ProcessBundleBenchmark().testStateWithoutCaching(transform);
     transform.tearDown();
   }
@@ -50,6 +63,7 @@ public class ProcessBundleBenchmarkTest {
   @Test
   public void testStateWithCaching() throws Exception {
     StatefulTransform transform = new StatefulTransform();
+    transform.elementsEmbedding = elementsEmbedding;
     new ProcessBundleBenchmark().testStateWithCaching(transform);
     transform.tearDown();
   }

Reply via email to