[ https://issues.apache.org/jira/browse/BEAM-3418?focusedWorklogId=86214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86214 ]
ASF GitHub Bot logged work on BEAM-3418: ---------------------------------------- Author: ASF GitHub Bot Created on: 30/Mar/18 23:52 Start Date: 30/Mar/18 23:52 Worklog Time Spent: 10m Work Description: angoenka commented on a change in pull request #4980: [BEAM-3418] Support multiple SDKHarness in RunnerHarness URL: https://github.com/apache/beam/pull/4980#discussion_r178409410 ########## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java ########## @@ -88,34 +92,48 @@ public void close() { closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection")); } + public String getWorkerId() { + return workerId; + } + /** Closes this client and terminates any outstanding requests exceptionally. */ private void closeAndTerminateOutstandingRequests(Throwable cause) { if (isClosed.getAndSet(true)) { return; } - // Make a copy of the map to make the view of the outstanding requests consistent. - Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy = - new ConcurrentHashMap<>(outstandingRequests); - outstandingRequests.clear(); + try { + // Make a copy of the map to make the view of the outstanding requests consistent. 
+ Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy = + new ConcurrentHashMap<>(outstandingRequests); + outstandingRequests.clear(); - if (outstandingRequestsCopy.isEmpty()) { - requestReceiver.onCompleted(); - return; - } - requestReceiver.onError( - new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage()))); - - LOG.error( - "{} closed, clearing outstanding requests {}", - FnApiControlClient.class.getSimpleName(), - outstandingRequestsCopy); - for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest : - outstandingRequestsCopy.values()) { - outstandingRequest.completeExceptionally(cause); + if (outstandingRequestsCopy.isEmpty()) { + requestReceiver.onCompleted(); + return; + } + requestReceiver.onError( + new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage()))); + + LOG.error( + "{} closed, clearing outstanding requests {}", + FnApiControlClient.class.getSimpleName(), + outstandingRequestsCopy); + for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest : + outstandingRequestsCopy.values()) { + outstandingRequest.completeExceptionally(cause); + } + } finally { + if (onCloseListener != null) { + onCloseListener.accept(this); + } } } + public void onClose(Consumer<FnApiControlClient> onCloseListener) { Review comment: Changed to call all the registered listeners on close. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 86214) Time Spent: 4h 50m (was: 4h 40m) > Python Fnapi - Support Multiple SDK workers on a single VM > ---------------------------------------------------------- > > Key: BEAM-3418 > URL: https://issues.apache.org/jira/browse/BEAM-3418 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness > Reporter: Ankur Goenka > Assignee: Ankur Goenka > Priority: Major > Labels: performance, portability > Time Spent: 4h 50m > Remaining Estimate: 0h > > Support multiple Python SDK processes on a VM to fully utilize a machine. > Each SDK Process will work in isolation and interact with Runner Harness > independently. -- This message was sent by Atlassian JIRA (v7.6.3#76005)