[ https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105653&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105653 ]
ASF GitHub Bot logged work on BEAM-3326: ---------------------------------------- Author: ASF GitHub Bot Created on: 24/May/18 16:56 Start Date: 24/May/18 16:56 Worklog Time Spent: 10m Work Description: tgroh closed pull request #5349: [BEAM-3326] Remote stage evaluator URL: https://github.com/apache/beam/pull/5349 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index 5fc4e159520..af99b5b5bc2 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -64,6 +64,7 @@ dependencies { shadow library.java.slf4j_api shadow library.java.hamcrest_core shadow library.java.junit + testRuntime project(path: ":beam-sdks-java-harness") shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest") shadowTest project(path: ":beam-runners-core-java", configuration: "shadowTest") shadowTest library.java.guava_testlib diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml index 1d3d790a747..d06b074246c 100644 --- a/runners/direct-java/pom.xml +++ b/runners/direct-java/pom.xml @@ -226,6 +226,12 @@ <artifactId>beam-sdks-java-fn-execution</artifactId> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-harness</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-java-fn-execution</artifactId> diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java new file mode 100644 index 00000000000..f34c4d54d49 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct.portable; + +import java.util.ArrayList; +import java.util.Collection; +import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.control.RemoteBundle; +import org.apache.beam.runners.fnexecution.control.StageBundleFactory; +import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * The {@link TransformEvaluatorFactory} which produces {@link TransformEvaluator evaluators} for + * stages which execute on an SDK harness via the Fn Execution APIs. + */ +class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory { + private final BundleFactory bundleFactory; + + private final JobBundleFactory jobFactory; + + RemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory jobFactory) { + this.bundleFactory = bundleFactory; + this.jobFactory = jobFactory; + } + + @Nullable + @Override + public <InputT> TransformEvaluator<InputT> forApplication( + PTransformNode application, CommittedBundle<?> inputBundle) throws Exception { + return new RemoteStageEvaluator<>(application); + } + + @Override + public void cleanup() throws Exception { + jobFactory.close(); + } + + private class RemoteStageEvaluator<T> implements TransformEvaluator<T> { + private final PTransformNode transform; + private final RemoteBundle<T> bundle; + private final Collection<UncommittedBundle<?>> outputs; + + private RemoteStageEvaluator(PTransformNode transform) throws Exception { + this.transform = transform; + ExecutableStage stage = + ExecutableStage.fromPayload( + ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload())); + outputs = new ArrayList<>(); + StageBundleFactory stageFactory = jobFactory.forStage(stage); + bundle = + stageFactory.getBundle( + BundleFactoryOutputRecieverFactory.create( + bundleFactory, stage.getComponents(), outputs::add), + StateRequestHandler.unsupported()); + } + + @Override + public void processElement(WindowedValue<T> element) throws Exception { + bundle.getInputReceiver().accept(element); + } + + @Override + public TransformResult<T> finishBundle() throws Exception { + bundle.close(); + return StepTransformResult.<T>withoutHold(transform).addOutput(outputs).build(); + } + } +} diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java new file mode 100644 index 00000000000..267ced7ef79 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.direct.portable; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.Iterables; +import java.io.Serializable; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.runners.core.construction.PipelineTranslation; +import org.apache.beam.runners.core.construction.graph.ExecutableStage; +import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser; +import org.apache.beam.runners.core.construction.graph.PipelineNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; +import org.apache.beam.runners.core.construction.graph.QueryablePipeline; +import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.InProcessServerFactory; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.control.JobBundleFactory; +import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory; +import org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter; +import org.apache.beam.runners.fnexecution.state.GrpcStateService; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Impulse; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link RemoteStageEvaluatorFactory}. */ +@RunWith(JUnit4.class) +public class RemoteStageEvaluatorFactoryTest implements Serializable { + + private transient RemoteStageEvaluatorFactory factory; + private transient ExecutorService executor; + private transient GrpcFnServer<GrpcDataService> dataServer; + private transient GrpcFnServer<GrpcStateService> stateServer; + private transient GrpcFnServer<FnApiControlClientPoolService> controlServer; + private transient GrpcFnServer<GrpcLoggingService> loggingServer; + private transient BundleFactory bundleFactory; + + @Before + public void setup() throws Exception { + InProcessServerFactory serverFactory = InProcessServerFactory.create(); + + BlockingQueue<InstructionRequestHandler> clientPool = new LinkedBlockingQueue<>(); + controlServer = + GrpcFnServer.allocatePortAndCreateFor( + FnApiControlClientPoolService.offeringClientsToPool( + (workerId, instructionHandler) -> clientPool.put(instructionHandler), + GrpcContextHeaderAccessorProvider.getHeaderAccessor()), + serverFactory); + loggingServer = + GrpcFnServer.allocatePortAndCreateFor( + GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory); + + EnvironmentFactory environmentFactory = + InProcessEnvironmentFactory.create( + PipelineOptionsFactory.create(), + loggingServer, + controlServer, + (workerId, timeout) -> clientPool.take()); + executor = Executors.newCachedThreadPool(); + dataServer = + GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), serverFactory); + stateServer = GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory); + + bundleFactory = ImmutableListBundleFactory.create(); + JobBundleFactory jobBundleFactory = + DirectJobBundleFactory.create(environmentFactory, dataServer, stateServer); + factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory); + } + + @After + public void teardown() throws Exception { + try (AutoCloseable logging = loggingServer; + AutoCloseable exec = executor::shutdownNow; + AutoCloseable data = dataServer; + AutoCloseable state = stateServer; + AutoCloseable control = controlServer) {} + } + + @Test + public void executesRemoteStage() throws Exception { + Pipeline p = Pipeline.create(); + p.apply("impulse", Impulse.create()) + .apply( + "CreateInputs", + ParDo.of( + new DoFn<byte[], Integer>() { + @ProcessElement + public void create(ProcessContext ctxt) { + ctxt.output(1); + ctxt.output(2); + ctxt.output(3); + } + })) + .apply( + "ParDo", + ParDo.of( + new DoFn<Integer, KV<String, Long>>() { + @ProcessElement + public void proc(ProcessContext ctxt) { + ctxt.output(KV.of("foo", ctxt.element().longValue())); + } + })) + .apply(GroupByKey.create()); + + RunnerApi.Pipeline fusedPipeline = + GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).toPipeline(); + QueryablePipeline fusedQP = QueryablePipeline.forPipeline(fusedPipeline); + PTransformNode impulseTransform = getOnlyElement(fusedQP.getRootTransforms()); + PCollectionNode impulseOutput = getOnlyElement(fusedQP.getOutputPCollections(impulseTransform)); + PTransformNode stage = + fusedPipeline + .getRootTransformIdsList() + .stream() + .map( + id -> + PipelineNode.pTransform( + id, fusedPipeline.getComponents().getTransformsOrThrow(id))) + .filter(node -> node.getTransform().getSpec().getUrn().equals(ExecutableStage.URN)) + .findFirst() + .orElseThrow(IllegalArgumentException::new); + + WindowedValue<byte[]> impulse = WindowedValue.valueInGlobalWindow(new byte[0]); + CommittedBundle<byte[]> inputBundle = + bundleFactory.<byte[]>createBundle(impulseOutput).add(impulse).commit(Instant.now()); + TransformEvaluator<byte[]> evaluator = factory.forApplication(stage, inputBundle); + evaluator.processElement(impulse); + TransformResult<byte[]> result = evaluator.finishBundle(); + assertThat(Iterables.size(result.getOutputBundles()), equalTo(1)); + CommittedBundle<?> outputs = getOnlyElement(result.getOutputBundles()).commit(Instant.now()); + assertThat(Iterables.size(outputs), equalTo(3)); + } +} diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle index 845c6b28ed2..3713190cec6 100644 --- a/runners/java-fn-execution/build.gradle +++ b/runners/java-fn-execution/build.gradle @@ -24,6 +24,7 @@ dependencies { compile library.java.guava compile library.java.findbugs_annotations compile project(path: ":beam-runners-core-construction-java", configuration: "shadow") + provided project(path: ":beam-sdks-java-harness") shadow project(path: ":beam-model-pipeline", configuration: "shadow") shadow project(path: ":beam-model-fn-execution", configuration: "shadow") shadow project(path: ":beam-sdks-java-core", configuration: "shadow") diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml index bdf69c46a07..ef6e94ec5db 100644 --- a/runners/java-fn-execution/pom.xml +++ b/runners/java-fn-execution/pom.xml @@ -100,6 +100,12 @@ <artifactId>beam-runners-core-construction-java</artifactId> </dependency> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-harness</artifactId> + <scope>provided</scope> + </dependency> + <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> @@ -177,11 +183,5 @@ <version>${slf4j.version}</version> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.beam</groupId> - <artifactId>beam-sdks-java-harness</artifactId> - <scope>test</scope> - </dependency> </dependencies> </project> diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java new file mode 100644 index 00000000000..03a3b550eea --- /dev/null +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.beam.runners.fnexecution.environment; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.beam.fn.harness.FnHarness; +import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.sdk.fn.stream.StreamObserverFactory; +import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} which is executing in the + * same process. + */ +public class InProcessEnvironmentFactory implements EnvironmentFactory { + private static final Logger LOG = LoggerFactory.getLogger(InProcessEnvironmentFactory.class); + + private final PipelineOptions options; + + private final GrpcFnServer<GrpcLoggingService> loggingServer; + private final GrpcFnServer<FnApiControlClientPoolService> controlServer; + + private final ControlClientPool.Source clientSource; + + public static EnvironmentFactory create( + PipelineOptions options, + GrpcFnServer<GrpcLoggingService> loggingServer, + GrpcFnServer<FnApiControlClientPoolService> controlServer, + ControlClientPool.Source clientSource) { + return new InProcessEnvironmentFactory(options, loggingServer, controlServer, clientSource); + } + + private InProcessEnvironmentFactory( + PipelineOptions options, + GrpcFnServer<GrpcLoggingService> loggingServer, + GrpcFnServer<FnApiControlClientPoolService> controlServer, + Source clientSource) { + this.options = options; + this.loggingServer = loggingServer; + this.controlServer = controlServer; + checkArgument( + loggingServer.getApiServiceDescriptor() != null, + "Logging Server cannot have a null %s", + ApiServiceDescriptor.class.getSimpleName()); + checkArgument( + controlServer.getApiServiceDescriptor() != null, + "Control Server cannot have a null %s", + ApiServiceDescriptor.class.getSimpleName()); + this.clientSource = clientSource; + } + + @Override + public RemoteEnvironment createEnvironment(Environment container) throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future<?> fnHarness = + executor.submit( + () -> { + try { + FnHarness.main( + options, + loggingServer.getApiServiceDescriptor(), + controlServer.getApiServiceDescriptor(), + InProcessManagedChannelFactory.create(), + StreamObserverFactory.direct()); + } catch (NoClassDefFoundError e) { + // TODO: https://issues.apache.org/jira/browse/BEAM-4384 load the FnHarness in a + // Restricted classpath that we control for any user. + LOG.error( + "{} while executing an in-process FnHarness. " + + "To use the {}, " + + "the 'org.apache.beam:beam-sdks-java-harness' artifact " + + "and its dependencies must be on the classpath", + NoClassDefFoundError.class.getSimpleName(), + InProcessEnvironmentFactory.class.getSimpleName(), + e); + throw e; + } + }); + executor.submit( + () -> { + try { + fnHarness.get(); + } catch (Throwable t) { + executor.shutdownNow(); + } + }); + + // TODO: find some way to populate the actual ID in FnHarness.main() + InstructionRequestHandler handler = clientSource.take("", Duration.ofMinutes(1L)); + return RemoteEnvironment.forHandler(container, handler); + } +} diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java index fb177d059c6..9be07fe8e3b 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java @@ -19,21 +19,20 @@ package org.apache.beam.runners.fnexecution; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import javax.annotation.Nullable; import org.apache.beam.fn.harness.FnHarness; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.runners.fnexecution.control.ControlClientPool; import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; import org.apache.beam.runners.fnexecution.control.MapControlClientPool; import org.apache.beam.runners.fnexecution.control.SdkHarnessClient; import org.apache.beam.runners.fnexecution.data.GrpcDataService; +import org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory; import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter; -import org.apache.beam.sdk.fn.stream.StreamObserverFactory; -import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.rules.ExternalResource; import org.junit.rules.TestRule; @@ -46,15 +45,9 @@ public class InProcessSdkHarness extends ExternalResource implements TestRule { public static InProcessSdkHarness create() { - return new InProcessSdkHarness(null); + return new InProcessSdkHarness(); } - public static InProcessSdkHarness withClientTimeout(Duration clientTimeout) { - return new InProcessSdkHarness(clientTimeout); - } - - @Nullable private final Duration clientTimeout; - private ExecutorService executor; private GrpcFnServer<GrpcLoggingService> loggingServer; private GrpcFnServer<GrpcDataService> dataServer; @@ -62,9 +55,7 @@ public static InProcessSdkHarness withClientTimeout(Duration clientTimeout) { private SdkHarnessClient client; - private InProcessSdkHarness(Duration clientTimeout) { - this.clientTimeout = clientTimeout; - } + private InProcessSdkHarness() {} public SdkHarnessClient client() { return client; @@ -74,11 +65,11 @@ public ApiServiceDescriptor dataEndpoint() { return dataServer.getApiServiceDescriptor(); } + @Override protected void before() throws Exception { InProcessServerFactory serverFactory = InProcessServerFactory.create(); executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()); - ControlClientPool clientPool; - clientPool = MapControlClientPool.create(); + ControlClientPool clientPool = MapControlClientPool.create(); FnApiControlClientPoolService clientPoolService = FnApiControlClientPoolService.offeringClientsToPool( clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()); @@ -90,29 +81,28 @@ protected void before() throws Exception { GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), serverFactory); controlServer = GrpcFnServer.allocatePortAndCreateFor(clientPoolService, serverFactory); - executor.submit( - () -> { - FnHarness.main( - PipelineOptionsFactory.create(), - loggingServer.getApiServiceDescriptor(), - controlServer.getApiServiceDescriptor(), - new InProcessManagedChannelFactory(), - StreamObserverFactory.direct()); - return null; - }); + InstructionRequestHandler requestHandler = + InProcessEnvironmentFactory.create( + PipelineOptionsFactory.create(), + loggingServer, + controlServer, + clientPool.getSource()) + // The InProcessEnvironmentFactory can only create Java environments, regardless of the + // Environment that's passed to it. + .createEnvironment(Environment.getDefaultInstance()) + .getInstructionRequestHandler(); // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Worker ids cannot currently be set by // the harness. All clients have the implicit empty id for now. - client = - SdkHarnessClient.usingFnApiClient( - clientPool.getSource().take("", clientTimeout), dataServer.getService()); + client = SdkHarnessClient.usingFnApiClient(requestHandler, dataServer.getService()); } + @Override protected void after() { try (AutoCloseable logs = loggingServer; AutoCloseable data = dataServer; AutoCloseable ctl = controlServer; - AutoCloseable c = client; ) { + AutoCloseable c = client) { executor.shutdownNow(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index 9e44fe5380a..d8529ee55c7 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -136,7 +136,7 @@ public void setup() throws Exception { PipelineOptionsFactory.create(), loggingServer.getApiServiceDescriptor(), controlServer.getApiServiceDescriptor(), - new InProcessManagedChannelFactory(), + InProcessManagedChannelFactory.create(), StreamObserverFactory.direct())); // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker id. InstructionRequestHandler controlClient = diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java index f9096dd7e54..64a80569a26 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java @@ -34,7 +34,6 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; -import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -92,8 +91,7 @@ @Mock public FnApiControlClient fnApiControlClient; @Mock public FnDataService dataService; - @Rule - public InProcessSdkHarness harness = InProcessSdkHarness.withClientTimeout(Duration.ofSeconds(5)); + @Rule public InProcessSdkHarness harness = InProcessSdkHarness.create(); @Rule public ExpectedException thrown = ExpectedException.none(); diff --git a/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java b/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java index 8000d4f7ec1..2c0845e9321 100644 --- a/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java +++ b/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java @@ -54,7 +54,8 @@ @Test public void stagesAndRunsJob() throws Exception { try (CloseableResource<Server> server = createJobServer(JobState.Enum.DONE)) { - PortableRunner runner = PortableRunner.create(options, new InProcessManagedChannelFactory()); + PortableRunner runner = + PortableRunner.create(options, InProcessManagedChannelFactory.create()); State state = runner.run(p).waitUntilFinish(); assertThat(state, is(State.DONE)); } @@ -80,5 +81,4 @@ private static PipelineOptions createPipelineOptions() { options.setRunner(PortableRunner.class); return options; } - } diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java index c25203e5d62..787047b7f6e 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java @@ -28,6 +28,11 @@ * <p>The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the unique in-process name. */ public class InProcessManagedChannelFactory extends ManagedChannelFactory { + public static ManagedChannelFactory create() { + return new InProcessManagedChannelFactory(); + } + + private InProcessManagedChannelFactory() {} @Override public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 105653) Time Spent: 15h 40m (was: 15.5h) > Execute a Stage via the portability framework in the ReferenceRunner > -------------------------------------------------------------------- > > Key: BEAM-3326 > URL: https://issues.apache.org/jira/browse/BEAM-3326 > Project: Beam > Issue Type: New Feature > Components: runner-core > Reporter: Thomas Groh > Assignee: Thomas Groh > Priority: Major > Labels: portability > Time Spent: 15h 40m > Remaining Estimate: 0h > > This is the supertask for remote execution in the Universal Local Runner > (BEAM-2899). > This executes a stage remotely via portability framework APIs -- This message was sent by Atlassian JIRA (v7.6.3#76005)