This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3ed3def948f55002b47067701104795afefd3b8a
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Mon Aug 5 21:27:05 2024 -0700

    Interfaces of gateway service (#2871)
    
    Interfaces of gateway service
---
 ...elixGatewayServiceClientConnectionMonitor.java} | 31 ++++-----
 .../api/service/HelixGatewayServiceProcessor.java  | 38 +++++------
 ...=> HelixGatewayServiceShardStateProcessor.java} | 25 +------
 .../HelixGatewayServiceGrpcService.java            | 53 +++++++--------
 .../participant/TestHelixGatewayParticipant.java   | 77 +++++++++++-----------
 5 files changed, 94 insertions(+), 130 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
similarity index 58%
copy from 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
copy to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
index c06443802..0f547354a 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceClientConnectionMonitor.java
@@ -19,35 +19,30 @@ package org.apache.helix.gateway.api.service;
  * under the License.
  */
 
-import org.apache.helix.model.Message;
-
 /**
- * Helix Gateway Service Processor interface allows sending state transition 
messages to
- * participants through service implementing this interface.
+ * Interface for gateway manager to interact with clients on connection.
  */
-public interface HelixGatewayServiceProcessor {
-
-  /**
-   * Send a state transition message to a remote participant.
-   *
-   * @param instanceName the name of the participant
-   * @param currentState the current state of the shard
-   * @param message      the message to send
-   */
-  void sendStateTransitionMessage(String instanceName, String currentState,
-      Message message);
-
+public interface HelixGatewayServiceClientConnectionMonitor {
   /**
-   * Close connection with error.
+   * Gateway service close connection with error. This function is called when 
manager wants to close client
+   * connection when there is an error. e.g. HelixManager connection is lost.
    * @param instanceName  instance name
    * @param reason  reason for closing connection
    */
   public void closeConnectionWithError(String instanceName, String reason);
 
   /**
-   * Close connection with success.
+   * Gateway service close client connection with success. This function is 
called when manager wants to close client
+   * connection gracefully, e.g., when gateway service is shutting down.
    * @param instanceName  instance name
    */
   public void completeConnection(String instanceName);
 
+  /**
+   * Callback when we detect client connection is closed. It could be when 
client gracefully close the connection,
+   * or when client connection is timed out.
+   * @param clusterName  cluster name
+   * @param instanceName  instance name
+   */
+  public void onClientClose(String clusterName, String instanceName);
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index c06443802..3ca7aeac5 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -19,35 +19,29 @@ package org.apache.helix.gateway.api.service;
  * under the License.
  */
 
-import org.apache.helix.model.Message;
+import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+
 
 /**
  * Helix Gateway Service Processor interface allows sending state transition 
messages to
  * participants through service implementing this interface.
  */
-public interface HelixGatewayServiceProcessor {
+public interface HelixGatewayServiceProcessor
+    extends HelixGatewayServiceClientConnectionMonitor, 
HelixGatewayServiceShardStateProcessor {
 
   /**
-   * Send a state transition message to a remote participant.
+   * Callback when receiving a client event.
+   * Event could be a connection closed event (event type DISCONNECT),
+   * an initial connection establish event that contains a map of current 
chard states (event type CONNECT),
+   * or a state transition result message (event type UPDATE).
    *
-   * @param instanceName the name of the participant
-   * @param currentState the current state of the shard
-   * @param message      the message to send
-   */
-  void sendStateTransitionMessage(String instanceName, String currentState,
-      Message message);
-
-  /**
-   * Close connection with error.
-   * @param instanceName  instance name
-   * @param reason  reason for closing connection
-   */
-  public void closeConnectionWithError(String instanceName, String reason);
-
-  /**
-   * Close connection with success.
-   * @param instanceName  instance name
+   * The default implementation push an event to the Gateway Service Manager.
+   *
+   * @param gatewayServiceManager the Gateway Service Manager
+   * @param event the event to push
    */
-  public void completeConnection(String instanceName);
-
+  default void onClientEvent(GatewayServiceManager gatewayServiceManager, 
GatewayServiceEvent event) {
+    gatewayServiceManager.newGatewayServiceEvent(event);
+  }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
similarity index 63%
copy from 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
copy to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
index c06443802..fb9bd6294 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceShardStateProcessor.java
@@ -21,33 +21,14 @@ package org.apache.helix.gateway.api.service;
 
 import org.apache.helix.model.Message;
 
-/**
- * Helix Gateway Service Processor interface allows sending state transition 
messages to
- * participants through service implementing this interface.
- */
-public interface HelixGatewayServiceProcessor {
 
+public interface HelixGatewayServiceShardStateProcessor {
   /**
-   * Send a state transition message to a remote participant.
+   * Gateway service send a state transition message to a connected 
participant.
    *
    * @param instanceName the name of the participant
    * @param currentState the current state of the shard
    * @param message      the message to send
    */
-  void sendStateTransitionMessage(String instanceName, String currentState,
-      Message message);
-
-  /**
-   * Close connection with error.
-   * @param instanceName  instance name
-   * @param reason  reason for closing connection
-   */
-  public void closeConnectionWithError(String instanceName, String reason);
-
-  /**
-   * Close connection with success.
-   * @param instanceName  instance name
-   */
-  public void completeConnection(String instanceName);
-
+  void sendStateTransitionMessage(String instanceName, String currentState, 
Message message);
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 344a2649d..3291e48b4 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -50,6 +50,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   // Map to store the observer for each instance
   private final Map<String, StreamObserver<TransitionMessage>> _observerMap = 
new HashMap<>();
   // A reverse map to store the instance name for each observer. It is used to 
find the instance when connection is closed.
+  // map<observer, pair<instance, cluster>>
   private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> 
_reversedObserverMap = new HashMap<>();
 
   private final GatewayServiceManager _manager;
@@ -82,19 +83,22 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
           ShardState shardState = request.getShardState();
           updateObserver(shardState.getInstanceName(), 
shardState.getClusterName(), responseObserver);
         }
-        
_manager.newGatewayServiceEvent(StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
+        onClientEvent(_manager,
+            
StateTransitionMessageTranslateUtil.translateShardStateMessageToEvent(request));
       }
 
       @Override
       public void onError(Throwable t) {
-        logger.info("Receive on error message: {}", t.getMessage());
-        onClientClose(responseObserver);
+        logger.info("Receive on error, reason: {} message: {}", 
Status.fromThrowable(t).getCode(), t.getMessage());
+        Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
+        onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
       }
 
       @Override
       public void onCompleted() {
         logger.info("Receive on complete message");
-        onClientClose(responseObserver);
+        Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
+        onClientClose(instanceInfo.getRight(), instanceInfo.getLeft());
       }
     };
   }
@@ -108,13 +112,11 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    * @param message the message to convert to the transition message
    */
   @Override
-  public void sendStateTransitionMessage(String instanceName, String 
currentState,
-      Message message) {
+  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
     StreamObserver<TransitionMessage> observer;
     observer = _observerMap.get(instanceName);
     if (observer != null) {
-      observer.onNext(
-          
StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
+      
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
     }
   }
 
@@ -125,7 +127,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    */
   @Override
   public void closeConnectionWithError(String instanceName, String 
errorReason) {
-    logger.info("Close connection for instance: {} with error reason: {}", 
instanceName,  errorReason);
+    logger.info("Close connection for instance: {} with error reason: {}", 
instanceName, errorReason);
     closeConnectionHelper(instanceName, errorReason, true);
   }
 
@@ -139,7 +141,6 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     closeConnectionHelper(instanceName, null, false);
   }
 
-
   private void closeConnectionHelper(String instanceName, String errorReason, 
boolean withError) {
     StreamObserver<TransitionMessage> observer;
     observer = _observerMap.get(instanceName);
@@ -152,34 +153,28 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
     }
   }
 
-  private void updateObserver(String instanceName, String clusterName,
-      StreamObserver<TransitionMessage> streamObserver) {
-    _lockRegistry.withLock(instanceName, () -> {
-      _observerMap.put(instanceName, streamObserver);
-      _reversedObserverMap.put(streamObserver, new 
ImmutablePair<>(instanceName, clusterName));
-    });
-  }
-
-  private void onClientClose(
-      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
-    String instanceName;
-    String clusterName;
-    Pair<String, String> instanceInfo = 
_reversedObserverMap.get(responseObserver);
-    clusterName = instanceInfo.getRight();
-    instanceName = instanceInfo.getLeft();
-    logger.info("Client close connection for instance: {}", instanceName);
-
+  @Override
+  public void onClientClose(String clusterName, String instanceName) {
     if (instanceName == null || clusterName == null) {
       // TODO: log error;
       return;
     }
+    logger.info("Client close connection for instance: {}", instanceName);
     GatewayServiceEvent event =
         
StateTransitionMessageTranslateUtil.translateClientCloseToEvent(clusterName, 
instanceName);
-    _manager.newGatewayServiceEvent(event);
+    onClientEvent(_manager, event);
     _lockRegistry.withLock(instanceName, () -> {
-      _reversedObserverMap.remove(responseObserver);
+      _reversedObserverMap.remove(_observerMap.get(instanceName));
       _observerMap.remove(instanceName);
       _lockRegistry.removeLock(instanceName);
     });
   }
+
+  private void updateObserver(String instanceName, String clusterName,
+      StreamObserver<TransitionMessage> streamObserver) {
+    _lockRegistry.withLock(instanceName, () -> {
+      _observerMap.put(instanceName, streamObserver);
+      _reversedObserverMap.put(streamObserver, new 
ImmutablePair<>(instanceName, clusterName));
+    });
+  }
 }
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index 200ac8f04..0c81ac354 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -44,6 +44,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
+
 public class TestHelixGatewayParticipant extends ZkTestBase {
   private static final String CLUSTER_NAME = 
TestHelixGatewayParticipant.class.getSimpleName();
   private static final int START_NUM_NODE = 3;
@@ -97,11 +98,10 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
    */
   private HelixGatewayParticipant addParticipant(String participantName,
       Map<String, Map<String, String>> initialShardMap) {
-    HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
-        new MockHelixGatewayServiceProcessor(_pendingMessageMap), 
participantName, CLUSTER_NAME,
-        ZK_ADDR, 
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
-            TEST_STATE_MODEL)
-        .setInitialShardState(initialShardMap).build();
+    HelixGatewayParticipant participant =
+        new HelixGatewayParticipant.Builder(new 
MockHelixGatewayServiceProcessor(_pendingMessageMap), participantName,
+            CLUSTER_NAME, ZK_ADDR, 
_onDisconnectCallbackCount::incrementAndGet).addMultiTopStateStateModelDefinition(
+            TEST_STATE_MODEL).setInitialShardState(initialShardMap).build();
     _participants.add(participant);
     return participant;
   }
@@ -126,9 +126,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
    * Add a participant to the IdealState's preference list.
    */
   private void addToPreferenceList(HelixGatewayParticipant participant) {
-    IdealState idealState =
-        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
-    idealState.getPreferenceLists().values()
+    IdealState idealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+    idealState.getPreferenceLists()
+        .values()
         .forEach(preferenceList -> 
preferenceList.add(participant.getInstanceName()));
     
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
 + 1));
     _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
TEST_DB, idealState);
@@ -138,9 +138,9 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
    * Remove a participant from the IdealState's preference list.
    */
   private void removeFromPreferenceList(HelixGatewayParticipant participant) {
-    IdealState idealState =
-        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
-    idealState.getPreferenceLists().values()
+    IdealState idealState = 
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+    idealState.getPreferenceLists()
+        .values()
         .forEach(preferenceList -> 
preferenceList.remove(participant.getInstanceName()));
     
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
 - 1));
     _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
TEST_DB, idealState);
@@ -151,13 +151,13 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
    */
   private void createDB() {
     createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB,
-        _participants.stream().map(HelixGatewayParticipant::getInstanceName)
-            .collect(Collectors.toList()), TEST_STATE_MODEL, 1, 
_participants.size());
+        
_participants.stream().map(HelixGatewayParticipant::getInstanceName).collect(Collectors.toList()),
+        TEST_STATE_MODEL, 1, _participants.size());
 
     _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
-        .setResources(new HashSet<>(
-            
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
-        
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+        .setResources(new 
HashSet<>(_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+        .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME)
+        .build();
   }
 
   /**
@@ -178,10 +178,10 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
   /**
    * Get the current state of a Helix shard.
    */
-  private String getHelixCurrentState(String instanceName, String resourceName,
-      String shardId) {
+  private String getHelixCurrentState(String instanceName, String 
resourceName, String shardId) {
     return _gSetupTool.getClusterManagementTool()
-        .getResourceExternalView(CLUSTER_NAME, 
resourceName).getStateMap(shardId)
+        .getResourceExternalView(CLUSTER_NAME, resourceName)
+        .getStateMap(shardId)
         .getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE);
   }
 
@@ -189,8 +189,8 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
    * Verify that all specified participants have pending messages.
    */
   private void verifyPendingMessages(List<HelixGatewayParticipant> 
participants) throws Exception {
-    Assert.assertTrue(TestHelper.verify(() -> participants.stream()
-            .allMatch(participant -> 
getPendingMessage(participant.getInstanceName()) != null),
+    Assert.assertTrue(TestHelper.verify(
+        () -> participants.stream().allMatch(participant -> 
getPendingMessage(participant.getInstanceName()) != null),
         TestHelper.WAIT_DURATION));
   }
 
@@ -200,12 +200,11 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
   private void verifyGatewayStateMatchesHelixState() throws Exception {
     Assert.assertTrue(TestHelper.verify(() -> 
_participants.stream().allMatch(participant -> {
       String instanceName = participant.getInstanceName();
-      for (String resourceName : _gSetupTool.getClusterManagementTool()
-          .getResourcesInCluster(CLUSTER_NAME)) {
+      for (String resourceName : 
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) {
         for (String shardId : _gSetupTool.getClusterManagementTool()
-            .getResourceIdealState(CLUSTER_NAME, 
resourceName).getPartitionSet()) {
-          String helixCurrentState =
-              getHelixCurrentState(instanceName, resourceName, shardId);
+            .getResourceIdealState(CLUSTER_NAME, resourceName)
+            .getPartitionSet()) {
+          String helixCurrentState = getHelixCurrentState(instanceName, 
resourceName, shardId);
           if (!participant.getCurrentState(resourceName, 
shardId).equals(helixCurrentState)) {
             return false;
           }
@@ -220,10 +219,10 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
    */
   private void verifyHelixPartitionStates(String instanceName, String state) 
throws Exception {
     Assert.assertTrue(TestHelper.verify(() -> {
-      for (String resourceName : _gSetupTool.getClusterManagementTool()
-          .getResourcesInCluster(CLUSTER_NAME)) {
+      for (String resourceName : 
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)) {
         for (String shardId : _gSetupTool.getClusterManagementTool()
-            .getResourceIdealState(CLUSTER_NAME, 
resourceName).getPartitionSet()) {
+            .getResourceIdealState(CLUSTER_NAME, resourceName)
+            .getPartitionSet()) {
           if (!getHelixCurrentState(instanceName, resourceName, 
shardId).equals(state)) {
             return false;
           }
@@ -287,8 +286,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
     deleteParticipant(participant);
 
     // Verify the Helix state transitions to "UNASSIGNED_STATE" for the 
participant
-    verifyHelixPartitionStates(participant.getInstanceName(),
-        HelixGatewayParticipant.UNASSIGNED_STATE);
+    verifyHelixPartitionStates(participant.getInstanceName(), 
HelixGatewayParticipant.UNASSIGNED_STATE);
 
     // Re-add the participant with its initial state
     addParticipant(participant.getInstanceName(), 
participant.getShardStateMap());
@@ -303,8 +301,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
     // Remove the first participant and verify state
     HelixGatewayParticipant participant = _participants.get(0);
     deleteParticipant(participant);
-    verifyHelixPartitionStates(participant.getInstanceName(),
-        HelixGatewayParticipant.UNASSIGNED_STATE);
+    verifyHelixPartitionStates(participant.getInstanceName(), 
HelixGatewayParticipant.UNASSIGNED_STATE);
 
     // Remove shard preference and re-add the participant
     removeFromPreferenceList(participant);
@@ -327,8 +324,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
     HelixGatewayParticipant participant = _participants.get(0);
     deleteParticipant(participant);
 
-    
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
-        gracefulDisconnectCount + 1);
+    
Assert.assertEquals(MockHelixGatewayServiceProcessor._gracefulDisconnectCount.get(),
 gracefulDisconnectCount + 1);
   }
 
   @Test(dependsOnMethods = "testGatewayParticipantDisconnectGracefully")
@@ -343,8 +339,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
 
     
Assert.assertEquals(MockHelixGatewayServiceProcessor._errorDisconnectCount.get(),
         errorDisconnectCount + _participants.size());
-    Assert.assertEquals(_onDisconnectCallbackCount.get(),
-        onDisconnectCallbackCount + _participants.size());
+    Assert.assertEquals(_onDisconnectCallbackCount.get(), 
onDisconnectCallbackCount + _participants.size());
   }
 
   public static class MockHelixGatewayServiceProcessor implements 
HelixGatewayServiceProcessor {
@@ -357,8 +352,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
     }
 
     @Override
-    public void sendStateTransitionMessage(String instanceName, String 
currentState,
-        Message message) {
+    public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
       _pendingMessageMap.put(instanceName, message);
     }
 
@@ -371,5 +365,10 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
     public void completeConnection(String instanceName) {
       _gracefulDisconnectCount.incrementAndGet();
     }
+
+    @Override
+    public void onClientClose(String clusterName, String instanceName) {
+
+    }
   }
 }

Reply via email to