This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-gateway-service by this push:
     new 2439837c7 Synchronize calls to StreamObserver methods (#2934)
2439837c7 is described below

commit 2439837c7ef44837883306d15a567de3a5d824d2
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Tue Oct 8 11:22:05 2024 -0700

    Synchronize calls to StreamObserver methods (#2934)
    
    - Synchronized calls to onNext, onError, and onCompleted to prevent gRPC failures caused by concurrent calls on the same stream
    - Ensured thread safety for StreamObserver method invocations
---
 .../helix/manager/zk/ParticipantManager.java       |  2 +-
 .../channel/HelixGatewayServiceGrpcService.java    | 56 ++++++++++++++--------
 .../HelixGatewayServicePollModeChannel.java        |  2 +-
 .../apache/helix/gateway/util/PollChannelUtil.java |  4 +-
 4 files changed, 41 insertions(+), 23 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
index 2044dbcd7..20910c891 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -394,7 +394,7 @@ public class ParticipantManager {
           continue;
         }
 
-        // If the the current state is related to tasks, there is no need to 
carry it over to new session.
+        // If the current state is related to tasks, there is no need to carry 
it over to new session.
         // Note: this check is not necessary due to TaskCurrentStates, but 
keep it for backwards compatibility
         if (stateModelDefRef.equals(TaskConstants.STATE_MODEL_NAME)) {
           continue;
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index 6a2c76b8a..c83b01583 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -97,6 +97,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
       @Override
       public void onError(Throwable t) {
         logger.info("Receive on error, reason: {} message: {}", 
Status.fromThrowable(t).getCode(), t.getMessage());
+        // Notify the gateway manager that the client is closed
         Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
         onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
       }
@@ -104,6 +105,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
       @Override
       public void onCompleted() {
         logger.info("Receive on complete message");
+        // Notify the gateway manager that the client is closed
         Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
         onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
       }
@@ -119,12 +121,19 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    */
   @Override
   public void sendStateChangeRequests(String instanceName, ShardChangeRequests 
requests) {
-    StreamObserver<ShardChangeRequests> observer = 
_observerMap.get(instanceName);
-    if (observer!= null) {
-      observer.onNext(requests);
-    } else {
-      logger.error("Instance {} is not connected to the gateway service", 
instanceName);
-    }
+    _lockRegistry.withLock(instanceName, () -> {
+      StreamObserver<ShardChangeRequests> observer = 
_observerMap.get(instanceName);
+
+      // If observer is null, this means that the connection is already closed 
and
+      // we should not send a ShardChangeRequest
+      if (observer != null) {
+        observer.onNext(requests);
+      } else {
+        logger.error("Instance {} is not connected to the gateway service", 
instanceName);
+        // If the observer is null, we should remove the lock, so we don't 
keep unnecessary locks
+        _lockRegistry.removeLock(instanceName);
+      }
+    });
   }
 
   /**
@@ -149,30 +158,39 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   }
 
   private void closeConnectionHelper(String instanceName, String errorReason, 
boolean withError) {
-    StreamObserver<ShardChangeRequests> observer = 
_observerMap.get(instanceName);
-    if (observer != null) {
-      if (withError) {
-        
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
-      } else {
-        observer.onCompleted();
+    _lockRegistry.withLock(instanceName, () -> {
+      StreamObserver<ShardChangeRequests> observer = 
_observerMap.get(instanceName);
+
+      // If observer is null, this means that the connection is already closed 
and
+      // we should not try and close it again.
+      if (observer != null) {
+        // Depending on whether the connection is closed with error, send 
different status
+        if (withError) {
+          
observer.onError(Status.UNAVAILABLE.withDescription(errorReason).asRuntimeException());
+        } else {
+          observer.onCompleted();
+        }
+
+        // Clean up the observer and lock
+        _reversedObserverMap.remove(_observerMap.get(instanceName));
+        _observerMap.remove(instanceName);
       }
-    }
+
+      // We always remove the lock after the connection is closed regardless 
of if observer is null or not
+      _lockRegistry.removeLock(instanceName);
+    });
   }
 
    private void onClientClose(String clusterName, String instanceName) {
     if (instanceName == null || clusterName == null) {
-      // TODO: log error;
+      logger.error("Cluster: {} or instance: {} is null while handling 
onClientClose", clusterName,
+          instanceName);
       return;
     }
     logger.info("Client close connection for instance: {}", instanceName);
     GatewayServiceEvent event =
         
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, 
instanceName);
     pushClientEventToGatewayManager(_manager, event);
-    _lockRegistry.withLock(instanceName, () -> {
-      _reversedObserverMap.remove(_observerMap.get(instanceName));
-      _observerMap.remove(instanceName);
-      _lockRegistry.removeLock(instanceName);
-    });
   }
 
   private void updateObserver(String instanceName, String clusterName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 77caf0c17..a62cb6e58 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -82,7 +82,7 @@ public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceCh
    * 1. Get the diff of previous and current shard states, and send the state 
change event to the gateway manager.
    * 2. Compare previous liveness and current liveness, and send the 
connection event to the gateway manager.
    */
- protected  void fetchUpdates() {
+ protected void fetchUpdates() {
     // 1.  get the shard state change
     Map<String, Map<String, Map<String, Map<String, String>>>> 
currentShardStates =
         getChangedParticipantsCurrentState(_userCurrentStateFilePath);
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 2e8d277a8..593bec258 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -87,7 +87,7 @@ public class PollChannelUtil {
           new TypeReference<Map<String, Map<String, Map<String, Map<String, 
String>>>>>() {
           });
     } catch (IOException e) {
-      logger.warn("Failed to read from file: " + filePath);
+      logger.warn("Failed to read from file: " + filePath, e);
       return new HashMap<>();
     }
   }
@@ -107,7 +107,7 @@ public class PollChannelUtil {
       });
       return status.isHealthy() && (System.currentTimeMillis()/1000 - 
status.getLastUpdatedTime()) < timeoutInSec;
     } catch (IOException e) {
-      logger.warn("Failed to read from file: " + filePath);
+      logger.warn("Failed to read from file: " + filePath, e);
       return false;
     }
   }

Reply via email to