[ 
https://issues.apache.org/jira/browse/BEAM-5850?focusedWorklogId=165143&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-165143
 ]

ASF GitHub Bot logged work on BEAM-5850:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Nov/18 21:10
            Start Date: 12/Nov/18 21:10
    Worklog Time Spent: 10m 
      Work Description: ajamato commented on a change in pull request #6786: 
[BEAM-5850] Add BeamFnDataReceiver and make process, finish and start run on 
the same thread to support metrics.
URL: https://github.com/apache/beam/pull/6786#discussion_r232812669
 
 

 ##########
 File path: 
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java
 ##########
 @@ -123,43 +141,142 @@ public void close() {
    * it is ready to consume that data.
    */
   private final class InboundObserver implements 
StreamObserver<BeamFnApi.Elements> {
+
+    private final ConcurrentHashMap<String, SynchronousQueue<Data>> 
instructionIdToQueue;
+    private final ConcurrentHashMap<String, HashSet<Consumer<Data>>> 
instructionIdToConsumers;
+
+    public InboundObserver(boolean enableQueuing) {
+      this.instructionIdToQueue =
+          enableQueuing ? new ConcurrentHashMap<String, 
SynchronousQueue<Data>>() : null;
+      this.instructionIdToConsumers = new ConcurrentHashMap<String, 
HashSet<Consumer<Data>>>();
+    }
+
+    private void registerConsumer(
+        String instructionId, Consumer<BeamFnApi.Elements.Data> consumer) {
+      this.instructionIdToConsumers.putIfAbsent(instructionId, new 
HashSet<Consumer<Data>>());
+      this.instructionIdToConsumers.get(instructionId).add(consumer);
+    }
+
+    private boolean isFinalizationData(BeamFnApi.Elements.Data data) {
+      return data.getData().isEmpty();
+    }
+
+    // TODO overall error handling needs an audit and possibly we need to be 
creative about
+    // testing here. Maybe handleData should just throw all exceptions, and 
catch them
+    // and call onError in the calling code?
+    private void handleData(BeamFnApi.Elements.Data data) {
+      try {
+        LogicalEndpoint key = 
LogicalEndpoint.of(data.getInstructionReference(), data.getTarget());
+        CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = 
receiverFuture(key);
+        if (!consumer.isDone()) {
+          LOG.debug(
+              "Received data for key {} without consumer ready. "
+                  + "Waiting for consumer to be registered.",
+              key);
+        }
+        consumer.get().accept(data);
+        if (isFinalizationData(data)) {
+          consumers.remove(key);
+        }
+        /*
+         * TODO: On failure we should fail any bundles that were impacted 
eagerly
+         * instead of relying on the Runner harness to do all the failure 
handling.
+         */
+      } catch (ExecutionException | InterruptedException e) {
+        LOG.error(
+            "Client interrupted during handling of data for instruction {} and 
target {}",
+            data.getInstructionReference(),
+            data.getTarget(),
+            e);
+        outboundObserver.onError(e);
+      } catch (RuntimeException e) {
+        LOG.error(
+            "Client failed to handle data for instruction {} and target {}",
+            data.getInstructionReference(),
+            data.getTarget(),
+            e);
+        outboundObserver.onError(e);
+      }
+    }
+
+    private boolean IsQueueingEnabled() {
+      return instructionIdToQueue != null;
+    }
+
+
+    // TODO please help review the concurrency aspect here. Questions:
+    // - Safe use of ConcurrentHashMap?
+    // - Unnecessary use of ConcurrentHashMap?
+    // - Is there some code that can run concurrently now, which was not 
before?
     @Override
     public void onNext(BeamFnApi.Elements value) {
       for (BeamFnApi.Elements.Data data : value.getDataList()) {
         try {
-          LogicalEndpoint key =
-              LogicalEndpoint.of(data.getInstructionReference(), 
data.getTarget());
-          CompletableFuture<Consumer<BeamFnApi.Elements.Data>> consumer = 
receiverFuture(key);
-          if (!consumer.isDone()) {
-            LOG.debug(
-                "Received data for key {} without consumer ready. "
-                    + "Waiting for consumer to be registered.",
-                key);
-          }
-          consumer.get().accept(data);
-          if (data.getData().isEmpty()) {
-            consumers.remove(key);
+          if (IsQueueingEnabled()) {
+            // TODO Is there a cleaner way to do this?
+            // I was trying to avoid always doing multiple lookups, and always 
creating
+            // a new instance on the heap
+            SynchronousQueue<Data> queue = 
instructionIdToQueue.get(data.getInstructionReference());
+            if (queue == null) {
+              instructionIdToQueue.putIfAbsent(
+                  data.getInstructionReference(), new 
SynchronousQueue<Data>());
+              // Look up the queue again in case there was a race and another 
thread added one.
+              queue = instructionIdToQueue.get(data.getInstructionReference());
+            }
+
+            // TODO, how do we make this throw an interrupted exception? And 
when should we
+            // do this?
+            // TODO is there any concurrency issue here? Can multiple threads 
call onNext?
 
 Review comment:
   ack

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 165143)
    Time Spent: 4h 10m  (was: 4h)

> Make process, finish and start run on the same thread to support metrics.
> -------------------------------------------------------------------------
>
>                 Key: BEAM-5850
>                 URL: https://issues.apache.org/jira/browse/BEAM-5850
>             Project: Beam
>          Issue Type: New Feature
>          Components: java-fn-execution
>            Reporter: Alex Amato
>            Assignee: Alex Amato
>            Priority: Major
>          Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> Update BeamFnDataReceiver to place elements into a Queue, then consume them 
> and call the element processing receiver in blockTillReadFinishes



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to