This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit ba8eefbcb61f111f78b35d7e5c08b3ec3516655b Author: xyuanlu <xyua...@gmail.com> AuthorDate: Mon Jul 22 13:43:00 2024 -0700 Refactor and remove mock classes (#2841) Refactor and remove mock classes --- .../org/apache/helix/gateway/HelixGatewayMain.java | 74 -------------- .../grpcservice/HelixGatewayServiceService.java | 2 +- .../helix/gateway/mock/ControllerManager.java | 111 --------------------- .../apache/helix/gateway/mock/MockApplication.java | 100 ------------------- .../helix/gateway/mock/MockProtoRequest.java | 56 ----------- .../helix/gateway/mock/MockProtoResponse.java | 15 --- .../helix/gateway/service/ClusterManager.java | 31 +----- .../HelixGatewayOnlineOfflineStateModel.java | 81 --------------- .../helix/gateway/service/HelixGatewayService.java | 20 ++-- .../HelixGatewayOnlineOfflineStateModel.java | 64 ++++++++++++ ...HelixGatewayOnlineOfflineStateModelFactory.java | 3 +- 11 files changed, 81 insertions(+), 476 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java deleted file mode 100644 index 0577aba02..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java +++ /dev/null @@ -1,74 +0,0 @@ -package org.apache.helix.gateway; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.helix.ConfigAccessor; -import org.apache.helix.HelixAdmin; -import org.apache.helix.InstanceType; -import org.apache.helix.gateway.mock.ControllerManager; -import org.apache.helix.gateway.mock.MockApplication; -import org.apache.helix.gateway.service.HelixGatewayService; -import org.apache.helix.manager.zk.ZKHelixAdmin; -import org.apache.helix.model.ClusterConfig; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.OnlineOfflineSMD; -import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; -import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; -import org.apache.helix.zookeeper.impl.client.ZkClient; - -public final class HelixGatewayMain { - - private static final String ZK_ADDRESS = "localhost:2181"; - private static final String CLUSTER_NAME = "TEST_CLUSTER"; - - private HelixGatewayMain() { - } - - public static void main(String[] args) throws InterruptedException { - RealmAwareZkClient zkClient = new ZkClient(ZK_ADDRESS); - zkClient.setZkSerializer(new ZNRecordSerializer()); - ConfigAccessor configAccessor = new ConfigAccessor(zkClient); - HelixAdmin admin = new ZKHelixAdmin(zkClient); - if (admin.getClusters().isEmpty()) { - admin.addCluster(CLUSTER_NAME); - admin.addStateModelDef(CLUSTER_NAME, "OnlineOffline", OnlineOfflineSMD.build()); - } - - ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME); - clusterConfig.getRecord().setSimpleField("allowParticipantAutoJoin", "true"); - configAccessor.updateClusterConfig(CLUSTER_NAME, clusterConfig); - - String resourceName = "Test_Resource"; - - if (admin.getResourceIdealState(CLUSTER_NAME, resourceName) == null) { - admin.addResource(CLUSTER_NAME, resourceName, 2, "OnlineOffline", - IdealState.RebalanceMode.FULL_AUTO.name(), - "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy"); - admin.rebalance(CLUSTER_NAME, resourceName, 3); - } - - ControllerManager controllerManager = - new ControllerManager(ZK_ADDRESS, CLUSTER_NAME, "CONTROLLER", InstanceType.CONTROLLER); - controllerManager.syncStart(); - - HelixGatewayService service = new HelixGatewayService(ZK_ADDRESS); - service.start(); - - List<MockApplication> mockApplications = new ArrayList<>(); - for (int i = 0; i < 6; i++) { - MockApplication mockApplication = - new MockApplication("INSTANCE_" + i, CLUSTER_NAME, service.getClusterManager()); - service.registerParticipant(mockApplication); - mockApplications.add(mockApplication); - } - - Thread.sleep(100000000); - - MockApplication mockApplication = mockApplications.get(3); - service.deregisterParticipant(mockApplication.getClusterName(), - mockApplication.getInstanceName()); - - controllerManager.syncStop(); - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java index 441fba952..286cf2001 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java @@ -1,8 +1,8 @@ package org.apache.helix.gateway.grpcservice; +import io.grpc.stub.StreamObserver; import proto.org.apache.helix.gateway.*; import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*; -import io.grpc.stub.StreamObserver; public class HelixGatewayServiceService extends HelixGatewayServiceGrpc.HelixGatewayServiceImplBase { diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java deleted file mode 100644 index 3d33874c4..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java +++ /dev/null @@ -1,111 +0,0 @@ -package org.apache.helix.gateway.mock; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.helix.HelixManagerProperty; -import org.apache.helix.InstanceType; -import org.apache.helix.manager.zk.HelixManagerStateListener; -import org.apache.helix.manager.zk.ZKHelixManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ControllerManager extends ZKHelixManager implements Runnable { - private static final int DISCONNECT_WAIT_TIME_MS = 3000; - private static Logger logger = LoggerFactory.getLogger(ControllerManager.class); - private static AtomicLong uid = new AtomicLong(10000); - private final String _clusterName; - private final String _instanceName; - private final InstanceType _type; - protected CountDownLatch _startCountDown = new CountDownLatch(1); - protected CountDownLatch _stopCountDown = new CountDownLatch(1); - protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1); - protected boolean _started = false; - protected Thread _watcher; - private long _uid; - - public ControllerManager(String zkAddr, String clusterName, String instanceName, - InstanceType type) { - super(clusterName, instanceName, type, zkAddr); - _clusterName = clusterName; - _instanceName = instanceName; - _type = type; - _uid = uid.getAndIncrement(); - } - - protected ControllerManager(String clusterName, String instanceName, InstanceType instanceType, - String zkAddress, HelixManagerStateListener stateListener, - HelixManagerProperty helixManagerProperty) { - super(clusterName, instanceName, instanceType, zkAddress, stateListener, helixManagerProperty); - _clusterName = clusterName; - _instanceName = instanceName; - _type = instanceType; - _uid = uid.getAndIncrement(); - } - - public void syncStop() { - _stopCountDown.countDown(); - try { - _waitStopFinishCountDown.await(); - _started = false; - } catch (InterruptedException e) { - logger.error("Interrupted waiting for finish", e); - } - } - - // This should not be called more than once because HelixManager.connect() should not be called more than once. - public void syncStart() { - if (_started) { - throw new RuntimeException( - "Helix Controller already started. Do not call syncStart() more than once."); - } else { - _started = true; - } - - _watcher = new Thread(this); - _watcher.setName( - String.format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName, _instanceName, - _type.name(), _uid)); - logger.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}", _clusterName, - _instanceName, _type.name(), _uid, Thread.currentThread().getStackTrace()); - _watcher.start(); - - try { - _startCountDown.await(); - } catch (InterruptedException e) { - logger.error("Interrupted waiting for start", e); - } - } - - @Override - public void run() { - try { - connect(); - _startCountDown.countDown(); - _stopCountDown.await(); - } catch (Exception e) { - logger.error("exception running controller-manager", e); - } finally { - _startCountDown.countDown(); - disconnect(); - _waitStopFinishCountDown.countDown(); - } - } - - /** - @SuppressWarnings("finalizer") - @Override public void finalize() { - _watcher.interrupt(); - try { - _watcher.join(DISCONNECT_WAIT_TIME_MS); - } catch (InterruptedException e) { - logger.error("ClusterManager watcher cleanup in the finalize method was interrupted.", e); - } finally { - if (isConnected()) { - logger.warn( - "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a potential resource leakage!", - _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS); - } - } - }*/ -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java deleted file mode 100644 index 679f95f05..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java +++ /dev/null @@ -1,100 +0,0 @@ -package org.apache.helix.gateway.mock; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.Executors; - -import org.apache.helix.gateway.service.ClusterManager; - -public class MockApplication { - private final ClusterManager _clusterManager; - private Map<String, Map<String, String>> _currentStates; - private String _instanceName; - private String _clusterName; - private Queue<MockProtoRequest> _requestQueue; - - public MockApplication(String instanceName, String clusterName, ClusterManager clusterManager) { - _instanceName = instanceName; - _clusterName = clusterName; - _currentStates = new HashMap<>(); - _requestQueue = new LinkedList<>(); - _clusterManager = clusterManager; - Executors.newScheduledThreadPool(1) - .scheduleAtFixedRate(this::process, 0, 5000, java.util.concurrent.TimeUnit.MILLISECONDS); - } - - public void process() { - List<MockProtoResponse> completedMessages = new ArrayList<>(); - synchronized (_requestQueue) { - while (!_requestQueue.isEmpty()) { - MockProtoRequest request = _requestQueue.poll(); - switch (request.getMessageType()) { - case ADD: - addShard(request.getResourceName(), request.getShardName()); - completedMessages.add(new MockProtoResponse(request.getMessageId())); - break; - case REMOVE: - removeShard(request.getResourceName(), request.getShardName()); - completedMessages.add(new MockProtoResponse(request.getMessageId())); - break; - case CHANGE_ROLE: - changeRole(request.getResourceName(), request.getShardName(), request.getFromState(), - request.getToState()); - completedMessages.add(new MockProtoResponse(request.getMessageId())); - break; - default: - System.out.println("Unknown message type: " + request.getMessageType()); - throw new RuntimeException("Unknown message type: " + request.getMessageType()); - } - } - } - _clusterManager.receiveResponse(completedMessages, _instanceName); - } - - public void addRequest(MockProtoRequest request) { - synchronized (_requestQueue) { - _requestQueue.add(request); - } - } - - public String getInstanceName() { - return _instanceName; - } - - public String getClusterName() { - return _clusterName; - } - - public void join() { - System.out.println( - "Joining Mock Application for instance " + _instanceName + " in cluster " + _clusterName); - } - - public synchronized void addShard(String resourceName, String shardName) { - System.out.println("ADD | " + shardName + " | " + resourceName + " | " + _instanceName); - } - - public synchronized void removeShard(String resourceName, String shardName) { - System.out.println("REMOVE | " + shardName + " | " + resourceName + " | " + _instanceName); - } - - public synchronized void changeRole(String resourceName, String shardName, String fromState, - String toState) { - System.out.println( - "CHANGE ROLE | " + shardName + " | " + resourceName + " | " + _instanceName + " | " - + fromState + " -> " + toState); - _currentStates.computeIfAbsent(resourceName, k -> new HashMap<>()).put(shardName, toState); - } - - private void sleep(long ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java deleted file mode 100644 index 4e462e254..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.helix.gateway.mock; - -import org.apache.helix.gateway.constant.MessageType; - -public class MockProtoRequest { - - private String _messageId; - private String _instanceName; - - private MessageType _messageType; - private String _resourceName; - private String _shardName; - - private String _fromState; - private String _toState; - - public MockProtoRequest(MessageType messageType, String resourceName, String shardName, - String instanceName, String messageId, String fromState, String toState) { - System.out.println( - messageType + " | " + shardName + " | " + resourceName + " | " + instanceName + " | " - + messageId + " | " + fromState + " | " + toState); - _messageId = messageId; - _instanceName = instanceName; - _messageType = messageType; - _resourceName = resourceName; - _shardName = shardName; - } - - public MessageType getMessageType() { - return _messageType; - } - - public String getResourceName() { - return _resourceName; - } - - public String getShardName() { - return _shardName; - } - - public String getFromState() { - return _fromState; - } - - public String getToState() { - return _toState; - } - - public String getMessageId() { - return _messageId; - } - - public String getInstanceName() { - return _instanceName; - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java deleted file mode 100644 index 108f49807..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.helix.gateway.mock; - -public class MockProtoResponse { - - private String _messageId; - - public MockProtoResponse(String messageId) { - System.out.println("Finished process of message : " + messageId); - _messageId = messageId; - } - - public String getMessageId() { - return _messageId; - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java index 80bd15aaa..b96694f8b 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java @@ -1,19 +1,15 @@ package org.apache.helix.gateway.service; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import org.apache.helix.gateway.mock.MockApplication; -import org.apache.helix.gateway.mock.MockProtoRequest; -import org.apache.helix.gateway.mock.MockProtoResponse; + public class ClusterManager { private Map<String, Map<String, AtomicBoolean>> _flagMap; - private Map<String, MockApplication> _channelMap; private Lock _lock = new ReentrantLock(); // event queue @@ -21,34 +17,17 @@ public class ClusterManager { public ClusterManager() { _flagMap = new ConcurrentHashMap<>(); - _channelMap = new ConcurrentHashMap<>(); } - public void addChannel(MockApplication mockApplication) { - _channelMap.put(mockApplication.getInstanceName(), mockApplication); - _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new ConcurrentHashMap<>()); + public void addChannel() { } public void removeChannel(String instanceName) { - _channelMap.remove(instanceName); _flagMap.remove(instanceName); } - public AtomicBoolean sendMessage(MockProtoRequest request) { - MockApplication mockApplication = _channelMap.get(request.getInstanceName()); - synchronized (mockApplication) { - mockApplication.addRequest(request); - AtomicBoolean flag = new AtomicBoolean(false); - _flagMap.computeIfAbsent(request.getInstanceName(), k -> new ConcurrentHashMap<>()) - .put(request.getMessageId(), flag); - return flag; - } - } - - public synchronized void receiveResponse(List<MockProtoResponse> responses, String instanceName) { - for (MockProtoResponse response : responses) { - AtomicBoolean flag = _flagMap.get(instanceName).remove(response.getMessageId()); - flag.set(true); - } + public AtomicBoolean sendMessage() { + AtomicBoolean flag = new AtomicBoolean(false); + return flag; } } diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java deleted file mode 100644 index 37453e7d9..000000000 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java +++ /dev/null @@ -1,81 +0,0 @@ -package org.apache.helix.gateway.service; - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.helix.NotificationContext; -import org.apache.helix.gateway.constant.MessageType; -import org.apache.helix.gateway.mock.MockProtoRequest; -import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; - -public class HelixGatewayOnlineOfflineStateModel extends StateModel { - private boolean _firstTime = true; - private ClusterManager _clusterManager; - - private String _resourceName; - private String _partitionKey; - - private AtomicBoolean _completed; - - public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey, - ClusterManager clusterManager) { - _resourceName = resourceName; - _partitionKey = partitionKey; - _clusterManager = clusterManager; - } - - public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { - if (_firstTime) { - wait(_clusterManager.sendMessage( - new MockProtoRequest(MessageType.ADD, message.getResourceName(), - message.getPartitionName(), message.getTgtName(), UUID.randomUUID().toString(), - message.getToState(), message.getFromState()))); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with ADD for " + message.getResourceName() + " processed"); - _firstTime = false; - } - wait(_clusterManager.sendMessage( - new MockProtoRequest(MessageType.CHANGE_ROLE, message.getResourceName(), - message.getPartitionName(), message.getTgtName(), UUID.randomUUID().toString(), - message.getToState(), message.getFromState()))); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed"); - } - - public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { - wait(_clusterManager.sendMessage( - new MockProtoRequest(MessageType.CHANGE_ROLE, message.getResourceName(), - message.getPartitionName(), message.getTgtName(), UUID.randomUUID().toString(), - message.getToState(), message.getFromState()))); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed"); - } - - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - wait(_clusterManager.sendMessage( - new MockProtoRequest(MessageType.REMOVE, message.getResourceName(), - message.getPartitionName(), message.getTgtName(), UUID.randomUUID().toString(), - message.getToState(), message.getFromState()))); - System.out.println( - "Message for " + message.getPartitionName() + " instance " + message.getTgtName() - + " with REMOVE for " + message.getResourceName() + " processed"); - } - - private void wait(AtomicBoolean completed) { - _completed = completed; - while (true) { - try { - if (_completed.get()) { - break; - } - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } -} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java index 4f44efcd6..12810f80d 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java @@ -2,11 +2,11 @@ package org.apache.helix.gateway.service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; -import org.apache.helix.gateway.mock.MockApplication; +import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory; +import org.apache.helix.manager.zk.ZKHelixManager; + public class HelixGatewayService { final private Map<String, Map<String, HelixManager>> _participantsMap; @@ -28,15 +28,13 @@ public class HelixGatewayService { System.out.println("Starting Helix Gateway Service"); } - public void registerParticipant(MockApplication mockApplication) { - HelixManager manager = _participantsMap.computeIfAbsent(mockApplication.getClusterName(), - k -> new ConcurrentHashMap<>()).computeIfAbsent(mockApplication.getInstanceName(), - k -> HelixManagerFactory.getZKHelixManager(mockApplication.getClusterName(), - mockApplication.getInstanceName(), InstanceType.PARTICIPANT, _zkAddress)); - manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", - new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager)); + public void registerParticipant() { + // TODO: create participant manager and add to _participantsMap + HelixManager manager = new ZKHelixManager("clusterName", "instanceName", InstanceType.PARTICIPANT, _zkAddress); + manager.getStateMachineEngine() + .registerStateModelFactory("OnlineOffline", new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager)); try { - _clusterManager.addChannel(mockApplication); + _clusterManager.addChannel(); manager.connect(); } catch (Exception e) { throw new RuntimeException(e); diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java new file mode 100644 index 000000000..5c95feb38 --- /dev/null +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java @@ -0,0 +1,64 @@ +package org.apache.helix.gateway.statemodel; + +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.helix.NotificationContext; +import org.apache.helix.gateway.service.ClusterManager; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; + +public class HelixGatewayOnlineOfflineStateModel extends StateModel { + private boolean _firstTime = true; + private ClusterManager _clusterManager; + + private String _resourceName; + private String _partitionKey; + + private AtomicBoolean _completed; + + public HelixGatewayOnlineOfflineStateModel(String resourceName, String partitionKey, + ClusterManager clusterManager) { + _resourceName = resourceName; + _partitionKey = partitionKey; + _clusterManager = clusterManager; + } + + public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { + if (_firstTime) { + wait(_clusterManager.sendMessage()); + System.out.println( + "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with ADD for " + + message.getResourceName() + " processed"); + _firstTime = false; + } + wait(_clusterManager.sendMessage()); + System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() + + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() + " processed"); + } + + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + wait(_clusterManager.sendMessage()); + System.out.println("Message for " + message.getPartitionName() + " instance " + message.getTgtName() + + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() + " processed"); + } + + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + wait(_clusterManager.sendMessage()); + System.out.println( + "Message for " + message.getPartitionName() + " instance " + message.getTgtName() + " with REMOVE for " + + message.getResourceName() + " processed"); + } + + private void wait(AtomicBoolean completed) { + _completed = completed; + while (true) { + try { + if (_completed.get()) { + break; + } + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java similarity index 85% rename from helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java rename to helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java index 71570ef15..5db789112 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java @@ -1,5 +1,6 @@ -package org.apache.helix.gateway.service; +package org.apache.helix.gateway.statemodel; +import org.apache.helix.gateway.service.ClusterManager; import org.apache.helix.participant.statemachine.StateModelFactory; public class HelixGatewayOnlineOfflineStateModelFactory extends StateModelFactory<HelixGatewayOnlineOfflineStateModel> {