[ 
https://issues.apache.org/jira/browse/BEAM-3418?focusedWorklogId=86214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86214
 ]

ASF GitHub Bot logged work on BEAM-3418:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Mar/18 23:52
            Start Date: 30/Mar/18 23:52
    Worklog Time Spent: 10m 
      Work Description: angoenka commented on a change in pull request #4980: 
[BEAM-3418] Support multiple SDKHarness in RunnerHarness
URL: https://github.com/apache/beam/pull/4980#discussion_r178409410
 
 

 ##########
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
 ##########
 @@ -88,34 +92,48 @@ public void close() {
     closeAndTerminateOutstandingRequests(new IllegalStateException("Runner 
closed connection"));
   }
 
+  public String getWorkerId() {
+    return workerId;
+  }
+
   /** Closes this client and terminates any outstanding requests 
exceptionally. */
   private void closeAndTerminateOutstandingRequests(Throwable cause) {
     if (isClosed.getAndSet(true)) {
       return;
     }
 
-    // Make a copy of the map to make the view of the outstanding requests 
consistent.
-    Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequestsCopy =
-        new ConcurrentHashMap<>(outstandingRequests);
-    outstandingRequests.clear();
+    try {
+      // Make a copy of the map to make the view of the outstanding requests 
consistent.
+      Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> 
outstandingRequestsCopy =
+          new ConcurrentHashMap<>(outstandingRequests);
+      outstandingRequests.clear();
 
-    if (outstandingRequestsCopy.isEmpty()) {
-      requestReceiver.onCompleted();
-      return;
-    }
-    requestReceiver.onError(
-        new 
StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
-
-    LOG.error(
-        "{} closed, clearing outstanding requests {}",
-        FnApiControlClient.class.getSimpleName(),
-        outstandingRequestsCopy);
-    for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
-        outstandingRequestsCopy.values()) {
-      outstandingRequest.completeExceptionally(cause);
+      if (outstandingRequestsCopy.isEmpty()) {
+        requestReceiver.onCompleted();
+        return;
+      }
+      requestReceiver.onError(
+          new 
StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
+
+      LOG.error(
+          "{} closed, clearing outstanding requests {}",
+          FnApiControlClient.class.getSimpleName(),
+          outstandingRequestsCopy);
+      for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest 
:
+          outstandingRequestsCopy.values()) {
+        outstandingRequest.completeExceptionally(cause);
+      }
+    } finally {
+      if (onCloseListener != null) {
+        onCloseListener.accept(this);
+      }
     }
   }
 
+  public void onClose(Consumer<FnApiControlClient> onCloseListener) {
 
 Review comment:
   Changed to call all the registered listeners on close.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86214)
    Time Spent: 4h 50m  (was: 4h 40m)

> Python Fnapi - Support Multiple SDK workers on a single VM
> ----------------------------------------------------------
>
>                 Key: BEAM-3418
>                 URL: https://issues.apache.org/jira/browse/BEAM-3418
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>            Reporter: Ankur Goenka
>            Assignee: Ankur Goenka
>            Priority: Major
>              Labels: performance, portability
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> Support multiple python SDK process on a VM to fully utilize a machine.
> Each SDK Process will work in isolation and interact with Runner HarnessĀ 
> independently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to