This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this push: new 2439837c7 Synchronize calls to StreamObserver methods (#2934) 2439837c7 is described below commit 2439837c7ef44837883306d15a567de3a5d824d2 Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Tue Oct 8 11:22:05 2024 -0700 Synchronize calls to StreamObserver methods (#2934) - Synchronized calls to onNext, onError, and onCompleted to prevent gRPC failures - Ensured thread safety for StreamObserver method invocations --- .../helix/manager/zk/ParticipantManager.java | 2 +- .../channel/HelixGatewayServiceGrpcService.java | 56 ++++++++++++++-------- .../HelixGatewayServicePollModeChannel.java | 2 +- .../apache/helix/gateway/util/PollChannelUtil.java | 4 +- 4 files changed, 41 insertions(+), 23 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index 2044dbcd7..20910c891 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -394,7 +394,7 @@ public class ParticipantManager { continue; } - // If the the current state is related to tasks, there is no need to carry it over to new session. + // If the current state is related to tasks, there is no need to carry it over to new session. 
// Note: this check is not necessary due to TaskCurrentStates, but keep it for backwards compatibility if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) { continue; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index 6a2c76b8a..c83b01583 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -97,6 +97,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli @Override public void onError(Throwable t) { logger.info("Receive on error, reason: {} message: {}", Status.fromThrowable(t).getCode(), t.getMessage()); + // Notify the gateway manager that the client is closed Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); onClientClose(instanceInfo.getRight(), instanceInfo.getLeft()); } @@ -104,6 +105,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli @Override public void onCompleted() { logger.info("Receive on complete message"); + // Notify the gateway manager that the client is closed Pair<String, String> instanceInfo = _reversedObserverMap.get(responseObserver); onClientClose(instanceInfo.getRight(), instanceInfo.getLeft()); } @@ -119,12 +121,19 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli */ @Override public void sendStateChangeRequests(String instanceName, ShardChangeRequests requests) { - StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName); - if (observer!= null) { - observer.onNext(requests); - } else { - logger.error("Instance {} is not connected to the gateway service", instanceName); - } + _lockRegistry.withLock(instanceName, () -> { + StreamObserver<ShardChangeRequests> 
observer = _observerMap.get(instanceName); + + // If observer is null, this means that the connection is already closed and + // we should not send a ShardChangeRequest + if (observer != null) { + observer.onNext(requests); + } else { + logger.error("Instance {} is not connected to the gateway service", instanceName); + // If the observer is null, we should remove the lock, so we don't keep unnecessary locks + _lockRegistry.removeLock(instanceName); + } + }); } /** @@ -149,30 +158,39 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) { - StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName); - if (observer != null) { - if (withError) { - observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException()); - } else { - observer.onCompleted(); + _lockRegistry.withLock(instanceName, () -> { + StreamObserver<ShardChangeRequests> observer = _observerMap.get(instanceName); + + // If observer is null, this means that the connection is already closed and + // we should not try and close it again. 
+ if (observer != null) { + // Depending on whether the connection is closed with error, send different status + if (withError) { + observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException()); + } else { + observer.onCompleted(); + } + + // Clean up the observer and lock + _reversedObserverMap.remove(_observerMap.get(instanceName)); + _observerMap.remove(instanceName); } - } + + // We always remove the lock after the connection is closed regardless of if observer is null or not + _lockRegistry.removeLock(instanceName); + }); } private void onClientClose(String clusterName, String instanceName) { if (instanceName == null || clusterName == null) { - // TODO: log error; + logger.error("Cluster: {} or instance: {} is null while handling onClientClose", clusterName, + instanceName); return; } logger.info("Client close connection for instance: {}", instanceName); GatewayServiceEvent event = StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, instanceName); pushClientEventToGatewayManager(_manager, event); - _lockRegistry.withLock(instanceName, () -> { - _reversedObserverMap.remove(_observerMap.get(instanceName)); - _observerMap.remove(instanceName); - _lockRegistry.removeLock(instanceName); - }); } private void updateObserver(String instanceName, String clusterName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java index 77caf0c17..a62cb6e58 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java @@ -82,7 +82,7 @@ public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceCh * 1. 
Get the diff of previous and current shard states, and send the state change event to the gateway manager. * 2. Compare previous liveness and current liveness, and send the connection event to the gateway manager. */ - protected void fetchUpdates() { + protected void fetchUpdates() { // 1. get the shard state change Map<String, Map<String, Map<String, Map<String, String>>>> currentShardStates = getChangedParticipantsCurrentState(_userCurrentStateFilePath); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java index 2e8d277a8..593bec258 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -87,7 +87,7 @@ public class PollChannelUtil { new TypeReference<Map<String, Map<String, Map<String, Map<String, String>>>>>() { }); } catch (IOException e) { - logger.warn("Failed to read from file: " + filePath); + logger.warn("Failed to read from file: " + filePath, e); return new HashMap<>(); } } @@ -107,7 +107,7 @@ public class PollChannelUtil { }); return status.isHealthy() && (System.currentTimeMillis()/1000 - status.getLastUpdatedTime()) < timeoutInSec; } catch (IOException e) { - logger.warn("Failed to read from file: " + filePath); + logger.warn("Failed to read from file: " + filePath, e); return false; } }