This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 92153d2470da9352936543ed588dece4ac5ab1f9
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Wed Jul 31 16:02:48 2024 -0700

    Implement Helix ST handling logic and HelixGatewayParticipant (#2845)
    
    - Add ST handling logic to support multi top state state model definitions 
without intermediary states
    - Encapsulate the participant manager create, connect, disconnect logic in 
HelixGatewayParticipant
    - Add replica state tracking in HelixGatewayParticipant
---
 helix-gateway/pom.xml                              |   6 +
 .../service/HelixGatewayServiceProcessor.java      |  17 +-
 .../helix/gateway/constant/MessageStatus.java      |  24 --
 .../apache/helix/gateway/constant/MessageType.java |  24 --
 .../HelixGatewayServiceGrpcService.java            |  21 +-
 .../participant/HelixGatewayParticipant.java       | 245 +++++++++++++++
 .../helix/gateway/service/GatewayServiceEvent.java |  10 +-
 .../gateway/service/GatewayServiceManager.java     |  88 +++---
 .../service/GatewayServiceManagerFactory.java      |   1 -
 .../helix/gateway/service/HelixGatewayService.java | 110 -------
 .../HelixGatewayMultiTopStateStateModel.java       |  60 ++++
 ...elixGatewayMultiTopStateStateModelFactory.java} |  15 +-
 .../HelixGatewayOnlineOfflineStateModel.java       |  83 -----
 .../util/StateTransitionMessageTranslateUtil.java  |  61 +++-
 .../src/main/proto/HelixGatewayService.proto       |   3 +-
 .../participant/TestHelixGatewayParticipant.java   | 333 +++++++++++++++++++++
 .../gateway/service/TestGatewayServiceManager.java |   1 -
 .../TestStateTransitionMessageTranslateUtil.java   |  68 +++++
 18 files changed, 857 insertions(+), 313 deletions(-)

diff --git a/helix-gateway/pom.xml b/helix-gateway/pom.xml
index 8a7c50ed7..62e927d11 100644
--- a/helix-gateway/pom.xml
+++ b/helix-gateway/pom.xml
@@ -86,6 +86,12 @@
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.helix</groupId>
+      <artifactId>helix-core</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-services</artifactId>
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
similarity index 60%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
index 814cfb0d0..fe57e69c9 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.helix.gateway.service;
+package org.apache.helix.gateway.api.service;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,11 +19,22 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
+import org.apache.helix.model.Message;
+
 /**
- * Translate from/to GRPC function call to Helix Gateway Service event.
+ * Helix Gateway Service Processor interface allows sending state transition 
messages to
+ * participants through a service implementing this interface.
  */
 public interface HelixGatewayServiceProcessor {
 
-  public boolean sendStateTransitionMessage( String instanceName);
+  /**
+   * Send a state transition message to a remote participant.
+   *
+   * @param instanceName the name of the participant
+   * @param currentState the current state of the shard
+   * @param message      the message to send
+   */
+  void sendStateTransitionMessage(String instanceName, String currentState,
+      Message message);
 
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
deleted file mode 100644
index 528b28e2f..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageStatus.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageStatus {
-  SUCCESS, FAILURE
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
deleted file mode 100644
index 49619dec8..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/constant/MessageType.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package org.apache.helix.gateway.constant;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-public enum MessageType {
-  ADD, REMOVE, CHANGE_ROLE
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
index 09fe3d07b..018d6591e 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceGrpcService.java
@@ -26,9 +26,10 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
 import org.apache.helix.gateway.util.PerKeyLockRegistry;
 import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.apache.helix.model.Message;
 import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -59,8 +60,9 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   /**
    * Grpc service end pint.
    * Application instances Report the state of the shard or result of 
transition request to the gateway service.
-   * @param responseObserver
-   * @return
+   *
+   * @param responseObserver the observer to send the response to the client
+   * @return the observer to receive the state of the shard or result of 
transition request
    */
   @Override
   public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
@@ -92,17 +94,20 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   /**
    * Send state transition message to the instance.
    * The instance must already have established a connection to the gateway 
service.
-   * @param instanceName
-   * @return
+   *
+   * @param instanceName the instance name to send the message to
+   * @param currentState the current state of the shard
+   * @param message the message to convert to the transition message
    */
   @Override
-  public boolean sendStateTransitionMessage(String instanceName) {
+  public void sendStateTransitionMessage(String instanceName, String 
currentState,
+      Message message) {
     StreamObserver<TransitionMessage> observer;
     observer = _observerMap.get(instanceName);
     if (observer != null) {
-      
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage());
+      observer.onNext(
+          
StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
     }
-    return true;
   }
 
   private void updateObserver(String instanceName, String clusterName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
new file mode 100644
index 000000000..640552960
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -0,0 +1,245 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+
+/**
+ * HelixGatewayParticipant encapsulates the Helix Participant Manager and 
handles tracking the state
+ * of a remote participant connected to the Helix Gateway Service. It 
processes state transitions
+ * for the participant and updates the state of the participant's shards upon 
successful state
+ * transitions signaled by remote participant.
+ */
+public class HelixGatewayParticipant {
+  public static final String UNASSIGNED_STATE = "UNASSIGNED";
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
+  private final HelixManager _participantManager;
+  private final Map<String, Map<String, String>> _shardStateMap;
+  private final Map<String, CompletableFuture<Boolean>> 
_stateTransitionResultMap;
+
+  private HelixGatewayParticipant(HelixGatewayServiceProcessor 
gatewayServiceProcessor,
+      HelixManager participantManager, Map<String, Map<String, String>> 
initialShardStateMap) {
+    _gatewayServiceProcessor = gatewayServiceProcessor;
+    _participantManager = participantManager;
+    _shardStateMap = initialShardStateMap;
+    _stateTransitionResultMap = new ConcurrentHashMap<>();
+  }
+
+  public void processStateTransitionMessage(Message message) throws Exception {
+    String transitionId = message.getMsgId();
+    String resourceId = message.getResourceName();
+    String shardId = message.getPartitionName();
+    String toState = message.getToState();
+
+    try {
+      if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
+        return;
+      }
+
+      CompletableFuture<Boolean> future = new CompletableFuture<>();
+      _stateTransitionResultMap.put(transitionId, future);
+      
_gatewayServiceProcessor.sendStateTransitionMessage(_participantManager.getInstanceName(),
+          getCurrentState(resourceId, shardId), message);
+
+      if (!future.get()) {
+        throw new Exception("Failed to transition to state " + toState);
+      }
+
+      updateState(resourceId, shardId, toState);
+    } finally {
+      _stateTransitionResultMap.remove(transitionId);
+    }
+  }
+
+  public void handleStateTransitionError(Message message, StateTransitionError 
error) {
+    // Extract the identifiers of the failed state transition from the message
+    String transitionId = message.getMsgId();
+    String resourceId = message.getResourceName();
+    String shardId = message.getPartitionName();
+
+    // Remove the future from the stateTransitionResultMap since we are no 
longer able
+    // to process the state transition due to participant manager either 
timing out
+    // or failing to process the state transition
+    _stateTransitionResultMap.remove(transitionId);
+
+    // Set the replica state to ERROR
+    updateState(resourceId, shardId, HelixDefinedState.ERROR.name());
+
+    // Notify the HelixGatewayParticipantClient that it is in ERROR state
+    // TODO: We need a better way than sending the state transition with a 
toState of ERROR
+  }
+
+  /**
+   * Get the instance name of the participant.
+   *
+   * @return participant instance name
+   */
+  public String getInstanceName() {
+    return _participantManager.getInstanceName();
+  }
+
+  /**
+   * Completes the state transition with the given transitionId.
+   *
+   * @param transitionId the transitionId to complete
+   * @param isSuccess    whether the state transition was successful
+   */
+  public void completeStateTransition(String transitionId, boolean isSuccess) {
+    CompletableFuture<Boolean> future = 
_stateTransitionResultMap.get(transitionId);
+    if (future != null) {
+      future.complete(isSuccess);
+    }
+  }
+
+  private boolean isCurrentStateAlreadyTarget(String resourceId, String 
shardId,
+      String targetState) {
+    return getCurrentState(resourceId, shardId).equals(targetState);
+  }
+
+  @VisibleForTesting
+  public Map<String, Map<String, String>> getShardStateMap() {
+    return _shardStateMap;
+  }
+
+  /**
+   * Get the current state of the shard.
+   *
+   * @param resourceId the resource id
+   * @param shardId    the shard id
+   * @return the current state of the shard or UNASSIGNED_STATE if it does not exist
+   */
+  public String getCurrentState(String resourceId, String shardId) {
+    return getShardStateMap().getOrDefault(resourceId, Collections.emptyMap())
+        .getOrDefault(shardId, UNASSIGNED_STATE);
+  }
+
+  private void updateState(String resourceId, String shardId, String state) {
+    if (state.equals(HelixDefinedState.DROPPED.name())) {
+      getShardStateMap().computeIfPresent(resourceId, (k, v) -> {
+        v.remove(shardId);
+        if (v.isEmpty()) {
+          return null;
+        }
+        return v;
+      });
+    } else {
+      getShardStateMap().computeIfAbsent(resourceId, k -> new 
ConcurrentHashMap<>())
+          .put(shardId, state);
+    }
+  }
+
+  public void disconnect() {
+    _participantManager.disconnect();
+  }
+
+  public static class Builder {
+    private final HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
+    private final String _instanceName;
+    private final String _clusterName;
+    private final String _zkAddress;
+    private final List<String> _multiTopStateModelDefinitions;
+    private final Map<String, Map<String, String>> _initialShardStateMap;
+
+    public Builder(HelixGatewayServiceProcessor helixGatewayServiceProcessor, 
String instanceName,
+        String clusterName, String zkAddress) {
+      _helixGatewayServiceProcessor = helixGatewayServiceProcessor;
+      _instanceName = instanceName;
+      _clusterName = clusterName;
+      _zkAddress = zkAddress;
+      _multiTopStateModelDefinitions = new ArrayList<>();
+      _initialShardStateMap = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Add a multi-top state model definition to the participant to be 
registered in the
+     * participant's state machine engine.
+     *
+     * @param stateModelDefinitionName the state model definition name to add 
(should be multi-top
+     *                                 state model)
+     * @return the builder
+     */
+    public Builder addMultiTopStateStateModelDefinition(String 
stateModelDefinitionName) {
+      // TODO: Add validation that the state model definition is a multi-top 
state model
+      _multiTopStateModelDefinitions.add(stateModelDefinitionName);
+      return this;
+    }
+
+    /**
+     * Add initial shard state to the participant. This is used to initialize 
the participant with
+     * the initial state of the shards in order to reduce unnecessary state 
transitions from being
+     * forwarded to the participant.
+     *
+     * @param initialShardStateMap the initial shard state map to add
+     * @return the Builder
+     */
+    public Builder setInitialShardState(Map<String, Map<String, String>> 
initialShardStateMap) {
+      // TODO: Add handling for shard states that were never assigned to the 
participant since
+      //  the participant was last online.
+      // Deep copy the initialShardStateMap into concurrent hash maps
+      initialShardStateMap.forEach((resourceId, shardStateMap) -> {
+        _initialShardStateMap.put(resourceId, new 
ConcurrentHashMap<>(shardStateMap));
+      });
+
+      return this;
+    }
+
+    /**
+     * Build the HelixGatewayParticipant. This will create a HelixManager for 
the participant and
+     * connect to the Helix cluster. The participant will be registered with 
the multi-top state
+     * model definitions and initialized with the initial shard state map.
+     *
+     * @return the HelixGatewayParticipant
+     */
+    public HelixGatewayParticipant build() {
+      HelixManager participantManager =
+          new ZKHelixManager(_clusterName, _instanceName, 
InstanceType.PARTICIPANT, _zkAddress);
+      HelixGatewayParticipant participant =
+          new HelixGatewayParticipant(_helixGatewayServiceProcessor, 
participantManager,
+              _initialShardStateMap);
+      _multiTopStateModelDefinitions.forEach(
+          stateModelDefinition -> participantManager.getStateMachineEngine()
+              .registerStateModelFactory(stateModelDefinition,
+                  new 
HelixGatewayMultiTopStateStateModelFactory(participant)));
+      try {
+        participantManager.connect();
+      } catch (Exception e) {
+        // TODO: When API for gracefully triggering disconnect from remote 
participant
+        //  is available, we should call it here instead of throwing exception.
+        throw new RuntimeException(e);
+      }
+      return participant;
+    }
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
index 5745dbf03..b919429b9 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -40,20 +40,20 @@ public class GatewayServiceEvent {
 
   public static class StateTransitionResult {
     private String stateTransitionId;
-    private String stateTransitionStatus;
+    private boolean isSuccess;
     private String shardState;
 
-    public StateTransitionResult(String stateTransitionId, String 
stateTransitionStatus, String shardState) {
+    public StateTransitionResult(String stateTransitionId, boolean isSuccess, 
String shardState) {
       this.stateTransitionId = stateTransitionId;
-      this.stateTransitionStatus = stateTransitionStatus;
+      this.isSuccess = isSuccess;
       this.shardState = shardState;
     }
 
     public String getStateTransitionId() {
       return stateTransitionId;
     }
-    public String getStateTransitionStatus() {
-      return stateTransitionStatus;
+    public boolean getIsSuccess() {
+      return isSuccess;
     }
     public String getShardState() {
       return shardState;
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 830bb97c6..85a274156 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,14 +19,17 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
 import org.apache.helix.gateway.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.grpcservice.HelixGatewayServiceGrpcService;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
 
@@ -37,40 +40,35 @@ import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
  *  3. On init connect, create the participant manager
  *  4. For ST reply message, update the tracker
  */
-
 public class GatewayServiceManager {
   public static final int CONNECTION_EVENT_THREAD_POOL_SIZE = 10;
-  private final Map<String, HelixGatewayService> _helixGatewayServiceMap;
+  public static final ImmutableSet<String> SUPPORTED_MULTI_STATE_MODEL_TYPES =
+      ImmutableSet.of("OnlineOffline");
+  private final Map<String, Map<String, HelixGatewayParticipant>> 
_helixGatewayParticipantMap;
+  private final String _zkAddress;
 
   // a single thread tp for event processing
   private final ExecutorService _participantStateTransitionResultUpdator;
 
   // link to grpc service
-  private final HelixGatewayServiceGrpcService _grpcService;
+  private final HelixGatewayServiceProcessor _gatewayServiceProcessor;
 
   // a per key executor for connection event. All event for the same instance 
will be executed in sequence.
   // It is used to ensure for each instance, the connect/disconnect event 
won't start until the previous one is done.
   private final PerKeyBlockingExecutor _connectionEventProcessor;
 
   public GatewayServiceManager() {
-    _helixGatewayServiceMap = new ConcurrentHashMap<>();
+    _helixGatewayParticipantMap = new ConcurrentHashMap<>();
+    _zkAddress = "foo";
     _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
-    _grpcService = new HelixGatewayServiceGrpcService(this);
+    _gatewayServiceProcessor = new HelixGatewayServiceGrpcService(this);
     _connectionEventProcessor =
         new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
   }
 
   /**
-   * send state transition message to application instance
-   * @return
-   */
-  public AtomicBoolean sendTransitionRequestToApplicationInstance() {
-    // TODO: add param
-    return null;
-  }
-
-  /**
-   * Process the event from Grpc service
+   * Process the event from Grpc service and dispatch to async executor for 
processing.
+   *
    * @param event
    */
   public void newGatewayServiceEvent(GatewayServiceEvent event) {
@@ -86,20 +84,24 @@ public class GatewayServiceManager {
    */
   class shardStateUpdator implements Runnable {
 
-    GatewayServiceEvent _event;
+    private final GatewayServiceEvent _event;
 
-    public shardStateUpdator(GatewayServiceEvent event) {
+    private shardStateUpdator(GatewayServiceEvent event) {
       _event = event;
     }
 
     @Override
     public void run() {
-      HelixGatewayService helixGatewayService = 
_helixGatewayServiceMap.get(_event.getClusterName());
-      if (helixGatewayService == null) {
+      HelixGatewayParticipant participant =
+          getHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
+      if (participant == null) {
         // TODO: return error code and throw exception.
         return;
       }
-      helixGatewayService.receiveSTResponse();
+      _event.getStateTransitionResult().forEach(stateTransitionResult -> {
+        
participant.completeStateTransition(stateTransitionResult.getStateTransitionId(),
+            stateTransitionResult.getIsSuccess());
+      });
     }
   }
 
@@ -108,33 +110,47 @@ public class GatewayServiceManager {
    * It includes waiting for ZK connection, and also wait for previous 
LiveInstance to expire.
    */
   class participantConnectionProcessor implements Runnable {
-    GatewayServiceEvent _event;
+    private final GatewayServiceEvent _event;
 
-    public participantConnectionProcessor(GatewayServiceEvent event) {
+    private participantConnectionProcessor(GatewayServiceEvent event) {
       _event = event;
     }
 
     @Override
     public void run() {
-      HelixGatewayService helixGatewayService;
-      _helixGatewayServiceMap.computeIfAbsent(_event.getClusterName(),
-          k -> new HelixGatewayService(GatewayServiceManager.this, 
_event.getClusterName()));
-      helixGatewayService = 
_helixGatewayServiceMap.get(_event.getClusterName());
       if (_event.getEventType().equals(GatewayServiceEventType.CONNECT)) {
-        helixGatewayService.registerParticipant();
+        createHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName(),
+            _event.getShardStateMap());
       } else {
-        helixGatewayService.deregisterParticipant(_event.getClusterName(), 
_event.getInstanceName());
+        removeHelixGatewayParticipant(_event.getClusterName(), 
_event.getInstanceName());
       }
     }
   }
 
-  @VisibleForTesting
-  HelixGatewayServiceGrpcService getGrpcService() {
-    return _grpcService;
+  private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
+      Map<String, Map<String, String>> initialShardStateMap) {
+    // Create and add the participant to the participant map
+    HelixGatewayParticipant.Builder participantBuilder =
+        new HelixGatewayParticipant.Builder(_gatewayServiceProcessor, 
instanceName, clusterName,
+            _zkAddress).setInitialShardState(initialShardStateMap);
+    SUPPORTED_MULTI_STATE_MODEL_TYPES.forEach(
+        participantBuilder::addMultiTopStateStateModelDefinition);
+    _helixGatewayParticipantMap.computeIfAbsent(clusterName, k -> new 
ConcurrentHashMap<>())
+        .put(instanceName, participantBuilder.build());
+  }
+
+  private void removeHelixGatewayParticipant(String clusterName, String 
instanceName) {
+    // Disconnect and remove the participant from the participant map
+    HelixGatewayParticipant participant = 
getHelixGatewayParticipant(clusterName, instanceName);
+    if (participant != null) {
+      participant.disconnect();
+      _helixGatewayParticipantMap.get(clusterName).remove(instanceName);
+    }
   }
 
-  @VisibleForTesting
-  HelixGatewayService getHelixGatewayService(String clusterName) {
-    return _helixGatewayServiceMap.get(clusterName);
+  private HelixGatewayParticipant getHelixGatewayParticipant(String 
clusterName,
+      String instanceName) {
+    return _helixGatewayParticipantMap.getOrDefault(clusterName, 
Collections.emptyMap())
+        .get(instanceName);
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
index dce5a44a0..aed7518b9 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
@@ -23,7 +23,6 @@ package org.apache.helix.gateway.service;
  * Factory class to create GatewayServiceManager
  */
 public class GatewayServiceManagerFactory {
-
   public GatewayServiceManager createGatewayServiceManager() {
     return new GatewayServiceManager();
   }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
deleted file mode 100644
index 2ef35820c..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ /dev/null
@@ -1,110 +0,0 @@
-package org.apache.helix.gateway.service;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.HelixManager;
-import org.apache.helix.InstanceType;
-import 
org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
-import org.apache.helix.manager.zk.ZKHelixManager;
-
-
-/**
- * A service object for each Helix cluster.
- * This service object manages the Helix participants in the cluster.
- */
-public class HelixGatewayService {
-  final private Map<String, Map<String, HelixManager>> _participantsMap;
-
-  final private String _zkAddress;
-  private final GatewayServiceManager _gatewayServiceManager;
-  private Map<String, Map<String, AtomicBoolean>> _flagMap;
-  public HelixGatewayService(GatewayServiceManager gatewayServiceManager, 
String zkAddress) {
-    _participantsMap = new ConcurrentHashMap<>();
-    _zkAddress = zkAddress;
-    _gatewayServiceManager = gatewayServiceManager;
-    _flagMap = new ConcurrentHashMap<>();
-  }
-
-  public GatewayServiceManager getClusterManager() {
-    return _gatewayServiceManager;
-  }
-
-  /**
-   * Register a participant to the Helix cluster.
-   * It creates a HelixParticipantManager and connects to the Helix controller.
-   */
-  public void registerParticipant() {
-    // TODO: create participant manager and add to _participantsMap
-    HelixManager manager = new ZKHelixManager("clusterName", "instanceName", 
InstanceType.PARTICIPANT, _zkAddress);
-    manager.getStateMachineEngine()
-        .registerStateModelFactory("OnlineOffline", new 
HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
-    try {
-      manager.connect();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Deregister a participant from the Helix cluster when app instance is 
gracefully stopped or connection lost.
-   * @param clusterName
-   * @param participantName
-   */
-  public void deregisterParticipant(String clusterName, String 
participantName) {
-    HelixManager manager = 
_participantsMap.get(clusterName).remove(participantName);
-    if (manager != null) {
-      manager.disconnect();
-      removeChannel(participantName);
-    }
-  }
-
-  public void addChannel() {
-   // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new 
ConcurrentHashMap<>());
-  }
-
-  public void removeChannel(String instanceName) {
-    _flagMap.remove(instanceName);
-  }
-
-  public AtomicBoolean sendMessage() {
-      AtomicBoolean flag = new AtomicBoolean(false);
-      return flag;
-  }
-
-  /**
-   * Entry point for receive the state transition response from the 
participant.
-   * It will update in memory state accordingly.
-   */
-  public void receiveSTResponse() {
-     // AtomicBoolean flag = 
_flagMap.get(instanceName).remove(response.getMessageId());
-  }
-
-  /**
-   * Stop the HelixGatewayService.
-   * It stops all participants in the cluster.
-   */
-  public void stop() {
-    // TODO: stop all participants
-    System.out.println("Stopping Helix Gateway Service");
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
new file mode 100644
index 000000000..37de51b42
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModel.java
@@ -0,0 +1,60 @@
+package org.apache.helix.gateway.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.StateTransitionError;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@StateModelInfo(initialState = "OFFLINE", states = {})
+public class HelixGatewayMultiTopStateStateModel extends StateModel {
+  private static final Logger _logger =
+      LoggerFactory.getLogger(HelixGatewayMultiTopStateStateModel.class);
+
+  private final HelixGatewayParticipant _helixGatewayParticipant;
+
+  public HelixGatewayMultiTopStateStateModel(
+      HelixGatewayParticipant helixGatewayParticipant) {
+    _helixGatewayParticipant = helixGatewayParticipant;
+  }
+
+  @Transition(to = "*", from = "*")
+  public void genericStateTransitionHandler(Message message, 
NotificationContext context)
+      throws Exception {
+    _helixGatewayParticipant.processStateTransitionMessage(message);
+  }
+
+  @Override
+  public void reset() {
+    // No-op: we don't want to start from the initial state again.
+  }
+
+  @Override
+  public void rollbackOnError(Message message, NotificationContext context,
+      StateTransitionError error) {
+    _helixGatewayParticipant.handleStateTransitionError(message, error);
+  }
+}
\ No newline at end of file
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
similarity index 63%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
index 7550fef51..64662998e 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayMultiTopStateStateModelFactory.java
@@ -19,19 +19,20 @@ package org.apache.helix.gateway.statemodel;
  * under the License.
  */
 
-import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
-public class HelixGatewayOnlineOfflineStateModelFactory extends 
StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
-  private GatewayServiceManager _clusterManager;
+public class HelixGatewayMultiTopStateStateModelFactory extends 
StateModelFactory<HelixGatewayMultiTopStateStateModel> {
+  private final HelixGatewayParticipant _helixGatewayParticipant;
 
-  public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager 
clusterManager) {
-    _clusterManager = clusterManager;
+  public HelixGatewayMultiTopStateStateModelFactory(
+      HelixGatewayParticipant helixGatewayParticipant) {
+    _helixGatewayParticipant = helixGatewayParticipant;
   }
 
   @Override
-  public HelixGatewayOnlineOfflineStateModel createNewStateModel(String 
resourceName,
+  public HelixGatewayMultiTopStateStateModel createNewStateModel(String 
resourceName,
       String partitionKey) {
-    return new HelixGatewayOnlineOfflineStateModel(resourceName, partitionKey, 
_clusterManager);
+    return new HelixGatewayMultiTopStateStateModel(_helixGatewayParticipant);
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
deleted file mode 100644
index 4585ea378..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.helix.gateway.statemodel;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-
-public class HelixGatewayOnlineOfflineStateModel extends StateModel {
-  private boolean _firstTime = true;
-  private GatewayServiceManager _gatewayServiceManager;
-
-  private String _resourceName;
-  private String _partitionKey;
-
-  private AtomicBoolean _completed;
-
-  public HelixGatewayOnlineOfflineStateModel(String resourceName, String 
partitionKey,
-      GatewayServiceManager gatewayServiceManager) {
-    _resourceName = resourceName;
-    _partitionKey = partitionKey;
-    _gatewayServiceManager = gatewayServiceManager;
-  }
-
-  public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
-    if (_firstTime) {
-      
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
-      System.out.println(
-          "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with ADD for "
-              + message.getResourceName() + " processed");
-      _firstTime = false;
-    }
-    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
-    System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
-        + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() 
+ " processed");
-  }
-
-  public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
-    System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
-        + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() 
+ " processed");
-  }
-
-  public void onBecomeDroppedFromOffline(Message message, NotificationContext 
context) {
-    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
-    System.out.println(
-        "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with REMOVE for "
-            + message.getResourceName() + " processed");
-  }
-
-  private void wait(AtomicBoolean completed) {
-    _completed = completed;
-    while (true) {
-      try {
-        if (_completed.get()) {
-          break;
-        }
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 383f38b58..ecc6c9568 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -23,8 +23,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.gateway.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
+import org.apache.helix.model.Message;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
@@ -33,27 +37,63 @@ import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMe
 
 
 public final class StateTransitionMessageTranslateUtil {
+  /**
+   * Determine the transition type based on the current state and the target 
state.
+   *
+   * @param currentState current state
+   * @param toState      target state
+   * @return TransitionType
+   */
+  public static 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
translateStatesToTransitionType(
+      String currentState, String toState) {
+    boolean isUnassigned = 
HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState);
+    boolean isToStateDropped = 
HelixDefinedState.DROPPED.name().equals(toState);
 
-  public static TransitionMessage translateSTMsgToTransitionMessage() {
-    return null;
+    if (isToStateDropped && !isUnassigned) {
+      return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
+    }
+    if (!isToStateDropped && isUnassigned) {
+      return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
+    }
+    return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
+  }
+
+  /**
+   * Translate from Helix ST Message to Helix Gateway Service 
TransitionMessage.
+   *
+   * @param message Message
+   * @return TransitionMessage
+   */
+  public static TransitionMessage translateSTMsgToTransitionMessage(Message 
message) {
+    return TransitionMessage.newBuilder().addRequest(
+        HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder()
+            .setTransitionID(message.getMsgId()).setTransitionType(
+                translateStatesToTransitionType(message.getFromState(), 
message.getToState()))
+            
.setResourceID(message.getResourceName()).setShardID(message.getPartitionName())
+            .setTargetState(message.getToState()).build()).build();
   }
 
   /**
    * Translate from user sent ShardStateMessage message to Helix Gateway 
Service event.
+   *
+   * @param request ShardStateMessage message
+   *                contains the state of each shard upon connection or result 
of state transition request.
+   * @return GatewayServiceEvent
    */
   public static GatewayServiceEvent 
translateShardStateMessageToEvent(ShardStateMessage request) {
-
     GatewayServiceEvent.GateWayServiceEventBuilder builder;
     if (request.hasShardState()) { // init connection to gateway service
       ShardState shardState = request.getShardState();
-      Map<String, String> shardStateMap = new HashMap<>();
+      Map<String, Map<String, String>> shardStateMap = new HashMap<>();
       for (HelixGatewayServiceOuterClass.SingleResourceState resourceState : 
shardState.getResourceStateList()) {
         for (HelixGatewayServiceOuterClass.SingleShardState state : 
resourceState.getShardStatesList()) {
-          shardStateMap.put(resourceState.getResource() + "_" + 
state.getShardName(), state.getCurrentState());
+          shardStateMap.computeIfAbsent(resourceState.getResource(), k -> new 
HashMap<>())
+              .put(state.getShardName(), state.getCurrentState());
         }
       }
       builder = new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.CONNECT).setClusterName(
-          
shardState.getClusterName()).setParticipantName(shardState.getInstanceName());
+              
shardState.getClusterName()).setParticipantName(shardState.getInstanceName())
+          .setShardStateMap(shardStateMap);
     } else {
       ShardTransitionStatus shardTransitionStatus = 
request.getShardTransitionStatus();
       // this is status update for established connection
@@ -63,7 +103,7 @@ public final class StateTransitionMessageTranslateUtil {
       for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus 
shardTransition : status) {
         GatewayServiceEvent.StateTransitionResult result =
             new 
GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(),
-                shardTransition.getCurrentState(), 
shardTransition.getCurrentState());
+                shardTransition.getIsSuccess(), 
shardTransition.getCurrentState());
         stResult.add(result);
       }
       builder = new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName(
@@ -75,9 +115,12 @@ public final class StateTransitionMessageTranslateUtil {
   }
 
   /**
-   * Translate termination event to GatewayServiceEvent.
+   * Translate from client close to Helix Gateway Service event.
+   *
+   * @param instanceName the name of the instance whose client connection closed
+   * @param clusterName the cluster name
+   * @return GatewayServiceEvent
    */
-
   public static GatewayServiceEvent translateClientCloseToEvent(String 
instanceName, String clusterName) {
     GatewayServiceEvent.GateWayServiceEventBuilder builder =
         new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.DISCONNECT).setClusterName(
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto 
b/helix-gateway/src/main/proto/HelixGatewayService.proto
index d32423a34..e7db5473e 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -30,11 +30,10 @@ message SingleTransitionMessage {
   TransitionType transitionType = 2;   // Transition type for shard operations
   string resourceID = 3;              // Resource ID
   string shardID = 4;                 // Shard to perform operation
-  optional string startState = 5;      // Shard start state
   string targetState = 6;     // Shard target state.
 }
 
-message TransitionMessage{
+message TransitionMessage {
   repeated SingleTransitionMessage request = 1;
 }
 
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
new file mode 100644
index 000000000..fda7fbb1b
--- /dev/null
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -0,0 +1,333 @@
+package org.apache.helix.gateway.participant;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceProcessor;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+public class TestHelixGatewayParticipant extends ZkTestBase {
+  private static final String CLUSTER_NAME = 
TestHelixGatewayParticipant.class.getSimpleName();
+  private static final int START_NUM_NODE = 2;
+  private static final String TEST_DB = "TestDB";
+  private static final String TEST_STATE_MODEL = "OnlineOffline";
+  private static final String CONTROLLER_PREFIX = "controller";
+  private static final String PARTICIPANT_PREFIX = "participant";
+
+  private ZkHelixClusterVerifier _clusterVerifier;
+  private ClusterControllerManager _controller;
+  private int _nextStartPort = 12000;
+  private final List<HelixGatewayParticipant> _participants = 
Lists.newArrayList();
+  private final Map<String, Message> _pendingMessageMap = new 
ConcurrentHashMap<>();
+
+  @BeforeClass
+  public void beforeClass() {
+    // Set up the Helix cluster
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(CLUSTER_NAME);
+    
clusterConfig.getRecord().setSimpleField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN,
 "true");
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    // Start initial participants
+    for (int i = 0; i < START_NUM_NODE; i++) {
+      addParticipant();
+    }
+
+    // Start the controller
+    String controllerName = CONTROLLER_PREFIX + '_' + CLUSTER_NAME;
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // Enable best possible assignment persistence
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // Clean up by disconnecting the controller and participants
+    _controller.disconnect();
+    for (HelixGatewayParticipant participant : _participants) {
+      participant.disconnect();
+    }
+  }
+
+  /**
+   * Add a participant with a specific initial state map.
+   */
+  private HelixGatewayParticipant addParticipant(String participantName,
+      Map<String, Map<String, String>> initialShardMap) {
+    HelixGatewayParticipant participant = new HelixGatewayParticipant.Builder(
+        new MockHelixGatewayServiceProcessor(_pendingMessageMap), 
participantName, CLUSTER_NAME,
+        ZK_ADDR).addMultiTopStateStateModelDefinition(TEST_STATE_MODEL)
+        .setInitialShardState(initialShardMap).build();
+    _participants.add(participant);
+    return participant;
+  }
+
+  /**
+   * Add a participant with an empty initial state map.
+   */
+  private HelixGatewayParticipant addParticipant() {
+    String participantName = PARTICIPANT_PREFIX + "_" + _nextStartPort++;
+    return addParticipant(participantName, Collections.emptyMap());
+  }
+
+  /**
+   * Remove a participant from the cluster.
+   */
+  private void deleteParticipant(HelixGatewayParticipant participant) {
+    participant.disconnect();
+    _participants.remove(participant);
+  }
+
+  /**
+   * Add a participant to the IdealState's preference list.
+   */
+  private void addToPreferenceList(HelixGatewayParticipant participant) {
+    IdealState idealState =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+    idealState.getPreferenceLists().values()
+        .forEach(preferenceList -> 
preferenceList.add(participant.getInstanceName()));
+    
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
 + 1));
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
TEST_DB, idealState);
+  }
+
+  /**
+   * Remove a participant from the IdealState's preference list.
+   */
+  private void removeFromPreferenceList(HelixGatewayParticipant participant) {
+    IdealState idealState =
+        
_gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, 
TEST_DB);
+    idealState.getPreferenceLists().values()
+        .forEach(preferenceList -> 
preferenceList.remove(participant.getInstanceName()));
+    
idealState.setReplicas(String.valueOf(Integer.parseInt(idealState.getReplicas())
 - 1));
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, 
TEST_DB, idealState);
+  }
+
+  /**
+   * Create the test database in semi-auto mode and build the cluster verifier.
+   */
+  private void createDB() {
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB,
+        _participants.stream().map(HelixGatewayParticipant::getInstanceName)
+            .collect(Collectors.toList()), TEST_STATE_MODEL, 1, 
_participants.size());
+
+    _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(new HashSet<>(
+            
_gSetupTool.getClusterManagementTool().getResourcesInCluster(CLUSTER_NAME)))
+        
.setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+  }
+
+  /**
+   * Retrieve a pending message for a specific participant.
+   */
+  private Message getPendingMessage(String instanceName) {
+    return _pendingMessageMap.get(instanceName);
+  }
+
+  /**
+   * Process the pending message for a participant.
+   */
+  private void processPendingMessage(HelixGatewayParticipant participant, 
boolean isSuccess) {
+    Message message = _pendingMessageMap.remove(participant.getInstanceName());
+    participant.completeStateTransition(message.getMsgId(), isSuccess);
+  }
+
+  /**
+   * Get the current state of a Helix shard.
+   */
+  private String getHelixCurrentState(String instanceName, String resourceName,
+      String shardId) {
+    return _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
resourceName).getStateMap(shardId)
+        .getOrDefault(instanceName, HelixGatewayParticipant.UNASSIGNED_STATE);
+  }
+
+  /**
+   * Verify that all specified participants have pending messages.
+   */
+  private void verifyPendingMessages(List<HelixGatewayParticipant> 
participants) throws Exception {
+    Assert.assertTrue(TestHelper.verify(() -> participants.stream()
+            .allMatch(participant -> 
getPendingMessage(participant.getInstanceName()) != null),
+        TestHelper.WAIT_DURATION));
+  }
+
+  /**
+   * Verify that the gateway state matches the Helix state for all 
participants.
+   */
+  private void verifyGatewayStateMatchesHelixState() throws Exception {
+    Assert.assertTrue(TestHelper.verify(() -> 
_participants.stream().allMatch(participant -> {
+      String instanceName = participant.getInstanceName();
+      for (String resourceName : _gSetupTool.getClusterManagementTool()
+          .getResourcesInCluster(CLUSTER_NAME)) {
+        for (String shardId : _gSetupTool.getClusterManagementTool()
+            .getResourceIdealState(CLUSTER_NAME, 
resourceName).getPartitionSet()) {
+          String helixCurrentState =
+              getHelixCurrentState(instanceName, resourceName, shardId);
+          if (!participant.getCurrentState(resourceName, 
shardId).equals(helixCurrentState)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }), TestHelper.WAIT_DURATION));
+  }
+
+  /**
+   * Verify that all shards for a given instance are in a specific state.
+   */
+  private void verifyHelixPartitionStates(String instanceName, String state) 
throws Exception {
+    Assert.assertTrue(TestHelper.verify(() -> {
+      for (String resourceName : _gSetupTool.getClusterManagementTool()
+          .getResourcesInCluster(CLUSTER_NAME)) {
+        for (String shardId : _gSetupTool.getClusterManagementTool()
+            .getResourceIdealState(CLUSTER_NAME, 
resourceName).getPartitionSet()) {
+          if (!getHelixCurrentState(instanceName, resourceName, 
shardId).equals(state)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TestHelper.WAIT_DURATION));
+  }
+
+  @Test
+  public void testProcessStateTransitionMessageSuccess() throws Exception {
+    createDB();
+    verifyPendingMessages(_participants);
+
+    // Verify that all pending messages have the toState "ONLINE"
+    for (HelixGatewayParticipant participant : _participants) {
+      Message message = getPendingMessage(participant.getInstanceName());
+      Assert.assertNotNull(message);
+      Assert.assertEquals(message.getToState(), "ONLINE");
+    }
+
+    // Process all pending messages successfully
+    for (HelixGatewayParticipant participant : _participants) {
+      processPendingMessage(participant, true);
+    }
+
+    // Verify that the cluster converges and all states are "ONLINE"
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyGatewayStateMatchesHelixState();
+  }
+
+  @Test(dependsOnMethods = "testProcessStateTransitionMessageSuccess")
+  public void testProcessStateTransitionMessageFailure() throws Exception {
+    // Add a new participant and include it in the preference list
+    HelixGatewayParticipant participant = addParticipant();
+    addToPreferenceList(participant);
+    verifyPendingMessages(List.of(participant));
+
+    // Verify the pending message has the toState "ONLINE"
+    Message message = getPendingMessage(participant.getInstanceName());
+    Assert.assertNotNull(message);
+    Assert.assertEquals(message.getToState(), "ONLINE");
+
+    // Process the message with failure
+    processPendingMessage(participant, false);
+
+    // Verify that the cluster converges and states reflect the failure (e.g., 
"OFFLINE")
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyGatewayStateMatchesHelixState();
+
+    // Remove the participant from the preference list and delete it
+    removeFromPreferenceList(participant);
+    deleteParticipant(participant);
+    Assert.assertTrue(_clusterVerifier.verify());
+  }
+
+  @Test(dependsOnMethods = "testProcessStateTransitionMessageFailure")
+  public void testProcessStateTransitionAfterReconnect() throws Exception {
+    // Remove the first participant
+    HelixGatewayParticipant participant = _participants.get(0);
+    deleteParticipant(participant);
+
+    // Verify the Helix state transitions to "UNASSIGNED_STATE" for the 
participant
+    verifyHelixPartitionStates(participant.getInstanceName(),
+        HelixGatewayParticipant.UNASSIGNED_STATE);
+
+    // Re-add the participant, seeding it with its previous shard state map
+    addParticipant(participant.getInstanceName(), 
participant.getShardStateMap());
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    // Verify the Helix state is "ONLINE"
+    verifyHelixPartitionStates(participant.getInstanceName(), "ONLINE");
+  }
+
+  @Test(dependsOnMethods = "testProcessStateTransitionAfterReconnect")
+  public void testProcessStateTransitionAfterReconnectAfterDroppingPartition() 
throws Exception {
+    // Remove the first participant and verify state
+    HelixGatewayParticipant participant = _participants.get(0);
+    deleteParticipant(participant);
+    verifyHelixPartitionStates(participant.getInstanceName(),
+        HelixGatewayParticipant.UNASSIGNED_STATE);
+
+    // Remove shard preference and re-add the participant
+    removeFromPreferenceList(participant);
+    HelixGatewayParticipant participantReplacement =
+        addParticipant(participant.getInstanceName(), 
participant.getShardStateMap());
+    verifyPendingMessages(List.of(participantReplacement));
+
+    // Process the pending message successfully
+    processPendingMessage(participantReplacement, true);
+
+    // Verify that the cluster converges and gateway state matches Helix state
+    Assert.assertTrue(_clusterVerifier.verify());
+    verifyGatewayStateMatchesHelixState();
+  }
+
+  public static class MockHelixGatewayServiceProcessor implements 
HelixGatewayServiceProcessor {
+    private final Map<String, Message> _pendingMessageMap;
+
+    public MockHelixGatewayServiceProcessor(Map<String, Message> 
pendingMessageMap) {
+      _pendingMessageMap = pendingMessageMap;
+    }
+
+    @Override
+    public void sendStateTransitionMessage(String instanceName, String 
currentState,
+        Message message) {
+      _pendingMessageMap.put(instanceName, message);
+    }
+  }
+}
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
index 01b78593c..a345f008e 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceManager.java
@@ -40,7 +40,6 @@ public class TestGatewayServiceManager {
 
     // Process disconnection event
     grpcService.report(null).onNext(disconnectionEvent);
-    HelixGatewayService gatewayService = 
manager.getHelixGatewayService("cluster1");
     // Verify the events were processed in sequence
     verify(manager, times(2)).newGatewayServiceEvent(any());
   }
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
new file mode 100644
index 000000000..34e23d0b0
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
@@ -0,0 +1,68 @@
+package org.apache.helix.gateway.utils;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixDefinedState;
+
+import org.apache.helix.gateway.participant.HelixGatewayParticipant;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
+public class TestStateTransitionMessageTranslateUtil {
+
+  @Test
+  public void testTranslateStatesToTransitionType_DeleteShard() {
+    String currentState = "ONLINE";
+    String toState = HelixDefinedState.DROPPED.name();
+
+    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+        
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
+
+    Assert.assertEquals(result,
+        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD,
+        "Expected DELETE_SHARD when transitioning to DROPPED state from a 
non-DROPPED state.");
+  }
+
+  @Test
+  public void testTranslateStatesToTransitionType_AddShard() {
+    String currentState = HelixGatewayParticipant.UNASSIGNED_STATE;
+    String toState = "ONLINE";
+
+    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+        
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
+
+    Assert.assertEquals(result,
+        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD,
+        "Expected ADD_SHARD when transitioning from DROPPED state to a 
non-DROPPED state.");
+  }
+
+  @Test
+  public void testTranslateStatesToTransitionType_ChangeRole() {
+    String currentState = "ONLINE";
+    String toState = "OFFLINE";
+
+    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+        
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
+
+    Assert.assertEquals(result,
+        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE,
+        "Expected CHANGE_ROLE when transitioning between non-DROPPED states.");
+  }
+}

Reply via email to