This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit d8e01f9e5fa614d3b5297f5ee615ee0d9ec65312
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Fri Aug 2 10:33:47 2024 -0700

    Implement HelixManager disconnect and client disconnect handling for HelixGatewayParticipant (#2868)
    
    * Implement HelixManager disconnect and client disconnect handling for HelixGatewayParticipant.
---
 .../participant/HelixGatewayParticipant.java       | 53 ++++++++++++++++++----
 .../gateway/service/GatewayServiceManager.java     |  4 +-
 .../participant/TestHelixGatewayParticipant.java   | 40 ++++++++++++++--
 3 files changed, 82 insertions(+), 15 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index 640552960..96a39bb01 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
 import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateTransitionError;
@@ -42,17 +43,20 @@ import 
org.apache.helix.participant.statemachine.StateTransitionError;
  * for the participant and updates the state of the participant's shards upon 
successful state
  * transitions signaled by remote participant.
  */
-public class HelixGatewayParticipant {
+public class HelixGatewayParticipant implements HelixManagerStateListener {
   public static final String UNASSIGNED_STATE = "UNASSIGNED";
   private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
-  private final HelixManager _participantManager;
+  private final HelixManager _helixManager;
+  private final Runnable _onDisconnectedCallback;
   private final Map<String, Map<String, String>> _shardStateMap;
   private final Map<String, CompletableFuture<Boolean>> 
_stateTransitionResultMap;
 
   private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
-      HelixManager participantManager, Map<String, Map<String, String>> 
initialShardStateMap) {
+      Runnable onDisconnectedCallback, HelixManager helixManager,
+      Map<String, Map<String, String>> initialShardStateMap) {
     _gatewayServiceProcessor = gatewayServiceProcessor;
-    _participantManager = participantManager;
+    _helixManager = helixManager;
+    _onDisconnectedCallback = onDisconnectedCallback;
     _shardStateMap = initialShardStateMap;
     _stateTransitionResultMap = new ConcurrentHashMap<>();
   }
@@ -70,7 +74,7 @@ public class HelixGatewayParticipant {
 
       CompletableFuture<Boolean> future = new CompletableFuture<>();
       _stateTransitionResultMap.put(transitionId, future);
-      
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+      
_gatewayServiceProcessor.sendStateTransitionMessage(_helixManager.getInstanceName(),
           getCurrentState(resourceId, shardId), message);
 
       if (!future.get()) {
@@ -107,7 +111,7 @@ public class HelixGatewayParticipant {
    * @return participant instance name
    */
   public String getInstanceName() {
-    return _participantManager.getInstanceName();
+    return _helixManager.getInstanceName();
   }
 
   /**
@@ -129,7 +133,7 @@ public class HelixGatewayParticipant {
   }
 
   @VisibleForTesting
-  public Map<String, Map<String, String>> getShardStateMap() {
+  Map<String, Map<String, String>> getShardStateMap() {
     return _shardStateMap;
   }
 
@@ -160,8 +164,34 @@ public class HelixGatewayParticipant {
     }
   }
 
+  /**
+   * Invoked when the HelixManager connection to zookeeper is established
+   *
+   * @param helixManager HelixManager that is successfully connected
+   */
+  public void onConnected(HelixManager helixManager) throws Exception {
+    // Do nothing
+  }
+
+  /**
+   * Invoked when the HelixManager connection to zookeeper is closed 
unexpectedly. This will not be
+   * run if the remote participant disconnects from gateway.
+   *
+   * @param helixManager HelixManager that fails to be connected
+   * @param error        connection error
+   */
+  @Override
+  public void onDisconnected(HelixManager helixManager, Throwable error) 
throws Exception {
+    _onDisconnectedCallback.run();
+    
_gatewayServiceProcessor.closeConnectionWithError(_helixManager.getInstanceName(),
+        error.getMessage());
+  }
+
   public void disconnect() {
-    _participantManager.disconnect();
+    if (_helixManager.isConnected()) {
+      _helixManager.disconnect();
+    }
+    
_gatewayServiceProcessor.completeConnection(_helixManager.getInstanceName());
   }
 
   public static class Builder {
@@ -169,15 +199,17 @@ public class HelixGatewayParticipant {
     private final String _instanceName;
     private final String _clusterName;
     private final String _zkAddress;
+    private final Runnable _onDisconnectedCallback;
     private final List<String> _multiTopStateModelDefinitions;
     private final Map<String, Map<String, String>> _initialShardStateMap;
 
     public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, 
String instanceName,
-        String clusterName, String zkAddress) {
+        String clusterName, String zkAddress, Runnable onDisconnectedCallback) 
{
       _helixGatewayServiceProcessor = helixGatewayServiceProcessor;
       _instanceName = instanceName;
       _clusterName = clusterName;
       _zkAddress = zkAddress;
+      _onDisconnectedCallback = onDisconnectedCallback;
       _multiTopStateModelDefinitions = new ArrayList<>();
       _initialShardStateMap = new ConcurrentHashMap<>();
     }
@@ -226,7 +258,8 @@ public class HelixGatewayParticipant {
       HelixManager participantManager =
           new ZKHelixManager(_clusterName, _instanceName, 
InstanceType.PARTICIPANT, _zkAddress);
       HelixGatewayParticipant participant =
-          new HelixGatewayParticipant(_helixGatewayServiceProcessor, 
participantManager,
+          new HelixGatewayParticipant(_helixGatewayServiceProcessor, 
_onDisconnectedCallback,
+              participantManager,
               _initialShardStateMap);
       _multiTopStateModelDefinitions.forEach(
           stateModelDefinition -> participantManager.getStateMachineEngine()
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 4553c04ca..fd7420735 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -136,7 +136,9 @@ public class GatewayServiceManager {
     // Create and add the participant to the participant map
     HelixGatewayParticipant.Builder participantBuilder =
         new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, 
instanceName, clusterName,
-            _zkAddress).setInitialShardState(initialShardStateMap);
+            _zkAddress,
+            () -> removeHelixGatewayParticipant(clusterName, 
instanceName)).setInitialShardState(
+            initialShardStateMap);
     SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
         participantBuilder::addMultiTopStateStateModelDefinition);
     _helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>())
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index dd22b9fa0..200ac8f04 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import org.apache.helix.ConfigAccessor;
@@ -45,7 +46,7 @@ import org.testng.collections.Lists;
 
 public class TestHelixGatewayParticipant extends ZkTestBase {
   private static final String CLUSTER_NAME = 
TestHelixGatewayParticipant.class.getSimpleName();
-  private static final int START_NUM_NODE = 2;
+  private static final int START_NUM_NODE = 3;
   private static final String TEST_DB = "TestDB";
   private static final String TEST_STATE_MODEL = "OnlineOffline";
   private static final String CONTROLLER_PREFIX = "controller";
@@ -56,6 +57,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase {
   private int _nextStartPort = 12000;
   private final List<HelixGatewayParticipant> _participants = 
Lists.newArrayList();
   private final Map<String, Message> _pendingMessageMap = new 
ConcurrentHashMap<>();
+  private final AtomicInteger _onDisconnectCallbackCount = new AtomicInteger();
 
   @BeforeClass
   public void beforeClass() {
@@ -97,7 +99,8 @@ public class TestHelixGatewayParticipant extends ZkTestBase {
       Map<String, Map<String, String>> initialShardMap) {
     HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
         new MockHelixGatewayServiceProcessor(_pendingMessageMap), 
participantName, CLUSTER_NAME,
-        ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL)
+        ZK_ADDR, 
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
+            TEST_STATE_MODEL)
         .setInitialShardState(initialShardMap).build();
     _participants.add(participant);
     return participant;
@@ -317,8 +320,37 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
     verifyGatewayStateMatchesHelixState();
   }
 
+  @Test(dependsOnMethods = 
"testProcessStateTransitionAfterReconnectAfterDroppingPartition")
+  public void testGatewayParticipantDisconnectGracefully() {
+    int gracefulDisconnectCount = 
MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get();
+    // Remove the first participant
+    HelixGatewayParticipant participant = _participants.get(0);
+    deleteParticipant(participant);
+
+    
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
+        gracefulDisconnectCount + 1);
+  }
+
+  @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully")
+  public void testGatewayParticipantDisconnectWithError() throws Exception {
+    int errorDisconnectCount = 
MockHelixGatewayServiceProcessor._errorDisconnectCount.get();
+    int onDisconnectCallbackCount = _onDisconnectCallbackCount.get();
+
+    // Call on disconnect with error for all participants
+    for (HelixGatewayParticipant participant : _participants) {
+      participant.onDisconnected(null, new Exception("Test error"));
+    }
+
+    
Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(),
+        errorDisconnectCount + _participants.size());
+    Assert.assertEquals(_onDisconnectCallbackCount.get(),
+        onDisconnectCallbackCount + _participants.size());
+  }
+
   public static class MockHelixGatewayServiceProcessor implements 
HelixGatewayServiceProcessor {
     private final Map<String, Message> _pendingMessageMap;
+    private static final AtomicInteger _gracefulDisconnectCount = new 
AtomicInteger();
+    private static final AtomicInteger _errorDisconnectCount = new 
AtomicInteger();
 
     public MockHelixGatewayServiceProcessor(Map<String, Message> 
pendingMessageMap) {
       _pendingMessageMap = pendingMessageMap;
@@ -332,12 +364,12 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
 
     @Override
     public void closeConnectionWithError(String instanceName, String reason) {
-
+      _errorDisconnectCount.incrementAndGet();
     }
 
     @Override
     public void completeConnection(String instanceName) {
-
+      _gracefulDisconnectCount.incrementAndGet();
     }
   }
 }

Reply via email to