[ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=104679&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104679
 ]

ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/May/18 16:49
            Start Date: 22/May/18 16:49
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5348: [BEAM-3326] Add a 
Direct Job Bundle Factory
URL: https://github.com/apache/beam/pull/5348
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle
index 220f4fcffb9..e74cd5313fe 100644
--- a/runners/direct-java/build.gradle
+++ b/runners/direct-java/build.gradle
@@ -57,6 +57,7 @@ dependencies {
   compile project(path: ":beam-runners-core-java", configuration: "shadow")
   compile project(path: ":beam-runners-local-java-core", configuration: 
"shadow")
   compile project(path: ":beam-runners-java-fn-execution", configuration: 
"shadow")
+  compile project(path: ":beam-sdks-java-fn-execution", configuration: 
"shadow")
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow library.java.joda_time
   shadow library.java.findbugs_jsr305
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 0613a0a863f..1d3d790a747 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -221,6 +221,11 @@
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-fn-execution</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-java-fn-execution</artifactId>
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java
new file mode 100644
index 00000000000..3285619fd2d
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/**
+ * An {@link OutputReceiverFactory} which adds received elements to {@link 
UncommittedBundle}
+ * instances. The produced {@link UncommittedBundle bundles} are added to a 
provided {@link
+ * StepTransformResult.Builder StepTransformResult Builder}.
+ */
+class BundleFactoryOutputRecieverFactory implements OutputReceiverFactory {
+  private final BundleFactory bundleFactory;
+  private final RunnerApi.Components components;
+
+  private final Consumer<UncommittedBundle<?>> bundleConsumer;
+
+  private BundleFactoryOutputRecieverFactory(
+      BundleFactory bundleFactory,
+      Components components,
+      Consumer<UncommittedBundle<?>> bundleConsumer) {
+    this.bundleFactory = bundleFactory;
+    this.components = components;
+    this.bundleConsumer = bundleConsumer;
+  }
+
+  public static OutputReceiverFactory create(
+      BundleFactory bundleFactory,
+      Components components,
+      Consumer<UncommittedBundle<?>> resultBuilder) {
+    return new BundleFactoryOutputRecieverFactory(bundleFactory, components, 
resultBuilder);
+  }
+
+  @Override
+  public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
+    PCollectionNode pcollection =
+        PipelineNode.pCollection(pCollectionId, 
components.getPcollectionsOrThrow(pCollectionId));
+    return create(pcollection);
+  }
+
+  private <ElemT, OutputT> FnDataReceiver<OutputT> create(PCollectionNode 
pcollection) {
+    UncommittedBundle<ElemT> bundle = bundleFactory.createBundle(pcollection);
+    bundleConsumer.accept(bundle);
+    return input -> bundle.add((WindowedValue<ElemT>) input);
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
new file mode 100644
index 00000000000..98094c7087f
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors;
+import 
org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+
+/** A {@link JobBundleFactory} for the ReferenceRunner. */
+class DirectJobBundleFactory implements JobBundleFactory {
+  public static JobBundleFactory create(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer<GrpcDataService> data,
+      GrpcFnServer<GrpcStateService> state) {
+    return new DirectJobBundleFactory(environmentFactory, data, state);
+  }
+
+  private final EnvironmentFactory environmentFactory;
+
+  private final GrpcFnServer<GrpcDataService> dataService;
+  private final GrpcFnServer<GrpcStateService> stateService;
+
+  private final ConcurrentMap<ExecutableStage, StageBundleFactory<?>> 
stageBundleFactories =
+      new ConcurrentHashMap<>();
+  private final ConcurrentMap<Environment, RemoteEnvironment> environments =
+      new ConcurrentHashMap<>();
+
+  private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
+
+  private DirectJobBundleFactory(
+      EnvironmentFactory environmentFactory,
+      GrpcFnServer<GrpcDataService> dataService,
+      GrpcFnServer<GrpcStateService> stateService) {
+    this.environmentFactory = environmentFactory;
+    this.dataService = dataService;
+    this.stateService = stateService;
+  }
+
+  @Override
+  public <T> StageBundleFactory<T> forStage(ExecutableStage executableStage) {
+    return (StageBundleFactory<T>)
+        stageBundleFactories.computeIfAbsent(executableStage, 
this::createBundleFactory);
+  }
+
+  private final AtomicLong idgen = new AtomicLong();
+
+  private <T> StageBundleFactory<T> createBundleFactory(ExecutableStage stage) 
{
+    RemoteEnvironment remoteEnv =
+        environments.computeIfAbsent(
+            stage.getEnvironment(),
+            env -> {
+              try {
+                return environmentFactory.createEnvironment(env);
+              } catch (Exception e) {
+                throw new RuntimeException(e);
+              }
+            });
+    SdkHarnessClient sdkHarnessClient =
+        SdkHarnessClient.usingFnApiClient(
+            remoteEnv.getInstructionRequestHandler(), 
dataService.getService());
+    ExecutableProcessBundleDescriptor descriptor;
+    try {
+      descriptor =
+          ProcessBundleDescriptors.fromExecutableStage(
+              Long.toString(idgen.getAndIncrement()), stage, 
dataService.getApiServiceDescriptor());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    RemoteInputDestination<? super WindowedValue<?>> destination =
+        descriptor.getRemoteInputDestination();
+    SdkHarnessClient.BundleProcessor<T> bundleProcessor =
+        sdkHarnessClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(),
+            (RemoteInputDestination<WindowedValue<T>>) 
(RemoteInputDestination) destination,
+            stateService.getService());
+    return new DirectStageBundleFactory<>(descriptor, bundleProcessor);
+  }
+
+  @Override
+  public void close() throws Exception {
+    Exception thrown = null;
+    for (RemoteEnvironment remoteEnvironment : environments.values()) {
+      try {
+        remoteEnvironment.close();
+      } catch (Exception e) {
+        if (thrown == null) {
+          thrown = e;
+        } else {
+          thrown.addSuppressed(e);
+        }
+      }
+    }
+    if (thrown != null) {
+      throw thrown;
+    }
+  }
+
+  private static class DirectStageBundleFactory<T> implements 
StageBundleFactory<T> {
+    private final ExecutableProcessBundleDescriptor descriptor;
+    private final SdkHarnessClient.BundleProcessor<T> processor;
+
+    private DirectStageBundleFactory(
+        ExecutableProcessBundleDescriptor descriptor,
+        SdkHarnessClient.BundleProcessor<T> processor) {
+      this.descriptor = descriptor;
+      this.processor = processor;
+    }
+
+    @Override
+    public RemoteBundle<T> getBundle(
+        OutputReceiverFactory outputReceiverFactory, StateRequestHandler 
stateRequestHandler)
+        throws Exception {
+      Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+      for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoders :
+          descriptor.getOutputTargetCoders().entrySet()) {
+        String bundleOutputPCollection =
+            Iterables.getOnlyElement(
+                descriptor
+                    .getProcessBundleDescriptor()
+                    
.getTransformsOrThrow(targetCoders.getKey().getPrimitiveTransformReference())
+                    .getInputsMap()
+                    .values());
+        FnDataReceiver<WindowedValue<?>> outputReceiver =
+            outputReceiverFactory.create(bundleOutputPCollection);
+        outputReceivers.put(
+            targetCoders.getKey(),
+            RemoteOutputReceiver.of(targetCoders.getValue(), outputReceiver));
+      }
+      return processor.newBundle(outputReceivers, stateRequestHandler);
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactoryTest.java
new file mode 100644
index 00000000000..943897ec600
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/BundleFactoryOutputRecieverFactoryTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BundleFactoryOutputRecieverFactory}. */
+@RunWith(JUnit4.class)
+public class BundleFactoryOutputRecieverFactoryTest {
+  private final BundleFactory bundleFactory = 
ImmutableListBundleFactory.create();
+  private PCollectionNode fooPC;
+  private PCollectionNode barPC;
+  private RunnerApi.Components components;
+
+  private OutputReceiverFactory factory;
+  private Collection<UncommittedBundle<?>> outputBundles;
+
+  @Before
+  public void setup() throws IOException {
+    Pipeline p = Pipeline.create();
+    PCollection<String> foo =
+        p.apply("createFoo", Create.of("1", "2", "3"))
+            .apply("windowFoo", 
Window.into(FixedWindows.of(Duration.standardMinutes(5L))));
+    PCollection<Integer> bar = p.apply("bar", Create.of(1, 2, 3));
+
+    SdkComponents sdkComponents = SdkComponents.create();
+    String fooId = sdkComponents.registerPCollection(foo);
+    String barId = sdkComponents.registerPCollection(bar);
+    components = sdkComponents.toComponents();
+
+    fooPC = PipelineNode.pCollection(fooId, 
components.getPcollectionsOrThrow(fooId));
+    barPC = PipelineNode.pCollection(barId, 
components.getPcollectionsOrThrow(barId));
+
+    outputBundles = new ArrayList<>();
+    factory =
+        BundleFactoryOutputRecieverFactory.create(bundleFactory, components, 
outputBundles::add);
+  }
+
+  @Test
+  public void addsBundlesToResult() {
+    factory.create(fooPC.getId());
+    factory.create(barPC.getId());
+
+    assertThat(Iterables.size(outputBundles), equalTo(2));
+
+    Collection<PCollectionNode> pcollections = new ArrayList<>();
+    for (UncommittedBundle<?> bundle : outputBundles) {
+      pcollections.add(bundle.getPCollection());
+    }
+    assertThat(pcollections, containsInAnyOrder(fooPC, barPC));
+  }
+
+  @Test
+  public void receiverAddsElementsToBundle() throws Exception {
+    FnDataReceiver<WindowedValue<byte[]>> receiver = 
factory.create(fooPC.getId());
+    MessageWithComponents sdkWireCoder =
+        WireCoders.createSdkWireCoder(fooPC, components, 
components::containsCoders);
+    Coder<WindowedValue<String>> sdkCoder =
+        (Coder<WindowedValue<String>>)
+            CoderTranslation.fromProto(
+                sdkWireCoder.getCoder(),
+                
RehydratedComponents.forComponents(sdkWireCoder.getComponents()));
+    Coder<WindowedValue<byte[]>> runnerCoder =
+        WireCoders.instantiateRunnerWireCoder(fooPC, components);
+
+    WindowedValue<byte[]> firstElem =
+        CoderUtils.decodeFromByteArray(
+            runnerCoder,
+            CoderUtils.encodeToByteArray(
+                sdkCoder,
+                WindowedValue.of(
+                    "1",
+                    new Instant(120),
+                    new IntervalWindow(new Instant(0), 
Duration.standardMinutes(5)),
+                    PaneInfo.NO_FIRING)));
+    WindowedValue<byte[]> secondElem =
+        CoderUtils.decodeFromByteArray(
+            runnerCoder,
+            CoderUtils.encodeToByteArray(
+                sdkCoder,
+                WindowedValue.of(
+                    "2",
+                    new Instant(240),
+                    new IntervalWindow(new Instant(0), 
Duration.standardMinutes(5)),
+                    PaneInfo.NO_FIRING)));
+    receiver.accept(firstElem);
+    receiver.accept(secondElem);
+
+    CommittedBundle<?> output = 
getOnlyElement(outputBundles).commit(Instant.now());
+    assertThat(output, containsInAnyOrder(firstElem, secondElem));
+  }
+
+  /**
+   * Tests that if a {@link
+   * 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode} 
is provided
+   * multiple times, the returned {@link
+   * org.apache.beam.runners.fnexecution.control.RemoteOutputReceiver} 
instances are independent.
+   */
+  @Test
+  public void multipleInstancesOfPCollectionIndependent() throws Exception {
+    FnDataReceiver<WindowedValue<byte[]>> firstReceiver = 
factory.create(fooPC.getId());
+    FnDataReceiver<WindowedValue<byte[]>> secondReceiver = 
factory.create(fooPC.getId());
+    MessageWithComponents sdkWireCoder =
+        WireCoders.createSdkWireCoder(fooPC, components, 
components::containsCoders);
+    Coder<WindowedValue<String>> sdkCoder =
+        (Coder<WindowedValue<String>>)
+            CoderTranslation.fromProto(
+                sdkWireCoder.getCoder(),
+                
RehydratedComponents.forComponents(sdkWireCoder.getComponents()));
+    Coder<WindowedValue<byte[]>> runnerCoder =
+        WireCoders.instantiateRunnerWireCoder(fooPC, components);
+
+    WindowedValue<byte[]> firstElem =
+        CoderUtils.decodeFromByteArray(
+            runnerCoder,
+            CoderUtils.encodeToByteArray(
+                sdkCoder,
+                WindowedValue.of(
+                    "1",
+                    new Instant(120),
+                    new IntervalWindow(new Instant(0), 
Duration.standardMinutes(5)),
+                    PaneInfo.NO_FIRING)));
+    firstReceiver.accept(firstElem);
+
+    WindowedValue<byte[]> secondElem =
+        CoderUtils.decodeFromByteArray(
+            runnerCoder,
+            CoderUtils.encodeToByteArray(
+                sdkCoder,
+                WindowedValue.of(
+                    "2",
+                    new Instant(240),
+                    new IntervalWindow(new Instant(0), 
Duration.standardMinutes(5)),
+                    PaneInfo.NO_FIRING)));
+    secondReceiver.accept(secondElem);
+
+    Collection<WindowedValue<?>> outputs = new ArrayList<>();
+    for (UncommittedBundle<?> uncommitted : outputBundles) {
+      assertThat(uncommitted.getPCollection(), equalTo(fooPC));
+      Iterable<? extends WindowedValue<?>> elements =
+          uncommitted.commit(Instant.now()).getElements();
+      Iterables.addAll(outputs, elements);
+      assertThat(Iterables.size(elements), equalTo(1));
+    }
+    assertThat(outputs, containsInAnyOrder(firstElem, secondElem));
+  }
+
+  @Test
+  public void differentPCollectionsIndependent() throws Exception {
+    FnDataReceiver<WindowedValue<byte[]>> fooReceiver = 
factory.create(fooPC.getId());
+    MessageWithComponents fooSdkWireCoder =
+        WireCoders.createSdkWireCoder(fooPC, components, 
components::containsCoders);
+    Coder<WindowedValue<String>> fooSdkCoder =
+        (Coder<WindowedValue<String>>)
+            CoderTranslation.fromProto(
+                fooSdkWireCoder.getCoder(),
+                
RehydratedComponents.forComponents(fooSdkWireCoder.getComponents()));
+    Coder<WindowedValue<byte[]>> fooRunnerCoder =
+        WireCoders.instantiateRunnerWireCoder(fooPC, components);
+
+    FnDataReceiver<WindowedValue<byte[]>> barReceiver = 
factory.create(barPC.getId());
+    MessageWithComponents barSdkWireCoder =
+        WireCoders.createSdkWireCoder(barPC, components, 
components::containsCoders);
+    Coder<WindowedValue<Integer>> barSdkCoder =
+        (Coder<WindowedValue<Integer>>)
+            CoderTranslation.fromProto(
+                barSdkWireCoder.getCoder(),
+                
RehydratedComponents.forComponents(barSdkWireCoder.getComponents()));
+    Coder<WindowedValue<byte[]>> barRunnerCoder =
+        WireCoders.instantiateRunnerWireCoder(barPC, components);
+
+    WindowedValue<byte[]> fooElem =
+        CoderUtils.decodeFromByteArray(
+            fooRunnerCoder,
+            CoderUtils.encodeToByteArray(
+                fooSdkCoder,
+                WindowedValue.of(
+                    "1",
+                    new Instant(120),
+                    new IntervalWindow(new Instant(0), 
Duration.standardMinutes(5)),
+                    PaneInfo.NO_FIRING)));
+    fooReceiver.accept(fooElem);
+
+    WindowedValue<byte[]> barElem =
+        CoderUtils.decodeFromByteArray(
+            barRunnerCoder,
+            CoderUtils.encodeToByteArray(
+                barSdkCoder, WindowedValue.timestampedValueInGlobalWindow(2, 
new Instant(240))));
+    barReceiver.accept(barElem);
+
+    Collection<? super WindowedValue<?>> outputs = new ArrayList<>();
+    for (UncommittedBundle<?> uncommitted : outputBundles) {
+      WindowedValue<?> output = 
getOnlyElement(uncommitted.commit(Instant.now()).getElements());
+      if (fooPC.equals(uncommitted.getPCollection())) {
+        assertThat(output, equalTo(fooElem));
+      } else if (barPC.equals(uncommitted.getPCollection())) {
+        assertThat(output, equalTo(barElem));
+      } else {
+        fail(
+            String.format(
+                "Output %s should be either 'foo' or 'bar', got '%s",
+                PCollection.class.getSimpleName(), 
uncommitted.getPCollection().getId()));
+      }
+      outputs.add(output);
+    }
+    assertThat(outputs, containsInAnyOrder(fooElem, barElem));
+  }
+}
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
new file mode 100644
index 00000000000..1c14145914a
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/DirectJobBundleFactoryTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct.portable;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link DirectJobBundleFactory}. */
+@RunWith(JUnit4.class)
+public class DirectJobBundleFactoryTest {
+  @Mock private EnvironmentFactory environmentFactory;
+  @Mock private InstructionRequestHandler instructionRequestHandler;
+
+  private ExecutorService executor = Executors.newCachedThreadPool();
+
+  private GrpcFnServer<GrpcDataService> dataServer;
+  private GrpcFnServer<GrpcStateService> stateServer;
+  private JobBundleFactory factory;
+
+  @Before
+  public void setup() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(instructionRequestHandler.handle(any(InstructionRequest.class)))
+        
.thenReturn(CompletableFuture.completedFuture(InstructionResponse.getDefaultInstance()));
+
+    InProcessServerFactory serverFactory = InProcessServerFactory.create();
+    dataServer =
+        
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
+    stateServer = 
GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
+
+    factory = DirectJobBundleFactory.create(environmentFactory, dataServer, 
stateServer);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    try (AutoCloseable data = dataServer;
+        AutoCloseable state = stateServer) {
+      executor.shutdownNow();
+    }
+  }
+
+  @Test
+  public void closeShutsDownEnvironments() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("Create", Create.of(1, 2, 3));
+    
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+
+    ExecutableStage stage =
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
+            .getFusedStages()
+            .stream()
+            .findFirst()
+            .get();
+    RemoteEnvironment remoteEnv = mock(RemoteEnvironment.class);
+    
when(remoteEnv.getInstructionRequestHandler()).thenReturn(instructionRequestHandler);
+    
when(environmentFactory.createEnvironment(stage.getEnvironment())).thenReturn(remoteEnv);
+
+    factory.forStage(stage);
+    factory.close();
+    verify(remoteEnv).close();
+  }
+
+  @Test
+  public void closeShutsDownEnvironmentsWhenSomeFail() throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("Create", Create.of(1, 2, 3));
+    
p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
+
+    ExecutableStage firstEnvStage =
+        GreedyPipelineFuser.fuse(PipelineTranslation.toProto(p))
+            .getFusedStages()
+            .stream()
+            .findFirst()
+            .get();
+    ExecutableStagePayload basePayload =
+        
ExecutableStagePayload.parseFrom(firstEnvStage.toPTransform().getSpec().getPayload());
+
+    Environment secondEnv = 
Environment.newBuilder().setUrl("second_env").build();
+    ExecutableStage secondEnvStage =
+        
ExecutableStage.fromPayload(basePayload.toBuilder().setEnvironment(secondEnv).build());
+
+    Environment thirdEnv = 
Environment.newBuilder().setUrl("third_env").build();
+    ExecutableStage thirdEnvStage =
+        
ExecutableStage.fromPayload(basePayload.toBuilder().setEnvironment(thirdEnv).build());
+
+    RemoteEnvironment firstRemoteEnv = mock(RemoteEnvironment.class, "First 
Remote Env");
+    RemoteEnvironment secondRemoteEnv = mock(RemoteEnvironment.class, "Second 
Remote Env");
+    RemoteEnvironment thirdRemoteEnv = mock(RemoteEnvironment.class, "Third 
Remote Env");
+    when(environmentFactory.createEnvironment(firstEnvStage.getEnvironment()))
+        .thenReturn(firstRemoteEnv);
+    when(environmentFactory.createEnvironment(secondEnvStage.getEnvironment()))
+        .thenReturn(secondRemoteEnv);
+    when(environmentFactory.createEnvironment(thirdEnvStage.getEnvironment()))
+        .thenReturn(thirdRemoteEnv);
+    
when(firstRemoteEnv.getInstructionRequestHandler()).thenReturn(instructionRequestHandler);
+    
when(secondRemoteEnv.getInstructionRequestHandler()).thenReturn(instructionRequestHandler);
+    
when(thirdRemoteEnv.getInstructionRequestHandler()).thenReturn(instructionRequestHandler);
+
+    factory.forStage(firstEnvStage);
+    factory.forStage(secondEnvStage);
+    factory.forStage(thirdEnvStage);
+
+    IllegalStateException firstException = new IllegalStateException("first 
stage");
+    doThrow(firstException).when(firstRemoteEnv).close();
+    IllegalStateException thirdException = new IllegalStateException("third 
stage");
+    doThrow(thirdException).when(thirdRemoteEnv).close();
+
+    try {
+      factory.close();
+      fail("Factory close should have thrown");
+    } catch (IllegalStateException expected) {
+      if (expected.equals(firstException)) {
+        assertThat(ImmutableList.copyOf(expected.getSuppressed()), 
contains(thirdException));
+      } else if (expected.equals(thirdException)) {
+        assertThat(ImmutableList.copyOf(expected.getSuppressed()), 
contains(firstException));
+      } else {
+        throw expected;
+      }
+
+      verify(firstRemoteEnv).close();
+      verify(secondRemoteEnv).close();
+      verify(thirdRemoteEnv).close();
+    }
+  }
+}
diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java
index 00d70b1fcad..39e2a968159 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/IdGenerator.java
@@ -1,27 +1,22 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.beam.sdk.fn;
 
-
 /**
  * A generator of unique IDs.
  */
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java
index 9916e20ee53..2471d61dd52 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/package-info.java
@@ -1,23 +1,19 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 /** The top level package for the Fn Execution Java libraries. */
 package org.apache.beam.sdk.fn;
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java
index 624a584e066..8dfbf17956b 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/IdGeneratorsTest.java
@@ -1,24 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.beam.sdk.fn;
 
 import static org.hamcrest.Matchers.equalTo;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104679)
    Time Spent: 11h 20m  (was: 11h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3326
>                 URL: https://issues.apache.org/jira/browse/BEAM-3326
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to