[ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=105653&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-105653
 ]

ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/18 16:56
            Start Date: 24/May/18 16:56
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5349: [BEAM-3326] Remote stage evaluator
URL: https://github.com/apache/beam/pull/5349
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request came from a fork, GitHub would not otherwise
show its diff after the merge, so the full diff is supplied below:

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 5fc4e159520..af99b5b5bc2 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -64,6 +64,7 @@ dependencies {
   shadow library.java.slf4j_api
   shadow library.java.hamcrest_core
   shadow library.java.junit
+  testRuntime project(path: ":beam-sdks-java-harness")
   shadowTest project(path: ":beam-sdks-java-core", configuration: "shadowTest")
   shadowTest project(path: ":beam-runners-core-java", configuration: 
"shadowTest")
   shadowTest library.java.guava_testlib
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 1d3d790a747..d06b074246c 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -226,6 +226,12 @@
       <artifactId>beam-sdks-java-fn-execution</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-java-fn-execution</artifactId>
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
new file mode 100644
index 00000000000..f34c4d54d49
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * The {@link TransformEvaluatorFactory} which produces {@link 
TransformEvaluator evaluators} for
+ * stages which execute on an SDK harness via the Fn Execution APIs.
+ */
+class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
+  private final BundleFactory bundleFactory;
+
+  private final JobBundleFactory jobFactory;
+
+  RemoteStageEvaluatorFactory(BundleFactory bundleFactory, JobBundleFactory 
jobFactory) {
+    this.bundleFactory = bundleFactory;
+    this.jobFactory = jobFactory;
+  }
+
+  @Nullable
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      PTransformNode application, CommittedBundle<?> inputBundle) throws 
Exception {
+    return new RemoteStageEvaluator<>(application);
+  }
+
+  @Override
+  public void cleanup() throws Exception {
+    jobFactory.close();
+  }
+
+  private class RemoteStageEvaluator<T> implements TransformEvaluator<T> {
+    private final PTransformNode transform;
+    private final RemoteBundle<T> bundle;
+    private final Collection<UncommittedBundle<?>> outputs;
+
+    private RemoteStageEvaluator(PTransformNode transform) throws Exception {
+      this.transform = transform;
+      ExecutableStage stage =
+          ExecutableStage.fromPayload(
+              
ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload()));
+      outputs = new ArrayList<>();
+      StageBundleFactory stageFactory = jobFactory.forStage(stage);
+      bundle =
+          stageFactory.getBundle(
+              BundleFactoryOutputRecieverFactory.create(
+                  bundleFactory, stage.getComponents(), outputs::add),
+              StateRequestHandler.unsupported());
+    }
+
+    @Override
+    public void processElement(WindowedValue<T> element) throws Exception {
+      bundle.getInputReceiver().accept(element);
+    }
+
+    @Override
+    public TransformResult<T> finishBundle() throws Exception {
+      bundle.close();
+      return 
StepTransformResult.<T>withoutHold(transform).addOutput(outputs).build();
+    }
+  }
+}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
new file mode 100644
index 00000000000..267ced7ef79
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactoryTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RemoteStageEvaluatorFactory}. */
+@RunWith(JUnit4.class)
+public class RemoteStageEvaluatorFactoryTest implements Serializable {
+
+  private transient RemoteStageEvaluatorFactory factory;
+  private transient ExecutorService executor;
+  private transient GrpcFnServer<GrpcDataService> dataServer;
+  private transient GrpcFnServer<GrpcStateService> stateServer;
+  private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private transient GrpcFnServer<GrpcLoggingService> loggingServer;
+  private transient BundleFactory bundleFactory;
+
+  @Before
+  public void setup() throws Exception {
+    InProcessServerFactory serverFactory = InProcessServerFactory.create();
+
+    BlockingQueue<InstructionRequestHandler> clientPool = new 
LinkedBlockingQueue<>();
+    controlServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            FnApiControlClientPoolService.offeringClientsToPool(
+                (workerId, instructionHandler) -> 
clientPool.put(instructionHandler),
+                GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
+            serverFactory);
+    loggingServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
+
+    EnvironmentFactory environmentFactory =
+        InProcessEnvironmentFactory.create(
+            PipelineOptionsFactory.create(),
+            loggingServer,
+            controlServer,
+            (workerId, timeout) -> clientPool.take());
+    executor = Executors.newCachedThreadPool();
+    dataServer =
+        
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
+    stateServer = 
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
+
+    bundleFactory = ImmutableListBundleFactory.create();
+    JobBundleFactory jobBundleFactory =
+        DirectJobBundleFactory.create(environmentFactory, dataServer, 
stateServer);
+    factory = new RemoteStageEvaluatorFactory(bundleFactory, jobBundleFactory);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    try (AutoCloseable logging = loggingServer;
+        AutoCloseable exec = executor::shutdownNow;
+        AutoCloseable data = dataServer;
+        AutoCloseable state = stateServer;
+        AutoCloseable control = controlServer) {}
+  }
+
+  @Test
+  public void executesRemoteStage() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply(
+            "CreateInputs",
+            ParDo.of(
+                new DoFn<byte[], Integer>() {
+                  @ProcessElement
+                  public void create(ProcessContext ctxt) {
+                    ctxt.output(1);
+                    ctxt.output(2);
+                    ctxt.output(3);
+                  }
+                }))
+        .apply(
+            "ParDo",
+            ParDo.of(
+                new DoFn<Integer, KV<String, Long>>() {
+                  @ProcessElement
+                  public void proc(ProcessContext ctxt) {
+                    ctxt.output(KV.of("foo", ctxt.element().longValue()));
+                  }
+                }))
+        .apply(GroupByKey.create());
+
+    RunnerApi.Pipeline fusedPipeline =
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p)).toPipeline();
+    QueryablePipeline fusedQP = QueryablePipeline.forPipeline(fusedPipeline);
+    PTransformNode impulseTransform = 
getOnlyElement(fusedQP.getRootTransforms());
+    PCollectionNode impulseOutput = 
getOnlyElement(fusedQP.getOutputPCollections(impulseTransform));
+    PTransformNode stage =
+        fusedPipeline
+            .getRootTransformIdsList()
+            .stream()
+            .map(
+                id ->
+                    PipelineNode.pTransform(
+                        id, 
fusedPipeline.getComponents().getTransformsOrThrow(id)))
+            .filter(node -> 
node.getTransform().getSpec().getUrn().equals(ExecutableStage.URN))
+            .findFirst()
+            .orElseThrow(IllegalArgumentException::new);
+
+    WindowedValue<byte[]> impulse = WindowedValue.valueInGlobalWindow(new 
byte[0]);
+    CommittedBundle<byte[]> inputBundle =
+        
bundleFactory.<byte[]>createBundle(impulseOutput).add(impulse).commit(Instant.now());
+    TransformEvaluator<byte[]> evaluator = factory.forApplication(stage, 
inputBundle);
+    evaluator.processElement(impulse);
+    TransformResult<byte[]> result = evaluator.finishBundle();
+    assertThat(Iterables.size(result.getOutputBundles()), equalTo(1));
+    CommittedBundle<?> outputs = 
getOnlyElement(result.getOutputBundles()).commit(Instant.now());
+    assertThat(Iterables.size(outputs), equalTo(3));
+  }
+}
diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle
index 845c6b28ed2..3713190cec6 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -24,6 +24,7 @@ dependencies {
   compile library.java.guava
   compile library.java.findbugs_annotations
   compile project(path: ":beam-runners-core-construction-java", configuration: 
"shadow")
+  provided project(path: ":beam-sdks-java-harness")
   shadow project(path: ":beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-model-fn-execution", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index bdf69c46a07..ef6e94ec5db 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -100,6 +100,12 @@
       <artifactId>beam-runners-core-construction-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-harness</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
@@ -177,11 +183,5 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-sdks-java-harness</artifactId>
-      <scope>test</scope>
-    </dependency>
   </dependencies>
 </project>
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
new file mode 100644
index 00000000000..03a3b550eea
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/InProcessEnvironmentFactory.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} that communicates to a {@link FnHarness} 
which is executing in the
+ * same process.
+ */
+public class InProcessEnvironmentFactory implements EnvironmentFactory {
+  private static final Logger LOG = 
LoggerFactory.getLogger(InProcessEnvironmentFactory.class);
+
+  private final PipelineOptions options;
+
+  private final GrpcFnServer<GrpcLoggingService> loggingServer;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+
+  private final ControlClientPool.Source clientSource;
+
+  public static EnvironmentFactory create(
+      PipelineOptions options,
+      GrpcFnServer<GrpcLoggingService> loggingServer,
+      GrpcFnServer<FnApiControlClientPoolService> controlServer,
+      ControlClientPool.Source clientSource) {
+    return new InProcessEnvironmentFactory(options, loggingServer, 
controlServer, clientSource);
+  }
+
+  private InProcessEnvironmentFactory(
+      PipelineOptions options,
+      GrpcFnServer<GrpcLoggingService> loggingServer,
+      GrpcFnServer<FnApiControlClientPoolService> controlServer,
+      Source clientSource) {
+    this.options = options;
+    this.loggingServer = loggingServer;
+    this.controlServer = controlServer;
+    checkArgument(
+        loggingServer.getApiServiceDescriptor() != null,
+        "Logging Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    checkArgument(
+        controlServer.getApiServiceDescriptor() != null,
+        "Control Server cannot have a null %s",
+        ApiServiceDescriptor.class.getSimpleName());
+    this.clientSource = clientSource;
+  }
+
+  @Override
+  public RemoteEnvironment createEnvironment(Environment container) throws 
Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> fnHarness =
+        executor.submit(
+            () -> {
+              try {
+                FnHarness.main(
+                    options,
+                    loggingServer.getApiServiceDescriptor(),
+                    controlServer.getApiServiceDescriptor(),
+                    InProcessManagedChannelFactory.create(),
+                    StreamObserverFactory.direct());
+              } catch (NoClassDefFoundError e) {
+                // TODO: https://issues.apache.org/jira/browse/BEAM-4384 load 
the FnHarness in a
+                // Restricted classpath that we control for any user.
+                LOG.error(
+                    "{} while executing an in-process FnHarness. "
+                        + "To use the {}, "
+                        + "the 'org.apache.beam:beam-sdks-java-harness' 
artifact "
+                        + "and its dependencies must be on the classpath",
+                    NoClassDefFoundError.class.getSimpleName(),
+                    InProcessEnvironmentFactory.class.getSimpleName(),
+                    e);
+                throw e;
+              }
+            });
+    executor.submit(
+        () -> {
+          try {
+            fnHarness.get();
+          } catch (Throwable t) {
+            executor.shutdownNow();
+          }
+        });
+
+    // TODO: find some way to populate the actual ID in FnHarness.main()
+    InstructionRequestHandler handler = clientSource.take("", 
Duration.ofMinutes(1L));
+    return RemoteEnvironment.forHandler(container, handler);
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
index fb177d059c6..9be07fe8e3b 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
@@ -19,21 +19,20 @@
 package org.apache.beam.runners.fnexecution;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.time.Duration;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import javax.annotation.Nullable;
 import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.runners.fnexecution.control.ControlClientPool;
 import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.control.MapControlClientPool;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import 
org.apache.beam.runners.fnexecution.environment.InProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
-import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
-import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.rules.ExternalResource;
 import org.junit.rules.TestRule;
@@ -46,15 +45,9 @@
 public class InProcessSdkHarness extends ExternalResource implements TestRule {
 
   public static InProcessSdkHarness create() {
-    return new InProcessSdkHarness(null);
+    return new InProcessSdkHarness();
   }
 
-  public static InProcessSdkHarness withClientTimeout(Duration clientTimeout) {
-    return new InProcessSdkHarness(clientTimeout);
-  }
-
-  @Nullable private final Duration clientTimeout;
-
   private ExecutorService executor;
   private GrpcFnServer<GrpcLoggingService> loggingServer;
   private GrpcFnServer<GrpcDataService> dataServer;
@@ -62,9 +55,7 @@ public static InProcessSdkHarness withClientTimeout(Duration clientTimeout) {
 
   private SdkHarnessClient client;
 
-  private InProcessSdkHarness(Duration clientTimeout) {
-    this.clientTimeout = clientTimeout;
-  }
+  private InProcessSdkHarness() {}
 
   public SdkHarnessClient client() {
     return client;
@@ -74,11 +65,11 @@ public ApiServiceDescriptor dataEndpoint() {
     return dataServer.getApiServiceDescriptor();
   }
 
+  @Override
   protected void before() throws Exception {
     InProcessServerFactory serverFactory = InProcessServerFactory.create();
     executor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).build());
-    ControlClientPool clientPool;
-    clientPool = MapControlClientPool.create();
+    ControlClientPool clientPool = MapControlClientPool.create();
     FnApiControlClientPoolService clientPoolService =
         FnApiControlClientPoolService.offeringClientsToPool(
             clientPool.getSink(), 
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
@@ -90,29 +81,28 @@ protected void before() throws Exception {
         
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
     controlServer = GrpcFnServer.allocatePortAndCreateFor(clientPoolService, 
serverFactory);
 
-    executor.submit(
-        () -> {
-          FnHarness.main(
-              PipelineOptionsFactory.create(),
-              loggingServer.getApiServiceDescriptor(),
-              controlServer.getApiServiceDescriptor(),
-              new InProcessManagedChannelFactory(),
-              StreamObserverFactory.direct());
-          return null;
-        });
+    InstructionRequestHandler requestHandler =
+        InProcessEnvironmentFactory.create(
+                PipelineOptionsFactory.create(),
+                loggingServer,
+                controlServer,
+                clientPool.getSource())
+            // The InProcessEnvironmentFactory can only create Java 
environments, regardless of the
+            // Environment that's passed to it.
+            .createEnvironment(Environment.getDefaultInstance())
+            .getInstructionRequestHandler();
 
     // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Worker ids cannot 
currently be set by
     // the harness. All clients have the implicit empty id for now.
-    client =
-        SdkHarnessClient.usingFnApiClient(
-            clientPool.getSource().take("", clientTimeout), 
dataServer.getService());
+    client = SdkHarnessClient.usingFnApiClient(requestHandler, 
dataServer.getService());
   }
 
+  @Override
   protected void after() {
     try (AutoCloseable logs = loggingServer;
         AutoCloseable data = dataServer;
         AutoCloseable ctl = controlServer;
-        AutoCloseable c = client; ) {
+        AutoCloseable c = client) {
       executor.shutdownNow();
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 9e44fe5380a..d8529ee55c7 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -136,7 +136,7 @@ public void setup() throws Exception {
                 PipelineOptionsFactory.create(),
                 loggingServer.getApiServiceDescriptor(),
                 controlServer.getApiServiceDescriptor(),
-                new InProcessManagedChannelFactory(),
+                InProcessManagedChannelFactory.create(),
                 StreamObserverFactory.direct()));
     // TODO: https://issues.apache.org/jira/browse/BEAM-4149 Use proper worker 
id.
     InstructionRequestHandler controlClient =
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index f9096dd7e54..64a80569a26 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -34,7 +34,6 @@
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -92,8 +91,7 @@
   @Mock public FnApiControlClient fnApiControlClient;
   @Mock public FnDataService dataService;
 
-  @Rule
-  public InProcessSdkHarness harness = 
InProcessSdkHarness.withClientTimeout(Duration.ofSeconds(5));
+  @Rule public InProcessSdkHarness harness = InProcessSdkHarness.create();
 
   @Rule public ExpectedException thrown = ExpectedException.none();
 
diff --git a/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java b/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java
index 8000d4f7ec1..2c0845e9321 100644
--- a/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java
+++ b/runners/reference/java/src/test/java/org/apache/beam/runners/reference/PortableRunnerTest.java
@@ -54,7 +54,8 @@
   @Test
   public void stagesAndRunsJob() throws Exception {
     try (CloseableResource<Server> server = 
createJobServer(JobState.Enum.DONE)) {
-      PortableRunner runner = PortableRunner.create(options, new 
InProcessManagedChannelFactory());
+      PortableRunner runner =
+          PortableRunner.create(options, 
InProcessManagedChannelFactory.create());
       State state = runner.run(p).waitUntilFinish();
       assertThat(state, is(State.DONE));
     }
@@ -80,5 +81,4 @@ private static PipelineOptions createPipelineOptions() {
     options.setRunner(PortableRunner.class);
     return options;
   }
-
 }
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
index c25203e5d62..787047b7f6e 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/test/InProcessManagedChannelFactory.java
@@ -28,6 +28,11 @@
  * <p>The channel builder uses {@link ApiServiceDescriptor#getUrl()} as the 
unique in-process name.
  */
 public class InProcessManagedChannelFactory extends ManagedChannelFactory {
+  public static ManagedChannelFactory create() {
+    return new InProcessManagedChannelFactory();
+  }
+
+  private InProcessManagedChannelFactory() {}
 
   @Override
   public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 105653)
    Time Spent: 15h 40m  (was: 15.5h)

> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3326
>                 URL: https://issues.apache.org/jira/browse/BEAM-3326
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 15h 40m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner (BEAM-2899).
> This executes a stage remotely via the portability framework APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to