This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 4e6d751f92e Add support for all Java based portable runners to consume elements embedding on the control response. (#25463) 4e6d751f92e is described below commit 4e6d751f92e0d20f1b7a0ce34997bd35d6401913 Author: Luke Cwik <lukec...@gmail.com> AuthorDate: Thu Feb 16 14:06:02 2023 -0800 Add support for all Java based portable runners to consume elements embedding on the control response. (#25463) * Add support for all Java based portable runners to consume elements embedding on the control response. Fixes #25462 --- .../fnexecution/control/DefaultJobBundleFactory.java | 3 +++ .../runners/fnexecution/control/SdkHarnessClient.java | 4 ++++ .../environment/EmbeddedEnvironmentFactory.java | 7 ++++++- .../beam/fn/harness/jmh/ProcessBundleBenchmark.java | 16 ++++++++++++++-- .../fn/harness/jmh/ProcessBundleBenchmarkTest.java | 18 ++++++++++++++++-- 5 files changed, 43 insertions(+), 5 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java index 0810a804a4a..1ac50922a29 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java @@ -34,6 +34,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.beam.model.fnexecution.v1.ProvisionApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols; import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.PipelineOptionsTranslation; @@ -683,6 +684,8 @@ public class DefaultJobBundleFactory implements JobBundleFactory { provisionInfo.setLoggingEndpoint(loggingServer.getApiServiceDescriptor()); provisionInfo.setArtifactEndpoint(retrievalServer.getApiServiceDescriptor()); provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor()); + provisionInfo.addRunnerCapabilities( + BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)); GrpcFnServer<StaticGrpcProvisionService> provisioningServer = GrpcFnServer.allocatePortAndCreateFor( StaticGrpcProvisionService.create( diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java index 1a2b1c03a0f..123128814f9 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java @@ -520,6 +520,10 @@ public class SdkHarnessClient implements AutoCloseable { // We don't have to worry about the completion stage. if (exception == null) { BeamFnApi.ProcessBundleResponse completedResponse = MoreFutures.get(response); + if (completedResponse.hasElements()) { + beamFnDataInboundObserver.get().accept(completedResponse.getElements()); + } + outstandingRequests.arriveAndAwaitAdvance(); progressHandler.onCompleted(completedResponse); diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java index 1cb62136edc..1dbf6b6c3ac 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/EmbeddedEnvironmentFactory.java @@ -29,6 +29,8 @@ import org.apache.beam.fn.harness.Caches; import org.apache.beam.fn.harness.FnHarness; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols; +import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; import org.apache.beam.runners.fnexecution.control.ControlClientPool; import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source; @@ -108,7 +110,10 @@ public class EmbeddedEnvironmentFactory implements EnvironmentFactory { FnHarness.main( workerId, options, - Collections.emptySet(), // Runner capabilities. + Collections.singleton( + BeamUrns.getUrn( + StandardRunnerProtocols.Enum + .CONTROL_RESPONSE_ELEMENTS_EMBEDDING)), // Runner capabilities. loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), null, diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java index df13fb2996e..3e368b041bd 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmark.java @@ -22,11 +22,12 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec import static org.junit.Assert.assertEquals; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols; +import org.apache.beam.runners.core.construction.BeamUrns; import org.apache.beam.runners.core.construction.PipelineTranslation; import org.apache.beam.runners.core.construction.graph.ExecutableStage; import org.apache.beam.runners.core.construction.graph.FusedPipeline; @@ -92,6 +95,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.TearDown; @@ -105,6 +109,9 @@ public class ProcessBundleBenchmark { /** Sets up the {@link ExecutionStateTracker} and an execution state. */ @State(Scope.Benchmark) public static class SdkHarness { + @Param({"true", "false"}) + public String elementsEmbedding = "false"; + final GrpcFnServer<FnApiControlClientPoolService> controlServer; final GrpcFnServer<GrpcDataService> dataServer; final GrpcFnServer<GrpcStateService> stateServer; @@ -117,6 +124,11 @@ public class ProcessBundleBenchmark { final Future<?> sdkHarnessExecutorFuture; public SdkHarness() { + Set<String> runnerCapabilities = new HashSet<>(); + if (Boolean.parseBoolean(elementsEmbedding)) { + runnerCapabilities.add( + BeamUrns.getUrn(StandardRunnerProtocols.Enum.CONTROL_RESPONSE_ELEMENTS_EMBEDDING)); + } try { // Setup execution-time servers ThreadFactory threadFactory = @@ -163,7 +175,7 @@ public class ProcessBundleBenchmark { FnHarness.main( WORKER_ID, pipelineOptions, - Collections.emptySet(), // Runner capabilities. + runnerCapabilities, loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), null, diff --git a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java index e5541f2b171..0624af2556c 100644 --- a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java +++ b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/ProcessBundleBenchmarkTest.java @@ -17,18 +17,29 @@ */ package org.apache.beam.fn.harness.jmh; +import java.util.Arrays; +import java.util.Collection; import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.StatefulTransform; import org.apache.beam.fn.harness.jmh.ProcessBundleBenchmark.TrivialTransform; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; /** Tests for {@link ProcessBundleBenchmark}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class ProcessBundleBenchmarkTest { + + @Parameterized.Parameter public String elementsEmbedding; + + @Parameterized.Parameters + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[][] {{"false"}, {"true"}}); + } + @Test public void testTinyBundle() throws Exception { TrivialTransform transform = new TrivialTransform(); + transform.elementsEmbedding = elementsEmbedding; new ProcessBundleBenchmark().testTinyBundle(transform); transform.tearDown(); } @@ -36,6 +47,7 @@ public class ProcessBundleBenchmarkTest { @Test public void testLargeBundle() throws Exception { TrivialTransform transform = new TrivialTransform(); + transform.elementsEmbedding = elementsEmbedding; new ProcessBundleBenchmark().testLargeBundle(transform); transform.tearDown(); } @@ -43,6 +55,7 @@ public class ProcessBundleBenchmarkTest { @Test public void testStateWithoutCaching() throws Exception { StatefulTransform transform = new StatefulTransform(); + transform.elementsEmbedding = elementsEmbedding; new ProcessBundleBenchmark().testStateWithoutCaching(transform); transform.tearDown(); } @@ -50,6 +63,7 @@ public class ProcessBundleBenchmarkTest { @Test public void testStateWithCaching() throws Exception { StatefulTransform transform = new StatefulTransform(); + transform.elementsEmbedding = elementsEmbedding; new ProcessBundleBenchmark().testStateWithCaching(transform); transform.tearDown(); }