[ 
https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=172909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172909
 ]

ASF GitHub Bot logged work on BEAM-5850:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Dec/18 01:37
            Start Date: 07/Dec/18 01:37
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #6786: [BEAM-5850] Add 
QueueingBeamFnDataClient and make process, finish and start run on the same 
thread to support metrics.
URL: https://github.com/apache/beam/pull/6786
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 748efb47005a..f53257fe07f6 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -21,6 +21,7 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Optional;
@@ -278,6 +279,91 @@ public void process(ProcessContext ctxt) {
     }
   }
 
+  @Test
+  public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() 
throws Exception {
+    Pipeline p = Pipeline.create();
+    p.apply("impulse", Impulse.create())
+        .apply(
+            "create",
+            ParDo.of(
+                new DoFn<byte[], KV<String, String>>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctxt) throws Exception {
+                    String element =
+                        CoderUtils.decodeFromByteArray(StringUtf8Coder.of(), 
ctxt.element());
+                    if (element.equals("X")) {
+                      throw new Exception("testBundleExecutionFailure");
+                    }
+                    ctxt.output(KV.of(element, element));
+                  }
+                }))
+        .apply("gbk", GroupByKey.create());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+    checkState(fused.getFusedStages().size() == 1, "Expected exactly one fused 
stage");
+    ExecutableStage stage = fused.getFusedStages().iterator().next();
+
+    ExecutableProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptors.fromExecutableStage(
+            "my_stage", stage, dataServer.getApiServiceDescriptor());
+
+    BundleProcessor processor =
+        controlClient.getProcessor(
+            descriptor.getProcessBundleDescriptor(), 
descriptor.getRemoteInputDestinations());
+    Map<Target, ? super Coder<WindowedValue<?>>> outputTargets = 
descriptor.getOutputTargetCoders();
+    Map<Target, Collection<? super WindowedValue<?>>> outputValues = new 
HashMap<>();
+    Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    for (Entry<Target, ? super Coder<WindowedValue<?>>> targetCoder : 
outputTargets.entrySet()) {
+      List<? super WindowedValue<?>> outputContents =
+          Collections.synchronizedList(new ArrayList<>());
+      outputValues.put(targetCoder.getKey(), outputContents);
+      outputReceivers.put(
+          targetCoder.getKey(),
+          RemoteOutputReceiver.of(
+              (Coder) targetCoder.getValue(),
+              (FnDataReceiver<? super WindowedValue<?>>) outputContents::add));
+    }
+
+    try (ActiveBundle bundle =
+        processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) 
{
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
+    }
+
+    try {
+      try (ActiveBundle bundle =
+          processor.newBundle(outputReceivers, 
BundleProgressHandler.ignored())) {
+        Iterables.getOnlyElement(bundle.getInputReceivers().values())
+            .accept(
+                WindowedValue.valueInGlobalWindow(
+                    CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+      }
+      // Fail the test if we reach this point and never threw the exception.
+      fail();
+    } catch (ExecutionException e) {
+      assertTrue(e.getMessage().contains("testBundleExecutionFailure"));
+    }
+
+    try (ActiveBundle bundle =
+        processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) 
{
+      Iterables.getOnlyElement(bundle.getInputReceivers().values())
+          .accept(
+              WindowedValue.valueInGlobalWindow(
+                  CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Z")));
+    }
+
+    for (Collection<? super WindowedValue<?>> windowedValues : 
outputValues.values()) {
+      assertThat(
+          windowedValues,
+          containsInAnyOrder(
+              WindowedValue.valueInGlobalWindow(kvBytes("Y", "Y")),
+              WindowedValue.valueInGlobalWindow(kvBytes("Z", "Z"))));
+    }
+  }
+
   @Test
   public void testExecutionWithSideInput() throws Exception {
     Pipeline p = Pipeline.create();
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 3c9d18367942..e22a6c2b1c39 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -41,6 +41,7 @@
 import org.apache.beam.fn.harness.PTransformRunnerFactory;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.data.QueueingBeamFnDataClient;
 import org.apache.beam.fn.harness.state.BeamFnStateClient;
 import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -138,6 +139,7 @@ public ProcessBundleHandler(
 
   private void createRunnerAndConsumersForPTransformRecursively(
       BeamFnStateClient beamFnStateClient,
+      BeamFnDataClient queueingClient,
       String pTransformId,
       PTransform pTransform,
       Supplier<String> processBundleInstructionId,
@@ -158,6 +160,7 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
       for (String consumingPTransformId : 
pCollectionIdsToConsumingPTransforms.get(pCollectionId)) {
         createRunnerAndConsumersForPTransformRecursively(
             beamFnStateClient,
+            queueingClient,
             consumingPTransformId,
             
processBundleDescriptor.getTransformsMap().get(consumingPTransformId),
             processBundleInstructionId,
@@ -188,7 +191,7 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
           .getOrDefault(pTransform.getSpec().getUrn(), 
defaultPTransformRunnerFactory)
           .createRunnerForPTransform(
               options,
-              beamFnDataClient,
+              queueingClient,
               beamFnStateClient,
               pTransformId,
               pTransform,
@@ -204,8 +207,17 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
     }
   }
 
+  /**
+   * Processes a bundle, running the start(), process(), and finish() 
functions. This function is
+   * required to be reentrant.
+   */
   public BeamFnApi.InstructionResponse.Builder 
processBundle(BeamFnApi.InstructionRequest request)
       throws Exception {
+    // Note: We must create one instance of the QueueingBeamFnDataClient as it 
is designed to
+    // handle the life of a bundle. It will insert elements onto a queue and 
drain them off so all
+    // process() calls will execute on this thread when 
queueingClient.drainAndBlock() is called.
+    QueueingBeamFnDataClient queueingClient = new 
QueueingBeamFnDataClient(this.beamFnDataClient);
+
     String bundleId = 
request.getProcessBundle().getProcessBundleDescriptorReference();
     BeamFnApi.ProcessBundleDescriptor bundleDescriptor =
         (BeamFnApi.ProcessBundleDescriptor) fnApiRegistry.apply(bundleId);
@@ -255,6 +267,7 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
       // Create a BeamFnStateClient
       for (Map.Entry<String, RunnerApi.PTransform> entry :
           bundleDescriptor.getTransformsMap().entrySet()) {
+
         // Skip anything which isn't a root
         // TODO: Remove source as a root and have it be triggered by the 
Runner.
         if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn())
@@ -266,6 +279,7 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
 
         createRunnerAndConsumersForPTransformRecursively(
             beamFnStateClient,
+            queueingClient,
             entry.getKey(),
             entry.getValue(),
             request::getInstructionId,
@@ -284,6 +298,8 @@ private void 
createRunnerAndConsumersForPTransformRecursively(
         startFunction.run();
       }
 
+      queueingClient.drainAndBlock();
+
       // Need to reverse this since we want to call finish in topological 
order.
       for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctions)) {
         LOG.debug("Finishing function {}", finishFunction);
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
new file mode 100644
index 000000000000..194672d1092b
--- /dev/null
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClient.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.fn.harness.control.ProcessBundleHandler;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link BeamFnDataClient} that queues elements so that they can be 
consumed and processed in the
+ * thread which calls @{link #drainAndBlock}.
+ */
+public class QueueingBeamFnDataClient implements BeamFnDataClient {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(QueueingBeamFnDataClient.class);
+
+  private final BeamFnDataClient mainClient;
+  private final SynchronousQueue<ConsumerAndData> queue;
+  private final ConcurrentHashMap<InboundDataClient, Object> 
inboundDataClients;
+
+  public QueueingBeamFnDataClient(BeamFnDataClient mainClient) {
+    this.mainClient = mainClient;
+    this.queue = new SynchronousQueue<>();
+    this.inboundDataClients = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  public <T> InboundDataClient receive(
+      ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint inputLocation,
+      Coder<WindowedValue<T>> coder,
+      FnDataReceiver<WindowedValue<T>> consumer) {
+    LOG.debug(
+        "Registering consumer for instruction {} and target {}",
+        inputLocation.getInstructionId(),
+        inputLocation.getTarget());
+
+    QueueingFnDataReceiver<T> queueingConsumer = new 
QueueingFnDataReceiver<T>(consumer);
+    InboundDataClient inboundDataClient =
+        this.mainClient.receive(apiServiceDescriptor, inputLocation, coder, 
queueingConsumer);
+    queueingConsumer.inboundDataClient = inboundDataClient;
+    this.inboundDataClients.computeIfAbsent(
+        inboundDataClient, (InboundDataClient idcToStore) -> idcToStore);
+    return inboundDataClient;
+  }
+
+  // Returns true if all the InboundDataClients have finished or cancelled.
+  private boolean allDone() {
+    for (InboundDataClient inboundDataClient : inboundDataClients.keySet()) {
+      if (!inboundDataClient.isDone()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Drains the internal queue of this class, by waiting for all 
WindowedValues to be passed to
+   * their consumers. The thread which wishes to process() the elements should 
call this method, as
+   * this will cause the consumers to invoke element processing. All receive() 
and send() calls must
+   * be made prior to calling drainAndBlock, in order to properly terminate.
+   *
+   * <p>All {@link InboundDataClient}s will be failed if processing throws an 
exception.
+   *
+   * <p>This method is NOT thread safe. This should only be invoked by a 
single thread, and is
+   * intended for use with a newly constructed QueueingBeamFnDataClient in 
{@link
+   * ProcessBundleHandler#processBundle(InstructionRequest)}.
+   */
+  public void drainAndBlock() throws Exception {
+    while (true) {
+      try {
+        ConsumerAndData tuple = queue.poll(200, TimeUnit.MILLISECONDS);
+        if (tuple != null) {
+          // Forward to the consumers who cares about this data.
+          tuple.consumer.accept(tuple.data);
+        } else {
+          // Note: We do not expect to ever hit this point without receiving 
all values
+          // as (1) The InboundObserver will not be set to Done until the
+          // QueuingFnDataReceiver.accept() call returns and will not be 
invoked again.
+          // (2) The QueueingFnDataReceiver will not return until the value is 
received in
+          // drainAndBlock, because of the use of the SynchronousQueue.
+          if (allDone()) {
+            break;
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Client failed to dequeue and process WindowedValue", e);
+        for (InboundDataClient inboundDataClient : 
inboundDataClients.keySet()) {
+          inboundDataClient.fail(e);
+        }
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+      LogicalEndpoint outputLocation,
+      Coder<WindowedValue<T>> coder) {
+    LOG.debug(
+        "Creating output consumer for instruction {} and target {}",
+        outputLocation.getInstructionId(),
+        outputLocation.getTarget());
+    return this.mainClient.send(apiServiceDescriptor, outputLocation, coder);
+  }
+
+  /**
+   * The QueueingFnDataReceiver is a a FnDataReceiver used by the 
QueueingBeamFnDataClient.
+   *
+   * <p>All {@link #accept accept()ed} values will be put onto a synchronous 
queue which will cause
+   * the calling thread to block until {@link 
QueueingBeamFnDataClient#drainAndBlock} is called.
+   * {@link QueueingBeamFnDataClient#drainAndBlock} is responsible for 
processing values from the
+   * queue.
+   */
+  public class QueueingFnDataReceiver<T> implements 
FnDataReceiver<WindowedValue<T>> {
+    private final FnDataReceiver<WindowedValue<T>> consumer;
+    public InboundDataClient inboundDataClient;
+
+    public QueueingFnDataReceiver(FnDataReceiver<WindowedValue<T>> consumer) {
+      this.consumer = consumer;
+    }
+
+    /**
+     * This method is thread safe, we expect multiple threads to call this, 
passing in data when new
+     * data arrives via the QueueingBeamFnDataClient's mainClient.
+     */
+    @Override
+    public void accept(WindowedValue<T> value) throws Exception {
+      try {
+        ConsumerAndData offering = new ConsumerAndData(this.consumer, value);
+        while (!queue.offer(offering, 200, TimeUnit.MILLISECONDS)) {
+          if (inboundDataClient.isDone()) {
+            // If it was cancelled by the consuming side of the queue.
+            break;
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to insert WindowedValue into the queue", e);
+        inboundDataClient.fail(e);
+        throw e;
+      }
+    }
+  }
+
+  static class ConsumerAndData<T> {
+    public FnDataReceiver<WindowedValue<T>> consumer;
+    public WindowedValue<T> data;
+
+    public ConsumerAndData(FnDataReceiver<WindowedValue<T>> receiver, 
WindowedValue<T> data) {
+      this.consumer = receiver;
+      this.data = data;
+    }
+  }
+}
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
new file mode 100644
index 000000000000..3bb77f741fe0
--- /dev/null
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/QueueingBeamFnDataClientTest.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.data;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
+import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
+import org.apache.beam.sdk.fn.test.TestExecutors;
+import org.apache.beam.sdk.fn.test.TestExecutors.TestExecutorService;
+import org.apache.beam.sdk.fn.test.TestStreams;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.grpc.inprocess.InProcessChannelBuilder;
+import 
org.apache.beam.vendor.grpc.v1_13_1.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests for {@link QueueingBeamFnDataClient}. */
+@RunWith(JUnit4.class)
+public class QueueingBeamFnDataClientTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(QueueingBeamFnDataClientTest.class);
+
+  @Rule public TestExecutorService executor = 
TestExecutors.from(Executors::newCachedThreadPool);
+
+  private static final Coder<WindowedValue<String>> CODER =
+      LengthPrefixCoder.of(
+          WindowedValue.getFullCoder(StringUtf8Coder.of(), 
GlobalWindow.Coder.INSTANCE));
+  private static final LogicalEndpoint ENDPOINT_A =
+      LogicalEndpoint.of(
+          "12L",
+          
Target.newBuilder().setPrimitiveTransformReference("34L").setName("targetA").build());
+
+  private static final LogicalEndpoint ENDPOINT_B =
+      LogicalEndpoint.of(
+          "56L",
+          BeamFnApi.Target.newBuilder()
+              .setPrimitiveTransformReference("78L")
+              .setName("targetB")
+              .build());
+
+  private static final BeamFnApi.Elements ELEMENTS_A_1;
+  private static final BeamFnApi.Elements ELEMENTS_A_2;
+  private static final BeamFnApi.Elements ELEMENTS_B_1;
+
+  static {
+    try {
+      ELEMENTS_A_1 =
+          BeamFnApi.Elements.newBuilder()
+              .addData(
+                  BeamFnApi.Elements.Data.newBuilder()
+                      .setInstructionReference(ENDPOINT_A.getInstructionId())
+                      .setTarget(ENDPOINT_A.getTarget())
+                      .setData(
+                          ByteString.copyFrom(encodeToByteArray(CODER, 
valueInGlobalWindow("ABC")))
+                              .concat(
+                                  ByteString.copyFrom(
+                                      encodeToByteArray(CODER, 
valueInGlobalWindow("DEF"))))))
+              .build();
+      ELEMENTS_A_2 =
+          BeamFnApi.Elements.newBuilder()
+              .addData(
+                  BeamFnApi.Elements.Data.newBuilder()
+                      .setInstructionReference(ENDPOINT_A.getInstructionId())
+                      .setTarget(ENDPOINT_A.getTarget())
+                      .setData(
+                          ByteString.copyFrom(
+                              encodeToByteArray(CODER, 
valueInGlobalWindow("GHI")))))
+              .addData(
+                  BeamFnApi.Elements.Data.newBuilder()
+                      .setInstructionReference(ENDPOINT_A.getInstructionId())
+                      .setTarget(ENDPOINT_A.getTarget()))
+              .build();
+      ELEMENTS_B_1 =
+          BeamFnApi.Elements.newBuilder()
+              .addData(
+                  BeamFnApi.Elements.Data.newBuilder()
+                      .setInstructionReference(ENDPOINT_B.getInstructionId())
+                      .setTarget(ENDPOINT_B.getTarget())
+                      .setData(
+                          ByteString.copyFrom(encodeToByteArray(CODER, 
valueInGlobalWindow("JKL")))
+                              .concat(
+                                  ByteString.copyFrom(
+                                      encodeToByteArray(CODER, 
valueInGlobalWindow("MNO"))))))
+              .addData(
+                  BeamFnApi.Elements.Data.newBuilder()
+                      .setInstructionReference(ENDPOINT_B.getInstructionId())
+                      .setTarget(ENDPOINT_B.getTarget()))
+              .build();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError(e);
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testBasicInboundConsumerBehaviour() throws Exception {
+    CountDownLatch waitForClientToConnect = new CountDownLatch(1);
+    CountDownLatch receiveAllValuesA = new CountDownLatch(3);
+    CountDownLatch receiveAllValuesB = new CountDownLatch(2);
+    Collection<WindowedValue<String>> inboundValuesA = new 
ConcurrentLinkedQueue<>();
+    Collection<WindowedValue<String>> inboundValuesB = new 
ConcurrentLinkedQueue<>();
+    Collection<BeamFnApi.Elements> inboundServerValues = new 
ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver 
=
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+        TestStreams.withOnNext(inboundServerValues::add).build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
+            .build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnDataGrpc.BeamFnDataImplBase() {
+                  @Override
+                  public StreamObserver<BeamFnApi.Elements> data(
+                      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+                    outboundServerObserver.set(outboundObserver);
+                    waitForClientToConnect.countDown();
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+    try {
+      ManagedChannel channel =
+          
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+      BeamFnDataGrpcClient clientFactory =
+          new BeamFnDataGrpcClient(
+              PipelineOptionsFactory.create(),
+              (Endpoints.ApiServiceDescriptor descriptor) -> channel,
+              OutboundObserverFactory.trivial());
+      QueueingBeamFnDataClient queueingClient = new 
QueueingBeamFnDataClient(clientFactory);
+
+      InboundDataClient readFutureA =
+          queueingClient.receive(
+              apiServiceDescriptor,
+              ENDPOINT_A,
+              CODER,
+              (WindowedValue<String> wv) -> {
+                inboundValuesA.add(wv);
+                receiveAllValuesA.countDown();
+              });
+
+      waitForClientToConnect.await();
+
+      Future<?> sendElementsFuture =
+          executor.submit(
+              () -> {
+                outboundServerObserver.get().onNext(ELEMENTS_A_1);
+                // Purposefully transmit some data before the consumer for B 
is bound showing that
+                // data is not lost
+                outboundServerObserver.get().onNext(ELEMENTS_B_1);
+              });
+
+      // This can be compeleted before we get values?
+      InboundDataClient readFutureB =
+          queueingClient.receive(
+              apiServiceDescriptor,
+              ENDPOINT_B,
+              CODER,
+              (WindowedValue<String> wv) -> {
+                inboundValuesB.add(wv);
+                receiveAllValuesB.countDown();
+              });
+
+      Future<?> drainElementsFuture =
+          executor.submit(
+              () -> {
+                try {
+                  queueingClient.drainAndBlock();
+                } catch (Exception e) {
+                  LOG.error("Failed ", e);
+                  fail();
+                }
+              });
+
+      receiveAllValuesB.await();
+      assertThat(inboundValuesB, contains(valueInGlobalWindow("JKL"), 
valueInGlobalWindow("MNO")));
+
+      outboundServerObserver.get().onNext(ELEMENTS_A_2);
+
+      receiveAllValuesA.await(); // Wait for A's values to be available
+      assertThat(
+          inboundValuesA,
+          contains(
+              valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), 
valueInGlobalWindow("GHI")));
+
+      // Wait for these threads to terminate
+      sendElementsFuture.get();
+      drainElementsFuture.get();
+    } finally {
+      server.shutdownNow();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testBundleProcessorThrowsExecutionExceptionWhenUserCodeThrows() 
throws Exception {
+    CountDownLatch waitForClientToConnect = new CountDownLatch(1);
+    //Collection<WindowedValue<String>> inboundValuesA = new 
ConcurrentLinkedQueue<>();
+    Collection<WindowedValue<String>> inboundValuesB = new 
ConcurrentLinkedQueue<>();
+    Collection<BeamFnApi.Elements> inboundServerValues = new 
ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.Elements>> outboundServerObserver 
=
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.Elements> inboundServerObserver =
+        TestStreams.withOnNext(inboundServerValues::add).build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
+            .build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnDataGrpc.BeamFnDataImplBase() {
+                  @Override
+                  public StreamObserver<BeamFnApi.Elements> data(
+                      StreamObserver<BeamFnApi.Elements> outboundObserver) {
+                    outboundServerObserver.set(outboundObserver);
+                    waitForClientToConnect.countDown();
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+    try {
+      ManagedChannel channel =
+          
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+
+      BeamFnDataGrpcClient clientFactory =
+          new BeamFnDataGrpcClient(
+              PipelineOptionsFactory.create(),
+              (Endpoints.ApiServiceDescriptor descriptor) -> channel,
+              OutboundObserverFactory.trivial());
+      QueueingBeamFnDataClient queueingClient = new 
QueueingBeamFnDataClient(clientFactory);
+
+      InboundDataClient readFutureA =
+          queueingClient.receive(
+              apiServiceDescriptor,
+              ENDPOINT_A,
+              CODER,
+              (WindowedValue<String> wv) -> {
+                throw new RuntimeException("Intentionally fail!"); // Error 
injected here.
+              });
+
+      waitForClientToConnect.await();
+
+      Future<?> sendElementsFuture =
+          executor.submit(
+              () -> {
+                outboundServerObserver.get().onNext(ELEMENTS_A_1);
+                // Purposefully transmit some data before the consumer for B 
is bound showing that
+                // data is not lost
+                outboundServerObserver.get().onNext(ELEMENTS_B_1);
+              });
+
+      InboundDataClient readFutureB =
+          queueingClient.receive(
+              apiServiceDescriptor,
+              ENDPOINT_B,
+              CODER,
+              (WindowedValue<String> wv) -> {
+                inboundValuesB.add(wv);
+              });
+
+      Future<?> drainElementsFuture =
+          executor.submit(
+              () -> {
+                boolean intentionallyFailed = false;
+                try {
+                  queueingClient.drainAndBlock();
+                } catch (RuntimeException e) {
+                  intentionallyFailed = true;
+                } catch (Exception e) {
+                  LOG.error("Unintentional failure", e);
+                  fail();
+                }
+                assertTrue(intentionallyFailed);
+              });
+
+      // Fail all InboundObservers if any of the downstream consumers fail.
+      // This allows the ProcessBundlerHandler to unblock everything and fail 
properly.
+      boolean intentionallyFailedA = false;
+      try {
+        readFutureA.awaitCompletion();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof RuntimeException) {
+          intentionallyFailedA = true;
+        }
+      }
+      assertTrue(intentionallyFailedA);
+
+      boolean intentionallyFailedB = false;
+      try {
+        readFutureB.awaitCompletion();
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof RuntimeException) {
+          intentionallyFailedB = true;
+        }
+      } catch (Exception e) {
+        intentionallyFailedB = true;
+      }
+      assertTrue(intentionallyFailedB);
+
+      // Wait for these threads to terminate
+      sendElementsFuture.get();
+      drainElementsFuture.get();
+    } finally {
+      server.shutdownNow();
+    }
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 172909)
    Time Spent: 16h 10m  (was: 16h)

> Make process, finish and start run on the same thread to support metrics.
> -------------------------------------------------------------------------
>
>                 Key: BEAM-5850
>                 URL: https://issues.apache.org/jira/browse/BEAM-5850
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Alex Amato
>            Priority: Major
>          Time Spent: 16h 10m
>  Remaining Estimate: 0h
>
> Update BeamFnDataReceiver to place elements into a Queue and consumer then 
> and call the element processing receiver in blockTillReadFinishes



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to