This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 46f3b18e7db949831e47966a7aafe1a3d655f170
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Wed Aug 28 09:35:42 2024 -0700

    Gateway - User report their shards' current state instead of state 
transition message (#2892)
    
    Gateway - User report their shards' current state instead of state 
transition message
---
 .../org/apache/helix/gateway/HelixGatewayMain.java |  2 +-
 .../api/service/HelixGatewayServiceChannel.java    |  6 ++--
 .../channel/HelixGatewayServiceChannelFactory.java |  1 -
 .../channel/HelixGatewayServiceGrpcService.java    | 25 ++++++-------
 .../HelixGatewayServicePollModeChannel.java        |  7 ++--
 .../participant/HelixGatewayParticipant.java       | 31 ++++++++--------
 .../helix/gateway/service/GatewayServiceEvent.java | 23 ++++++------
 .../gateway/service/GatewayServiceManager.java     | 11 +++---
 .../util/StateTransitionMessageTranslateUtil.java  | 27 +++++++-------
 .../src/main/proto/HelixGatewayService.proto       | 25 +++++++------
 .../participant/TestHelixGatewayParticipant.java   | 42 ++++++++++++----------
 .../service/TestGatewayServiceConnection.java      |  4 +--
 .../TestStateTransitionMessageTranslateUtil.java   | 12 +++----
 13 files changed, 109 insertions(+), 107 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
index df6230ca6..5ad06d908 100644
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -20,8 +20,8 @@ package org.apache.helix.gateway;
  */
 
 import java.io.IOException;
-import org.apache.helix.gateway.service.GatewayServiceManager;
 import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.service.GatewayServiceManager;
 
 import static java.lang.Integer.*;
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
index 02c0e8a8d..48e3e437d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java
@@ -22,7 +22,7 @@ package org.apache.helix.gateway.api.service;
 import java.io.IOException;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.model.Message;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
 
 
 /**
@@ -35,10 +35,8 @@ public interface HelixGatewayServiceChannel {
    * Gateway service send a state transition message to a connected 
participant.
    *
    * @param instanceName the name of the participant
-   * @param currentState the current state of the shard
-   * @param message      the message to send
    */
-  void sendStateTransitionMessage(String instanceName, String currentState, 
Message message);
+  void sendStateChangeRequests(String instanceName, 
HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests);
 
   /**
    * Send a GatewayServiceEvent to gateway manager for helix instances changes.
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
index 4c20b1977..fa665a5c8 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java
@@ -19,7 +19,6 @@ package org.apache.helix.gateway.channel;
  * under the License.
  */
 
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.service.GatewayServiceManager;
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index ea867cacf..8e9b0882b 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -30,18 +30,18 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.gateway.service.GatewayServiceManager;
-import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.util.PerKeyLockRegistry;
 import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
-import org.apache.helix.model.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
-import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
 
 
 /**
@@ -53,10 +53,10 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   private static final Logger logger = 
LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class);
 
   // Map to store the observer for each instance
-  private final Map<String, StreamObserver<TransitionMessage>> _observerMap = 
new HashMap<>();
+  private final Map<String, StreamObserver<ShardChangeRequests>> _observerMap 
= new HashMap<>();
   // A reverse map to store the instance name for each observer. It is used to 
find the instance when connection is closed.
   // map<observer, pair<instance, cluster>>
-  private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> 
_reversedObserverMap = new HashMap<>();
+  private final Map<StreamObserver<ShardChangeRequests>, Pair<String, String>> 
_reversedObserverMap = new HashMap<>();
 
   private final GatewayServiceManager _manager;
 
@@ -82,7 +82,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    */
   @Override
   public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
-      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage>
 responseObserver) {
+      
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests>
 responseObserver) {
 
     return new StreamObserver<ShardStateMessage>() {
 
@@ -118,15 +118,16 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
    * The instance must already have established a connection to the gateway 
service.
    *
    * @param instanceName the instance name to send the message to
-   * @param currentState the current state of shard
    * @param message the message to convert to the transition message
    */
   @Override
-  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
-    StreamObserver<TransitionMessage> observer;
+  public void sendStateChangeRequests(String instanceName, ShardChangeRequests 
requests) {
+    StreamObserver<HelixGatewayServiceOuterClass.ShardChangeRequests> observer;
     observer = _observerMap.get(instanceName);
     if (observer != null) {
-      
observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message));
+      observer.onNext(requests);
+    } else {
+      logger.error("Instance {} is not connected to the gateway service", 
instanceName);
     }
   }
 
@@ -152,7 +153,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   }
 
   private void closeConnectionHelper(String instanceName, String errorReason, 
boolean withError) {
-    StreamObserver<TransitionMessage> observer;
+    StreamObserver<ShardChangeRequests> observer;
     observer = _observerMap.get(instanceName);
     if (observer != null) {
       if (withError) {
@@ -180,7 +181,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   }
 
   private void updateObserver(String instanceName, String clusterName,
-      StreamObserver<TransitionMessage> streamObserver) {
+      StreamObserver<ShardChangeRequests> streamObserver) {
     _lockRegistry.withLock(instanceName, () -> {
       _observerMap.put(instanceName, streamObserver);
       _reversedObserverMap.put(streamObserver, new 
ImmutablePair<>(instanceName, clusterName));
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
index 30a9f1802..7623b8d2a 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java
@@ -21,7 +21,8 @@ package org.apache.helix.gateway.channel;
 
 import java.io.IOException;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
-import org.apache.helix.model.Message;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+
 
 // TODO: implement this class
 public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceChannel {
@@ -30,8 +31,8 @@ public class HelixGatewayServicePollModeChannel implements 
HelixGatewayServiceCh
   }
 
   @Override
-  public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
-
+  public void sendStateChangeRequests(String instanceName,
+      HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) {
   }
 
   @Override
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
index d60905d3c..15e080bb8 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java
@@ -19,19 +19,19 @@ package org.apache.helix.gateway.participant;
  * under the License.
  */
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import 
org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory;
+import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil;
 import org.apache.helix.manager.zk.HelixManagerStateListener;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.Message;
@@ -49,7 +49,8 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
   private final HelixManager _helixManager;
   private final Runnable _onDisconnectedCallback;
   private final Map<String, Map<String, String>> _shardStateMap;
-  private final Map<String, CompletableFuture<Boolean>> 
_stateTransitionResultMap;
+
+  private final Map<String, CompletableFuture<String>> 
_stateTransitionResultMap;
 
   private HelixGatewayParticipant(HelixGatewayServiceChannel 
gatewayServiceChannel,
       Runnable onDisconnectedCallback, HelixManager helixManager,
@@ -62,28 +63,29 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
   }
 
   public void processStateTransitionMessage(Message message) throws Exception {
-    String transitionId = message.getMsgId();
     String resourceId = message.getResourceName();
     String shardId = message.getPartitionName();
     String toState = message.getToState();
+    String concatenatedShardName = resourceId + shardId;
 
     try {
       if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) {
         return;
       }
 
-      CompletableFuture<Boolean> future = new CompletableFuture<>();
-      _stateTransitionResultMap.put(transitionId, future);
-      
_gatewayServiceChannel.sendStateTransitionMessage(_helixManager.getInstanceName(),
-          getCurrentState(resourceId, shardId), message);
+      CompletableFuture<String> future = new CompletableFuture<>();
+
+      _stateTransitionResultMap.put(concatenatedShardName, future);
+      
_gatewayServiceChannel.sendStateChangeRequests(_helixManager.getInstanceName(),
+          
StateTransitionMessageTranslateUtil.translateSTMsgToShardChangeRequests(message));
 
-      if (!future.get()) {
+      if (!toState.equals(future.get())) {
         throw new Exception("Failed to transition to state " + toState);
       }
 
       updateState(resourceId, shardId, toState);
     } finally {
-      _stateTransitionResultMap.remove(transitionId);
+      _stateTransitionResultMap.remove(concatenatedShardName);
     }
   }
 
@@ -117,13 +119,12 @@ public class HelixGatewayParticipant implements 
HelixManagerStateListener {
   /**
    * Completes the state transition with the given transitionId.
    *
-   * @param transitionId the transitionId to complete
-   * @param isSuccess    whether the state transition was successful
    */
-  public void completeStateTransition(String transitionId, boolean isSuccess) {
-    CompletableFuture<Boolean> future = 
_stateTransitionResultMap.get(transitionId);
+  public void completeStateTransition(String resourceId, String shardId, 
String currentState) {
+    String concatenatedShardName = resourceId + shardId;
+    CompletableFuture<String> future = 
_stateTransitionResultMap.get(concatenatedShardName);
     if (future != null) {
-      future.complete(isSuccess);
+      future.complete(currentState);
     }
   }
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
index f5c66dea0..1c8c20429 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java
@@ -39,25 +39,26 @@ public class GatewayServiceEvent {
   private List<StateTransitionResult> _stateTransitionResult;
 
   public static class StateTransitionResult {
-    private String stateTransitionId;
-    private boolean isSuccess;
+
+    private String resourceName;
+    private String shardName;
     private String shardState;
 
-    public StateTransitionResult(String stateTransitionId, boolean isSuccess, 
String shardState) {
-      this.stateTransitionId = stateTransitionId;
-      this.isSuccess = isSuccess;
+    public StateTransitionResult(String resourceName,String shardName, String 
shardState) {
       this.shardState = shardState;
+      this.shardName = shardName;
+      this.resourceName = resourceName;
     }
 
-    public String getStateTransitionId() {
-      return stateTransitionId;
-    }
-    public boolean getIsSuccess() {
-      return isSuccess;
-    }
     public String getShardState() {
       return shardState;
     }
+    public String getShardName() {
+      return shardName;
+    }
+    public String getResourceName() {
+      return resourceName;
+    }
   }
 
   private GatewayServiceEvent(GatewayServiceEventType eventType, String 
clusterName, String instanceName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 289c5513c..cceabc887 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -19,18 +19,17 @@ package org.apache.helix.gateway.service;
  * under the License.
  */
 
+import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
-import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
 import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
+import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
+import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.util.PerKeyBlockingExecutor;
 
@@ -104,8 +103,8 @@ public class GatewayServiceManager {
         return;
       }
       _event.getStateTransitionResult().forEach(stateTransitionResult -> {
-        
participant.completeStateTransition(stateTransitionResult.getStateTransitionId(),
-            stateTransitionResult.getIsSuccess());
+        
participant.completeStateTransition(stateTransitionResult.getResourceName(),
+            stateTransitionResult.getShardName(), 
stateTransitionResult.getShardState());
       });
     }
   }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
index 8f45407c0..c869c9c55 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -23,18 +23,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
 import org.apache.helix.gateway.participant.HelixGatewayParticipant;
 import org.apache.helix.gateway.service.GatewayServiceEvent;
 import org.apache.helix.model.Message;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
 import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus;
-import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
-
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.SingleShardChangeRequest;
 
 public final class StateTransitionMessageTranslateUtil {
   /**
@@ -44,18 +43,18 @@ public final class StateTransitionMessageTranslateUtil {
    * @param toState      target state
    * @return TransitionType
    */
-  public static 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
translateStatesToTransitionType(
+  public static 
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType 
translateStatesToTransitionType(
       String currentState, String toState) {
     boolean isUnassigned = 
HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState);
     boolean isToStateDropped = 
HelixDefinedState.DROPPED.name().equals(toState);
 
     if (isToStateDropped && !isUnassigned) {
-      return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD;
+      return 
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.DELETE_SHARD;
     }
     if (!isToStateDropped && isUnassigned) {
-      return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD;
+      return 
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.ADD_SHARD;
     }
-    return 
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE;
+    return 
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.CHANGE_ROLE;
   }
 
   /**
@@ -64,12 +63,12 @@ public final class StateTransitionMessageTranslateUtil {
    * @param message Message
    * @return TransitionMessage
    */
-  public static TransitionMessage translateSTMsgToTransitionMessage(Message 
message) {
-    return TransitionMessage.newBuilder().addRequest(
-        HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder()
-            .setTransitionID(message.getMsgId()).setTransitionType(
+  public static ShardChangeRequests 
translateSTMsgToShardChangeRequests(Message message) {
+    return ShardChangeRequests.newBuilder().addRequest(
+        SingleShardChangeRequest.newBuilder()
+            .setStateChangeRequestType(
                 translateStatesToTransitionType(message.getFromState(), 
message.getToState()))
-            
.setResourceID(message.getResourceName()).setShardID(message.getPartitionName())
+            
.setResourceName(message.getResourceName()).setShardName(message.getPartitionName())
             .setTargetState(message.getToState()).build()).build();
   }
 
@@ -102,8 +101,8 @@ public final class StateTransitionMessageTranslateUtil {
       List<GatewayServiceEvent.StateTransitionResult> stResult = new 
ArrayList<>();
       for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus 
shardTransition : status) {
         GatewayServiceEvent.StateTransitionResult result =
-            new 
GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(),
-                shardTransition.getIsSuccess(), 
shardTransition.getCurrentState());
+            new 
GatewayServiceEvent.StateTransitionResult(shardTransition.getResourceName(),
+                shardTransition.getShardName(), 
shardTransition.getCurrentState());
         stResult.add(result);
       }
       builder = new 
GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName(
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto 
b/helix-gateway/src/main/proto/HelixGatewayService.proto
index e7db5473e..50553e372 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -20,21 +20,20 @@ syntax = "proto3";
 
 package proto.org.apache.helix.gateway;
 
-message SingleTransitionMessage {
-  enum TransitionType {
+message SingleShardChangeRequest {
+  enum StateChangeRequestType {
     ADD_SHARD = 0;
     DELETE_SHARD = 1;
     CHANGE_ROLE = 2;
   }
-  string transitionID = 1;    // ID of transition message
-  TransitionType transitionType = 2;   // Transition type for shard operations
-  string resourceID = 3;              // Resource ID
-  string shardID = 4;                 // Shard to perform operation
-  string targetState = 6;     // Shard target state.
+  StateChangeRequestType stateChangeRequestType = 1;   // Transition type for 
shard operations
+  string resourceName = 2;              // Resource ID
+  string shardName = 3;                 // Shard to perform operation
+  string targetState = 4;     // Shard target state.
 }
 
-message TransitionMessage {
-  repeated SingleTransitionMessage request = 1;
+message ShardChangeRequests {
+  repeated SingleShardChangeRequest request = 1;
 }
 
 message SingleResourceState {
@@ -48,9 +47,9 @@ message SingleShardState {
 }
 
 message SingleShardTransitionStatus {
-  string transitionID = 1;       // ID of transition message
-  bool isSuccess = 2;           // Was transition successfully performed
-  optional string currentState = 3;   // If it failed, what is the current 
state it should reported as.
+  string resourceName = 1;        // resource name
+  string shardName = 2;           // shard name
+  string currentState = 3;   // If it failed, what is the current state it 
should reported as.
 }
 
 message ShardTransitionStatus{
@@ -75,6 +74,6 @@ message ShardStateMessage{
 }
 
 service HelixGatewayService {
-  rpc report(stream ShardStateMessage) returns (stream TransitionMessage) {}
+  rpc report(stream ShardStateMessage) returns (stream ShardChangeRequests) {}
 }
 
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
index 2c593ca12..69882fc66 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java
@@ -36,7 +36,6 @@ import 
org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Message;
 import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
 import org.testng.Assert;
@@ -44,6 +43,8 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
+import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests;
 
 
 public class TestHelixGatewayParticipant extends ZkTestBase {
@@ -58,7 +59,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase {
   private ClusterControllerManager _controller;
   private int _nextStartPort = 12000;
   private final List<HelixGatewayParticipant> _participants = 
Lists.newArrayList();
-  private final Map<String, Message> _pendingMessageMap = new 
ConcurrentHashMap<>();
+  private final Map<String, ShardChangeRequests> _pendingMessageMap = new 
ConcurrentHashMap<>();
   private final AtomicInteger _onDisconnectCallbackCount = new AtomicInteger();
 
   @BeforeClass
@@ -164,16 +165,18 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
   /**
    * Retrieve a pending message for a specific participant.
    */
-  private Message getPendingMessage(String instanceName) {
+  private ShardChangeRequests getPendingMessage(String instanceName) {
     return _pendingMessageMap.get(instanceName);
   }
 
   /**
    * Process the pending message for a participant.
    */
-  private void processPendingMessage(HelixGatewayParticipant participant, 
boolean isSuccess) {
-    Message message = _pendingMessageMap.remove(participant.getInstanceName());
-    participant.completeStateTransition(message.getMsgId(), isSuccess);
+  private void processPendingMessage(HelixGatewayParticipant participant, 
boolean isSuccess, String toState) {
+    ShardChangeRequests requests = 
_pendingMessageMap.remove(participant.getInstanceName());
+
+    
participant.completeStateTransition(requests.getRequest(0).getResourceName(),requests.getRequest(0).getShardName(),
+        isSuccess ? toState : "WRONG_STATE");
   }
 
   /**
@@ -240,14 +243,14 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
 
     // Verify that all pending messages have the toState "ONLINE"
     for (HelixGatewayParticipant participant : _participants) {
-      Message message = getPendingMessage(participant.getInstanceName());
-      Assert.assertNotNull(message);
-      Assert.assertEquals(message.getToState(), "ONLINE");
+     HelixGatewayServiceOuterClass.SingleShardChangeRequest request = 
getPendingMessage(participant.getInstanceName()).getRequest(0);
+      Assert.assertNotNull(request);
+      Assert.assertEquals(request.getTargetState(), "ONLINE");
     }
 
     // Process all pending messages successfully
     for (HelixGatewayParticipant participant : _participants) {
-      processPendingMessage(participant, true);
+      processPendingMessage(participant, true, "ONLINE");
     }
 
     // Verify that the cluster converges and all states are "ONLINE"
@@ -263,12 +266,12 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
     verifyPendingMessages(List.of(participant));
 
     // Verify the pending message has the toState "ONLINE"
-    Message message = getPendingMessage(participant.getInstanceName());
-    Assert.assertNotNull(message);
-    Assert.assertEquals(message.getToState(), "ONLINE");
+    HelixGatewayServiceOuterClass.SingleShardChangeRequest request = 
getPendingMessage(participant.getInstanceName()).getRequest(0);
+    Assert.assertNotNull(request);
+    Assert.assertEquals(request.getTargetState(), "ONLINE");
 
     // Process the message with failure
-    processPendingMessage(participant, false);
+    processPendingMessage(participant, false, "ONLINE");
 
     // Verify that the cluster converges and states reflect the failure (e.g., 
"OFFLINE")
     Assert.assertTrue(_clusterVerifier.verify());
@@ -311,7 +314,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase 
{
     verifyPendingMessages(List.of(participantReplacement));
 
     // Process the pending message successfully
-    processPendingMessage(participantReplacement, true);
+    processPendingMessage(participantReplacement, true, "DROPPED");
 
     // Verify that the cluster converges and states are correctly updated to 
"ONLINE"
     Assert.assertTrue(_clusterVerifier.verify());
@@ -344,17 +347,18 @@ public class TestHelixGatewayParticipant extends 
ZkTestBase {
   }
 
   public static class MockHelixGatewayServiceChannel implements 
HelixGatewayServiceChannel {
-    private final Map<String, Message> _pendingMessageMap;
+    private final Map<String, ShardChangeRequests> _pendingMessageMap;
     private static final AtomicInteger _gracefulDisconnectCount = new 
AtomicInteger();
     private static final AtomicInteger _errorDisconnectCount = new 
AtomicInteger();
 
-    public MockHelixGatewayServiceChannel(Map<String, Message> 
pendingMessageMap) {
+    public MockHelixGatewayServiceChannel(Map<String, ShardChangeRequests> 
pendingMessageMap) {
       _pendingMessageMap = pendingMessageMap;
     }
 
     @Override
-    public void sendStateTransitionMessage(String instanceName, String 
currentState, Message message) {
-      _pendingMessageMap.put(instanceName, message);
+    public void sendStateChangeRequests(String instanceName,
+        HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) 
{
+      _pendingMessageMap.put(instanceName, shardChangeRequests);
     }
 
     @Override
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index 19c92f5e2..f3221a12c 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -91,9 +91,9 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
     }
 
     public void connect() {
-      _requestObserver = asyncStub.report(new 
StreamObserver<HelixGatewayServiceOuterClass.TransitionMessage>() {
+      _requestObserver = asyncStub.report(new 
StreamObserver<HelixGatewayServiceOuterClass.ShardChangeRequests>() {
         @Override
-        public void onNext(HelixGatewayServiceOuterClass.TransitionMessage 
value) {
+        public void onNext(HelixGatewayServiceOuterClass.ShardChangeRequests 
value) {
           // Handle response from server
         }
 
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
index 34e23d0b0..65c299122 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java
@@ -32,11 +32,11 @@ public class TestStateTransitionMessageTranslateUtil {
     String currentState = "ONLINE";
     String toState = HelixDefinedState.DROPPED.name();
 
-    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+    
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType 
result =
         
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
 
     Assert.assertEquals(result,
-        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD,
+        
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.DELETE_SHARD,
         "Expected DELETE_SHARD when transitioning to DROPPED state from a 
non-DROPPED state.");
   }
 
@@ -45,11 +45,11 @@ public class TestStateTransitionMessageTranslateUtil {
     String currentState = HelixGatewayParticipant.UNASSIGNED_STATE;
     String toState = "ONLINE";
 
-    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+    
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType 
result =
         
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
 
     Assert.assertEquals(result,
-        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD,
+        
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.ADD_SHARD,
         "Expected ADD_SHARD when transitioning from DROPPED state to a 
non-DROPPED state.");
   }
 
@@ -58,11 +58,11 @@ public class TestStateTransitionMessageTranslateUtil {
     String currentState = "ONLINE";
     String toState = "OFFLINE";
 
-    HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType 
result =
+    
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType 
result =
         
StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState,
 toState);
 
     Assert.assertEquals(result,
-        
HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE,
+        
HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.CHANGE_ROLE,
         "Expected CHANGE_ROLE when transitioning between non-DROPPED states.");
   }
 }


Reply via email to