This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 46f3b18e7db949831e47966a7aafe1a3d655f170 Author: xyuanlu <xyua...@gmail.com> AuthorDate: Wed Aug 28 09:35:42 2024 -0700 Gateway - User report their shards' current state instead of state transition message (#2892) Gateway - User report their shards' current state instead of state transition message --- .../org/apache/helix/gateway/HelixGatewayMain.java | 2 +- .../api/service/HelixGatewayServiceChannel.java | 6 ++-- .../channel/HelixGatewayServiceChannelFactory.java | 1 - .../channel/HelixGatewayServiceGrpcService.java | 25 ++++++------- .../HelixGatewayServicePollModeChannel.java | 7 ++-- .../participant/HelixGatewayParticipant.java | 31 ++++++++-------- .../helix/gateway/service/GatewayServiceEvent.java | 23 ++++++------ .../gateway/service/GatewayServiceManager.java | 11 +++--- .../util/StateTransitionMessageTranslateUtil.java | 27 +++++++------- .../src/main/proto/HelixGatewayService.proto | 25 +++++++------ .../participant/TestHelixGatewayParticipant.java | 42 ++++++++++++---------- .../service/TestGatewayServiceConnection.java | 4 +-- .../TestStateTransitionMessageTranslateUtil.java | 12 +++---- 13 files changed, 109 insertions(+), 107 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java index df6230ca6..5ad06d908 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java @@ -20,8 +20,8 @@ package org.apache.helix.gateway; */ import java.io.IOException; -import org.apache.helix.gateway.service.GatewayServiceManager; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.apache.helix.gateway.service.GatewayServiceManager; import static java.lang.Integer.*; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java index 02c0e8a8d..48e3e437d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/api/service/HelixGatewayServiceChannel.java @@ -22,7 +22,7 @@ package org.apache.helix.gateway.api.service; import java.io.IOException; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.model.Message; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; /** @@ -35,10 +35,8 @@ public interface HelixGatewayServiceChannel { * Gateway service send a state transition message to a connected participant. * * @param instanceName the name of the participant - * @param currentState the current state of the shard - * @param message the message to send */ - void sendStateTransitionMessage(String instanceName, String currentState, Message message); + void sendStateChangeRequests(String instanceName, HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests); /** * Send a GatewayServiceEvent to gateway manager for helix instances changes. diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java index 4c20b1977..fa665a5c8 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceChannelFactory.java @@ -19,7 +19,6 @@ package org.apache.helix.gateway.channel; * under the License. */ -import org.apache.commons.lang3.NotImplementedException; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.service.GatewayServiceManager; diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index ea867cacf..8e9b0882b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -30,18 +30,18 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.gateway.service.GatewayServiceManager; -import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.util.PerKeyLockRegistry; import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; -import org.apache.helix.model.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; -import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; /** @@ -53,10 +53,10 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli private static final Logger logger = LoggerFactory.getLogger(HelixGatewayServiceGrpcService.class); // Map to store the observer for each instance - private final Map<String, StreamObserver<TransitionMessage>> _observerMap = new HashMap<>(); + private final Map<String, StreamObserver<ShardChangeRequests>> _observerMap = new HashMap<>(); // A reverse map to store the instance name for each observer. It is used to find the instance when connection is closed. // map<observer, pair<instance, cluster>> - private final Map<StreamObserver<TransitionMessage>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); + private final Map<StreamObserver<ShardChangeRequests>, Pair<String, String>> _reversedObserverMap = new HashMap<>(); private final GatewayServiceManager _manager; @@ -82,7 +82,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli */ @Override public StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage> report( - StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage> responseObserver) { + StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests> responseObserver) { return new StreamObserver<ShardStateMessage>() { @@ -118,15 +118,16 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli * The instance must already have established a connection to the gateway service. * * @param instanceName the instance name to send the message to - * @param currentState the current state of shard * @param message the message to convert to the transition message */ @Override - public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { - StreamObserver<TransitionMessage> observer; + public void sendStateChangeRequests(String instanceName, ShardChangeRequests requests) { + StreamObserver<HelixGatewayServiceOuterClass.ShardChangeRequests> observer; observer = _observerMap.get(instanceName); if (observer != null) { - observer.onNext(StateTransitionMessageTranslateUtil.translateSTMsgToTransitionMessage(message)); + observer.onNext(requests); + } else { + logger.error("Instance {} is not connected to the gateway service", instanceName); } } @@ -152,7 +153,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } private void closeConnectionHelper(String instanceName, String errorReason, boolean withError) { - StreamObserver<TransitionMessage> observer; + StreamObserver<ShardChangeRequests> observer; observer = _observerMap.get(instanceName); if (observer != null) { if (withError) { @@ -180,7 +181,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli } private void updateObserver(String instanceName, String clusterName, - StreamObserver<TransitionMessage> streamObserver) { + StreamObserver<ShardChangeRequests> streamObserver) { _lockRegistry.withLock(instanceName, () -> { _observerMap.put(instanceName, streamObserver); _reversedObserverMap.put(streamObserver, new ImmutablePair<>(instanceName, clusterName)); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java index 30a9f1802..7623b8d2a 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServicePollModeChannel.java @@ -21,7 +21,8 @@ package org.apache.helix.gateway.channel; import java.io.IOException; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; -import org.apache.helix.model.Message; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; + // TODO: implement this class public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceChannel { @@ -30,8 +31,8 @@ public class HelixGatewayServicePollModeChannel implements HelixGatewayServiceCh } @Override - public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { - + public void sendStateChangeRequests(String instanceName, + HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) { } @Override diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java index d60905d3c..15e080bb8 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/participant/HelixGatewayParticipant.java @@ -19,19 +19,19 @@ package org.apache.helix.gateway.participant; * under the License. */ +import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; - -import com.google.common.annotations.VisibleForTesting; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.statemodel.HelixGatewayMultiTopStateStateModelFactory; +import org.apache.helix.gateway.util.StateTransitionMessageTranslateUtil; import org.apache.helix.manager.zk.HelixManagerStateListener; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.Message; @@ -49,7 +49,8 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { private final HelixManager _helixManager; private final Runnable _onDisconnectedCallback; private final Map<String, Map<String, String>> _shardStateMap; - private final Map<String, CompletableFuture<Boolean>> _stateTransitionResultMap; + + private final Map<String, CompletableFuture<String>> _stateTransitionResultMap; private HelixGatewayParticipant(HelixGatewayServiceChannel gatewayServiceChannel, Runnable onDisconnectedCallback, HelixManager helixManager, @@ -62,28 +63,29 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { } public void processStateTransitionMessage(Message message) throws Exception { - String transitionId = message.getMsgId(); String resourceId = message.getResourceName(); String shardId = message.getPartitionName(); String toState = message.getToState(); + String concatenatedShardName = resourceId + shardId; try { if (isCurrentStateAlreadyTarget(resourceId, shardId, toState)) { return; } - CompletableFuture<Boolean> future = new CompletableFuture<>(); - _stateTransitionResultMap.put(transitionId, future); - _gatewayServiceChannel.sendStateTransitionMessage(_helixManager.getInstanceName(), - getCurrentState(resourceId, shardId), message); + CompletableFuture<String> future = new CompletableFuture<>(); + + _stateTransitionResultMap.put(concatenatedShardName, future); + _gatewayServiceChannel.sendStateChangeRequests(_helixManager.getInstanceName(), + StateTransitionMessageTranslateUtil.translateSTMsgToShardChangeRequests(message)); - if (!future.get()) { + if (!toState.equals(future.get())) { throw new Exception("Failed to transition to state " + toState); } updateState(resourceId, shardId, toState); } finally { - _stateTransitionResultMap.remove(transitionId); + _stateTransitionResultMap.remove(concatenatedShardName); } } @@ -117,13 +119,12 @@ public class HelixGatewayParticipant implements HelixManagerStateListener { /** * Completes the state transition with the given transitionId. * - * @param transitionId the transitionId to complete - * @param isSuccess whether the state transition was successful */ - public void completeStateTransition(String transitionId, boolean isSuccess) { - CompletableFuture<Boolean> future = _stateTransitionResultMap.get(transitionId); + public void completeStateTransition(String resourceId, String shardId, String currentState) { + String concatenatedShardName = resourceId + shardId; + CompletableFuture<String> future = _stateTransitionResultMap.get(concatenatedShardName); if (future != null) { - future.complete(isSuccess); + future.complete(currentState); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java index f5c66dea0..1c8c20429 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceEvent.java @@ -39,25 +39,26 @@ public class GatewayServiceEvent { private List<StateTransitionResult> _stateTransitionResult; public static class StateTransitionResult { - private String stateTransitionId; - private boolean isSuccess; + + private String resourceName; + private String shardName; private String shardState; - public StateTransitionResult(String stateTransitionId, boolean isSuccess, String shardState) { - this.stateTransitionId = stateTransitionId; - this.isSuccess = isSuccess; + public StateTransitionResult(String resourceName,String shardName, String shardState) { this.shardState = shardState; + this.shardName = shardName; + this.resourceName = resourceName; } - public String getStateTransitionId() { - return stateTransitionId; - } - public boolean getIsSuccess() { - return isSuccess; - } public String getShardState() { return shardState; } + public String getShardName() { + return shardName; + } + public String getResourceName() { + return resourceName; + } } private GatewayServiceEvent(GatewayServiceEventType eventType, String clusterName, String instanceName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 289c5513c..cceabc887 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -19,18 +19,17 @@ package org.apache.helix.gateway.service; * under the License. */ +import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - -import com.google.common.collect.ImmutableSet; -import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; -import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; +import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; +import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.util.PerKeyBlockingExecutor; @@ -104,8 +103,8 @@ public class GatewayServiceManager { return; } _event.getStateTransitionResult().forEach(stateTransitionResult -> { - participant.completeStateTransition(stateTransitionResult.getStateTransitionId(), - stateTransitionResult.getIsSuccess()); + participant.completeStateTransition(stateTransitionResult.getResourceName(), + stateTransitionResult.getShardName(), stateTransitionResult.getShardState()); }); } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java index 8f45407c0..c869c9c55 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java @@ -23,18 +23,17 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.helix.HelixDefinedState; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; import org.apache.helix.gateway.participant.HelixGatewayParticipant; import org.apache.helix.gateway.service.GatewayServiceEvent; import org.apache.helix.model.Message; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardState; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardTransitionStatus; -import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage; - +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.SingleShardChangeRequest; public final class StateTransitionMessageTranslateUtil { /** @@ -44,18 +43,18 @@ public final class StateTransitionMessageTranslateUtil { * @param toState target state * @return TransitionType */ - public static HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType translateStatesToTransitionType( + public static HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType translateStatesToTransitionType( String currentState, String toState) { boolean isUnassigned = HelixGatewayParticipant.UNASSIGNED_STATE.equals(currentState); boolean isToStateDropped = HelixDefinedState.DROPPED.name().equals(toState); if (isToStateDropped && !isUnassigned) { - return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD; + return HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.DELETE_SHARD; } if (!isToStateDropped && isUnassigned) { - return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD; + return HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.ADD_SHARD; } - return HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE; + return HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.CHANGE_ROLE; } /** @@ -64,12 +63,12 @@ public final class StateTransitionMessageTranslateUtil { * @param message Message * @return TransitionMessage */ - public static TransitionMessage translateSTMsgToTransitionMessage(Message message) { - return TransitionMessage.newBuilder().addRequest( - HelixGatewayServiceOuterClass.SingleTransitionMessage.newBuilder() - .setTransitionID(message.getMsgId()).setTransitionType( + public static ShardChangeRequests translateSTMsgToShardChangeRequests(Message message) { + return ShardChangeRequests.newBuilder().addRequest( + SingleShardChangeRequest.newBuilder() + .setStateChangeRequestType( translateStatesToTransitionType(message.getFromState(), message.getToState())) - .setResourceID(message.getResourceName()).setShardID(message.getPartitionName()) + .setResourceName(message.getResourceName()).setShardName(message.getPartitionName()) .setTargetState(message.getToState()).build()).build(); } @@ -102,8 +101,8 @@ public final class StateTransitionMessageTranslateUtil { List<GatewayServiceEvent.StateTransitionResult> stResult = new ArrayList<>(); for (HelixGatewayServiceOuterClass.SingleShardTransitionStatus shardTransition : status) { GatewayServiceEvent.StateTransitionResult result = - new GatewayServiceEvent.StateTransitionResult(shardTransition.getTransitionID(), - shardTransition.getIsSuccess(), shardTransition.getCurrentState()); + new GatewayServiceEvent.StateTransitionResult(shardTransition.getResourceName(), + shardTransition.getShardName(), shardTransition.getCurrentState()); stResult.add(result); } builder = new GatewayServiceEvent.GateWayServiceEventBuilder(GatewayServiceEventType.UPDATE).setClusterName( diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto b/helix-gateway/src/main/proto/HelixGatewayService.proto index e7db5473e..50553e372 100644 --- a/helix-gateway/src/main/proto/HelixGatewayService.proto +++ b/helix-gateway/src/main/proto/HelixGatewayService.proto @@ -20,21 +20,20 @@ syntax = "proto3"; package proto.org.apache.helix.gateway; -message SingleTransitionMessage { - enum TransitionType { +message SingleShardChangeRequest { + enum StateChangeRequestType { ADD_SHARD = 0; DELETE_SHARD = 1; CHANGE_ROLE = 2; } - string transitionID = 1; // ID of transition message - TransitionType transitionType = 2; // Transition type for shard operations - string resourceID = 3; // Resource ID - string shardID = 4; // Shard to perform operation - string targetState = 6; // Shard target state. + StateChangeRequestType stateChangeRequestType = 1; // Transition type for shard operations + string resourceName = 2; // Resource ID + string shardName = 3; // Shard to perform operation + string targetState = 4; // Shard target state. } -message TransitionMessage { - repeated SingleTransitionMessage request = 1; +message ShardChangeRequests { + repeated SingleShardChangeRequest request = 1; } message SingleResourceState { @@ -48,9 +47,9 @@ message SingleShardState { } message SingleShardTransitionStatus { - string transitionID = 1; // ID of transition message - bool isSuccess = 2; // Was transition successfully performed - optional string currentState = 3; // If it failed, what is the current state it should reported as. + string resourceName = 1; // resource name + string shardName = 2; // shard name + string currentState = 3; // If it failed, what is the current state it should reported as. } message ShardTransitionStatus{ @@ -75,6 +74,6 @@ message ShardStateMessage{ } service HelixGatewayService { - rpc report(stream ShardStateMessage) returns (stream TransitionMessage) {} + rpc report(stream ShardStateMessage) returns (stream ShardChangeRequests) {} } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java index 2c593ca12..69882fc66 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/participant/TestHelixGatewayParticipant.java @@ -36,7 +36,6 @@ import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; -import org.apache.helix.model.Message; import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; import org.testng.Assert; @@ -44,6 +43,8 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import org.testng.collections.Lists; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass; +import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardChangeRequests; public class TestHelixGatewayParticipant extends ZkTestBase { @@ -58,7 +59,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { private ClusterControllerManager _controller; private int _nextStartPort = 12000; private final List<HelixGatewayParticipant> _participants = Lists.newArrayList(); - private final Map<String, Message> _pendingMessageMap = new ConcurrentHashMap<>(); + private final Map<String, ShardChangeRequests> _pendingMessageMap = new ConcurrentHashMap<>(); private final AtomicInteger _onDisconnectCallbackCount = new AtomicInteger(); @BeforeClass @@ -164,16 +165,18 @@ public class TestHelixGatewayParticipant extends ZkTestBase { /** * Retrieve a pending message for a specific participant. */ - private Message getPendingMessage(String instanceName) { + private ShardChangeRequests getPendingMessage(String instanceName) { return _pendingMessageMap.get(instanceName); } /** * Process the pending message for a participant. */ - private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess) { - Message message = _pendingMessageMap.remove(participant.getInstanceName()); - participant.completeStateTransition(message.getMsgId(), isSuccess); + private void processPendingMessage(HelixGatewayParticipant participant, boolean isSuccess, String toState) { + ShardChangeRequests requests = _pendingMessageMap.remove(participant.getInstanceName()); + + participant.completeStateTransition(requests.getRequest(0).getResourceName(),requests.getRequest(0).getShardName(), + isSuccess ? toState : "WRONG_STATE"); } /** @@ -240,14 +243,14 @@ public class TestHelixGatewayParticipant extends ZkTestBase { // Verify that all pending messages have the toState "ONLINE" for (HelixGatewayParticipant participant : _participants) { - Message message = getPendingMessage(participant.getInstanceName()); - Assert.assertNotNull(message); - Assert.assertEquals(message.getToState(), "ONLINE"); + HelixGatewayServiceOuterClass.SingleShardChangeRequest request = getPendingMessage(participant.getInstanceName()).getRequest(0); + Assert.assertNotNull(request); + Assert.assertEquals(request.getTargetState(), "ONLINE"); } // Process all pending messages successfully for (HelixGatewayParticipant participant : _participants) { - processPendingMessage(participant, true); + processPendingMessage(participant, true, "ONLINE"); } // Verify that the cluster converges and all states are "ONLINE" @@ -263,12 +266,12 @@ public class TestHelixGatewayParticipant extends ZkTestBase { verifyPendingMessages(List.of(participant)); // Verify the pending message has the toState "ONLINE" - Message message = getPendingMessage(participant.getInstanceName()); - Assert.assertNotNull(message); - Assert.assertEquals(message.getToState(), "ONLINE"); + HelixGatewayServiceOuterClass.SingleShardChangeRequest request = getPendingMessage(participant.getInstanceName()).getRequest(0); + Assert.assertNotNull(request); + Assert.assertEquals(request.getTargetState(), "ONLINE"); // Process the message with failure - processPendingMessage(participant, false); + processPendingMessage(participant, false, "ONLINE"); // Verify that the cluster converges and states reflect the failure (e.g., "OFFLINE") Assert.assertTrue(_clusterVerifier.verify()); @@ -311,7 +314,7 @@ public class TestHelixGatewayParticipant extends ZkTestBase { verifyPendingMessages(List.of(participantReplacement)); // Process the pending message successfully - processPendingMessage(participantReplacement, true); + processPendingMessage(participantReplacement, true, "DROPPED"); // Verify that the cluster converges and states are correctly updated to "ONLINE" Assert.assertTrue(_clusterVerifier.verify()); @@ -344,17 +347,18 @@ public class TestHelixGatewayParticipant extends ZkTestBase { } public static class MockHelixGatewayServiceChannel implements HelixGatewayServiceChannel { - private final Map<String, Message> _pendingMessageMap; + private final Map<String, ShardChangeRequests> _pendingMessageMap; private static final AtomicInteger _gracefulDisconnectCount = new AtomicInteger(); private static final AtomicInteger _errorDisconnectCount = new AtomicInteger(); - public MockHelixGatewayServiceChannel(Map<String, Message> pendingMessageMap) { + public MockHelixGatewayServiceChannel(Map<String, ShardChangeRequests> pendingMessageMap) { _pendingMessageMap = pendingMessageMap; } @Override - public void sendStateTransitionMessage(String instanceName, String currentState, Message message) { - _pendingMessageMap.put(instanceName, message); + public void sendStateChangeRequests(String instanceName, + HelixGatewayServiceOuterClass.ShardChangeRequests shardChangeRequests) { + _pendingMessageMap.put(instanceName, shardChangeRequests); } @Override diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java index 19c92f5e2..f3221a12c 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java @@ -91,9 +91,9 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { } public void connect() { - _requestObserver = asyncStub.report(new StreamObserver<HelixGatewayServiceOuterClass.TransitionMessage>() { + _requestObserver = asyncStub.report(new StreamObserver<HelixGatewayServiceOuterClass.ShardChangeRequests>() { @Override - public void onNext(HelixGatewayServiceOuterClass.TransitionMessage value) { + public void onNext(HelixGatewayServiceOuterClass.ShardChangeRequests value) { // Handle response from server } diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java index 34e23d0b0..65c299122 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/utils/TestStateTransitionMessageTranslateUtil.java @@ -32,11 +32,11 @@ public class TestStateTransitionMessageTranslateUtil { String currentState = "ONLINE"; String toState = HelixDefinedState.DROPPED.name(); - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType result = StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); Assert.assertEquals(result, - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.DELETE_SHARD, + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.DELETE_SHARD, "Expected DELETE_SHARD when transitioning to DROPPED state from a non-DROPPED state."); } @@ -45,11 +45,11 @@ public class TestStateTransitionMessageTranslateUtil { String currentState = HelixGatewayParticipant.UNASSIGNED_STATE; String toState = "ONLINE"; - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType result = StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); Assert.assertEquals(result, - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.ADD_SHARD, + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.ADD_SHARD, "Expected ADD_SHARD when transitioning from DROPPED state to a non-DROPPED state."); } @@ -58,11 +58,11 @@ public class TestStateTransitionMessageTranslateUtil { String currentState = "ONLINE"; String toState = "OFFLINE"; - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType result = + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType result = StateTransitionMessageTranslateUtil.translateStatesToTransitionType(currentState, toState); Assert.assertEquals(result, - HelixGatewayServiceOuterClass.SingleTransitionMessage.TransitionType.CHANGE_ROLE, + HelixGatewayServiceOuterClass.SingleShardChangeRequest.StateChangeRequestType.CHANGE_ROLE, "Expected CHANGE_ROLE when transitioning between non-DROPPED states."); } }