This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit ba8eefbcb61f111f78b35d7e5c08b3ec3516655b
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Mon Jul 22 13:43:00 2024 -0700

    Refactor and remove mock classes (#2841)
    
    Refactor and remove mock classes
---
 .../org/apache/helix/gateway/HelixGatewayMain.java |  74 --------------
 .../grpcservice/HelixGatewayServiceService.java    |   2 +-
 .../helix/gateway/mock/ControllerManager.java      | 111 ---------------------
 .../apache/helix/gateway/mock/MockApplication.java | 100 -------------------
 .../helix/gateway/mock/MockProtoRequest.java       |  56 -----------
 .../helix/gateway/mock/MockProtoResponse.java      |  15 ---
 .../helix/gateway/service/ClusterManager.java      |  31 +-----
 .../HelixGatewayOnlineOfflineStateModel.java       |  81 ---------------
 .../helix/gateway/service/HelixGatewayService.java |  20 ++--
 .../HelixGatewayOnlineOfflineStateModel.java       |  64 ++++++++++++
 ...HelixGatewayOnlineOfflineStateModelFactory.java |   3 +-
 11 files changed, 81 insertions(+), 476 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
deleted file mode 100644
index 0577aba02..000000000
--- a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.helix.gateway;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.mock.ControllerManager;
-import org.apache.helix.gateway.mock.MockApplication;
-import org.apache.helix.gateway.service.HelixGatewayService;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.OnlineOfflineSMD;
-import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-
-public final class HelixGatewayMain {
-
-  private static final String ZK_ADDRESS = "localhost:2181";
-  private static final String CLUSTER_NAME = "TEST_CLUSTER";
-
-  private HelixGatewayMain() {
-  }
-
-  public static void main(String[] args) throws InterruptedException {
-    RealmAwareZkClient zkClient = new ZkClient(ZK_ADDRESS);
-    zkClient.setZkSerializer(new ZNRecordSerializer());
-    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
-    HelixAdmin admin = new ZKHelixAdmin(zkClient);
-    if (admin.getClusters().isEmpty()) {
-      admin.addCluster(CLUSTER_NAME);
-      admin.addStateModelDef(CLUSTER_NAME, "OnlineOffline", 
OnlineOfflineSMD.build());
-    }
-
-    ClusterConfig clusterConfig = 
configAccessor.getClusterConfig(CLUSTER_NAME);
-    clusterConfig.getRecord().setSimpleField("allowParticipantAutoJoin", 
"true");
-    configAccessor.updateClusterConfig(CLUSTER_NAME, clusterConfig);
-
-    String resourceName = "Test_Resource";
-
-    if (admin.getResourceIdealState(CLUSTER_NAME, resourceName) == null) {
-      admin.addResource(CLUSTER_NAME, resourceName, 2, "OnlineOffline",
-          IdealState.RebalanceMode.FULL_AUTO.name(),
-          
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
-      admin.rebalance(CLUSTER_NAME, resourceName, 3);
-    }
-
-    ControllerManager controllerManager =
-        new ControllerManager(ZK_ADDRESS, CLUSTER_NAME, "CONTROLLER", 
InstanceType.CONTROLLER);
-    controllerManager.syncStart();
-
-    HelixGatewayService service = new HelixGatewayService(ZK_ADDRESS);
-    service.start();
-
-    List<MockApplication> mockApplications = new ArrayList<>();
-    for (int i = 0; i < 6; i++) {
-      MockApplication mockApplication =
-          new MockApplication("INSTANCE_" + i, CLUSTER_NAME, 
service.getClusterManager());
-      service.registerParticipant(mockApplication);
-      mockApplications.add(mockApplication);
-    }
-
-    Thread.sleep(100000000);
-
-    MockApplication mockApplication = mockApplications.get(3);
-    service.deregisterParticipant(mockApplication.getClusterName(),
-        mockApplication.getInstanceName());
-
-    controllerManager.syncStop();
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
index 441fba952..286cf2001 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
@@ -1,8 +1,8 @@
 package org.apache.helix.gateway.grpcservice;
 
+import io.grpc.stub.StreamObserver;
 import proto.org.apache.helix.gateway.*;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
-import io.grpc.stub.StreamObserver;
 
 public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java
deleted file mode 100644
index 3d33874c4..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/ControllerManager.java
+++ /dev/null
@@ -1,111 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.helix.HelixManagerProperty;
-import org.apache.helix.InstanceType;
-import org.apache.helix.manager.zk.HelixManagerStateListener;
-import org.apache.helix.manager.zk.ZKHelixManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ControllerManager extends ZKHelixManager implements Runnable {
-  private static final int DISCONNECT_WAIT_TIME_MS = 3000;
-  private static Logger logger = 
LoggerFactory.getLogger(ControllerManager.class);
-  private static AtomicLong uid = new AtomicLong(10000);
-  private final String _clusterName;
-  private final String _instanceName;
-  private final InstanceType _type;
-  protected CountDownLatch _startCountDown = new CountDownLatch(1);
-  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
-  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);
-  protected boolean _started = false;
-  protected Thread _watcher;
-  private long _uid;
-
-  public ControllerManager(String zkAddr, String clusterName, String 
instanceName,
-      InstanceType type) {
-    super(clusterName, instanceName, type, zkAddr);
-    _clusterName = clusterName;
-    _instanceName = instanceName;
-    _type = type;
-    _uid = uid.getAndIncrement();
-  }
-
-  protected ControllerManager(String clusterName, String instanceName, 
InstanceType instanceType,
-      String zkAddress, HelixManagerStateListener stateListener,
-      HelixManagerProperty helixManagerProperty) {
-    super(clusterName, instanceName, instanceType, zkAddress, stateListener, 
helixManagerProperty);
-    _clusterName = clusterName;
-    _instanceName = instanceName;
-    _type = instanceType;
-    _uid = uid.getAndIncrement();
-  }
-
-  public void syncStop() {
-    _stopCountDown.countDown();
-    try {
-      _waitStopFinishCountDown.await();
-      _started = false;
-    } catch (InterruptedException e) {
-      logger.error("Interrupted waiting for finish", e);
-    }
-  }
-
-  // This should not be called more than once because HelixManager.connect() 
should not be called more than once.
-  public void syncStart() {
-    if (_started) {
-      throw new RuntimeException(
-          "Helix Controller already started. Do not call syncStart() more than 
once.");
-    } else {
-      _started = true;
-    }
-
-    _watcher = new Thread(this);
-    _watcher.setName(
-        String.format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName, 
_instanceName,
-            _type.name(), _uid));
-    logger.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}", 
_clusterName,
-        _instanceName, _type.name(), _uid, 
Thread.currentThread().getStackTrace());
-    _watcher.start();
-
-    try {
-      _startCountDown.await();
-    } catch (InterruptedException e) {
-      logger.error("Interrupted waiting for start", e);
-    }
-  }
-
-  @Override
-  public void run() {
-    try {
-      connect();
-      _startCountDown.countDown();
-      _stopCountDown.await();
-    } catch (Exception e) {
-      logger.error("exception running controller-manager", e);
-    } finally {
-      _startCountDown.countDown();
-      disconnect();
-      _waitStopFinishCountDown.countDown();
-    }
-  }
-
-  /**
-   @SuppressWarnings("finalizer")
-   @Override public void finalize() {
-   _watcher.interrupt();
-   try {
-   _watcher.join(DISCONNECT_WAIT_TIME_MS);
-   } catch (InterruptedException e) {
-   logger.error("ClusterManager watcher cleanup in the finalize method was 
interrupted.", e);
-   } finally {
-   if (isConnected()) {
-   logger.warn(
-   "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a 
potential resource leakage!",
-   _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS);
-   }
-   }
-   }*/
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java
deleted file mode 100644
index 679f95f05..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockApplication.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.Executors;
-
-import org.apache.helix.gateway.service.ClusterManager;
-
-public class MockApplication {
-  private final ClusterManager _clusterManager;
-  private Map<String, Map<String, String>> _currentStates;
-  private String _instanceName;
-  private String _clusterName;
-  private Queue<MockProtoRequest> _requestQueue;
-
-  public MockApplication(String instanceName, String clusterName, 
ClusterManager clusterManager) {
-    _instanceName = instanceName;
-    _clusterName = clusterName;
-    _currentStates = new HashMap<>();
-    _requestQueue = new LinkedList<>();
-    _clusterManager = clusterManager;
-    Executors.newScheduledThreadPool(1)
-        .scheduleAtFixedRate(this::process, 0, 5000, 
java.util.concurrent.TimeUnit.MILLISECONDS);
-  }
-
-  public void process() {
-    List<MockProtoResponse> completedMessages = new ArrayList<>();
-    synchronized (_requestQueue) {
-      while (!_requestQueue.isEmpty()) {
-        MockProtoRequest request = _requestQueue.poll();
-        switch (request.getMessageType()) {
-          case ADD:
-            addShard(request.getResourceName(), request.getShardName());
-            completedMessages.add(new 
MockProtoResponse(request.getMessageId()));
-            break;
-          case REMOVE:
-            removeShard(request.getResourceName(), request.getShardName());
-            completedMessages.add(new 
MockProtoResponse(request.getMessageId()));
-            break;
-          case CHANGE_ROLE:
-            changeRole(request.getResourceName(), request.getShardName(), 
request.getFromState(),
-                request.getToState());
-            completedMessages.add(new 
MockProtoResponse(request.getMessageId()));
-            break;
-          default:
-            System.out.println("Unknown message type: " + 
request.getMessageType());
-            throw new RuntimeException("Unknown message type: " + 
request.getMessageType());
-        }
-      }
-    }
-    _clusterManager.receiveResponse(completedMessages, _instanceName);
-  }
-
-  public void addRequest(MockProtoRequest request) {
-    synchronized (_requestQueue) {
-      _requestQueue.add(request);
-    }
-  }
-
-  public String getInstanceName() {
-    return _instanceName;
-  }
-
-  public String getClusterName() {
-    return _clusterName;
-  }
-
-  public void join() {
-    System.out.println(
-        "Joining Mock Application for instance " + _instanceName + " in 
cluster " + _clusterName);
-  }
-
-  public synchronized void addShard(String resourceName, String shardName) {
-    System.out.println("ADD | " + shardName + " | " + resourceName + " | " + 
_instanceName);
-  }
-
-  public synchronized void removeShard(String resourceName, String shardName) {
-    System.out.println("REMOVE | " + shardName + " | " + resourceName + " | " 
+ _instanceName);
-  }
-
-  public synchronized void changeRole(String resourceName, String shardName, 
String fromState,
-      String toState) {
-    System.out.println(
-        "CHANGE ROLE | " + shardName + " | " + resourceName + " | " + 
_instanceName + " | "
-            + fromState + " -> " + toState);
-    _currentStates.computeIfAbsent(resourceName, k -> new 
HashMap<>()).put(shardName, toState);
-  }
-
-  private void sleep(long ms) {
-    try {
-      Thread.sleep(ms);
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java
deleted file mode 100644
index 4e462e254..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoRequest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-import org.apache.helix.gateway.constant.MessageType;
-
-public class MockProtoRequest {
-
-  private String _messageId;
-  private String _instanceName;
-
-  private MessageType _messageType;
-  private String _resourceName;
-  private String _shardName;
-
-  private String _fromState;
-  private String _toState;
-
-  public MockProtoRequest(MessageType messageType, String resourceName, String 
shardName,
-      String instanceName, String messageId, String fromState, String toState) 
{
-    System.out.println(
-        messageType + " | " + shardName + " | " + resourceName + " | " + 
instanceName + " | "
-            + messageId + " | " + fromState + " | " + toState);
-    _messageId = messageId;
-    _instanceName = instanceName;
-    _messageType = messageType;
-    _resourceName = resourceName;
-    _shardName = shardName;
-  }
-
-  public MessageType getMessageType() {
-    return _messageType;
-  }
-
-  public String getResourceName() {
-    return _resourceName;
-  }
-
-  public String getShardName() {
-    return _shardName;
-  }
-
-  public String getFromState() {
-    return _fromState;
-  }
-
-  public String getToState() {
-    return _toState;
-  }
-
-  public String getMessageId() {
-    return _messageId;
-  }
-
-  public String getInstanceName() {
-    return _instanceName;
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java
deleted file mode 100644
index 108f49807..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/mock/MockProtoResponse.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.helix.gateway.mock;
-
-public class MockProtoResponse {
-
-  private String _messageId;
-
-  public MockProtoResponse(String messageId) {
-    System.out.println("Finished process of message : " + messageId);
-    _messageId = messageId;
-  }
-
-  public String getMessageId() {
-    return _messageId;
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
index 80bd15aaa..b96694f8b 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
@@ -1,19 +1,15 @@
 package org.apache.helix.gateway.service;
 
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.helix.gateway.mock.MockApplication;
-import org.apache.helix.gateway.mock.MockProtoRequest;
-import org.apache.helix.gateway.mock.MockProtoResponse;
+
 
 public class ClusterManager {
   private Map<String, Map<String, AtomicBoolean>> _flagMap;
-  private Map<String, MockApplication> _channelMap;
   private Lock _lock = new ReentrantLock();
 
   // event queue
@@ -21,34 +17,17 @@ public class ClusterManager {
 
   public ClusterManager() {
     _flagMap = new ConcurrentHashMap<>();
-    _channelMap = new ConcurrentHashMap<>();
   }
 
-  public void addChannel(MockApplication mockApplication) {
-    _channelMap.put(mockApplication.getInstanceName(), mockApplication);
-    _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new 
ConcurrentHashMap<>());
+  public void addChannel() {
   }
 
   public void removeChannel(String instanceName) {
-    _channelMap.remove(instanceName);
     _flagMap.remove(instanceName);
   }
 
-  public AtomicBoolean sendMessage(MockProtoRequest request) {
-    MockApplication mockApplication = 
_channelMap.get(request.getInstanceName());
-    synchronized (mockApplication) {
-      mockApplication.addRequest(request);
-      AtomicBoolean flag = new AtomicBoolean(false);
-      _flagMap.computeIfAbsent(request.getInstanceName(), k -> new 
ConcurrentHashMap<>())
-          .put(request.getMessageId(), flag);
-      return flag;
-    }
-  }
-
-  public synchronized void receiveResponse(List<MockProtoResponse> responses, 
String instanceName) {
-    for (MockProtoResponse response : responses) {
-      AtomicBoolean flag = 
_flagMap.get(instanceName).remove(response.getMessageId());
-      flag.set(true);
-    }
+  public AtomicBoolean sendMessage() {
+    AtomicBoolean flag = new AtomicBoolean(false);
+    return flag;
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java
deleted file mode 100644
index 37453e7d9..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModel.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package org.apache.helix.gateway.service;
-
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.constant.MessageType;
-import org.apache.helix.gateway.mock.MockProtoRequest;
-import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-
-public class HelixGatewayOnlineOfflineStateModel extends StateModel {
-  private boolean _firstTime = true;
-  private ClusterManager _clusterManager;
-
-  private String _resourceName;
-  private String _partitionKey;
-
-  private AtomicBoolean _completed;
-
-  public HelixGatewayOnlineOfflineStateModel(String resourceName, String 
partitionKey,
-      ClusterManager clusterManager) {
-    _resourceName = resourceName;
-    _partitionKey = partitionKey;
-    _clusterManager = clusterManager;
-  }
-
-  public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
-    if (_firstTime) {
-      wait(_clusterManager.sendMessage(
-          new MockProtoRequest(MessageType.ADD, message.getResourceName(),
-              message.getPartitionName(), message.getTgtName(), 
UUID.randomUUID().toString(),
-              message.getToState(), message.getFromState())));
-      System.out.println(
-          "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName()
-              + " with ADD for " + message.getResourceName() + " processed");
-      _firstTime = false;
-    }
-    wait(_clusterManager.sendMessage(
-        new MockProtoRequest(MessageType.CHANGE_ROLE, 
message.getResourceName(),
-            message.getPartitionName(), message.getTgtName(), 
UUID.randomUUID().toString(),
-            message.getToState(), message.getFromState())));
-    System.out.println(
-        "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName()
-            + " with CHANGE_ROLE_OFFLINE_ONLINE for " + 
message.getResourceName() + " processed");
-  }
-
-  public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-    wait(_clusterManager.sendMessage(
-        new MockProtoRequest(MessageType.CHANGE_ROLE, 
message.getResourceName(),
-            message.getPartitionName(), message.getTgtName(), 
UUID.randomUUID().toString(),
-            message.getToState(), message.getFromState())));
-    System.out.println(
-        "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName()
-            + " with CHANGE_ROLE_ONLINE_OFFLINE for " + 
message.getResourceName() + " processed");
-  }
-
-  public void onBecomeDroppedFromOffline(Message message, NotificationContext 
context) {
-    wait(_clusterManager.sendMessage(
-        new MockProtoRequest(MessageType.REMOVE, message.getResourceName(),
-            message.getPartitionName(), message.getTgtName(), 
UUID.randomUUID().toString(),
-            message.getToState(), message.getFromState())));
-    System.out.println(
-        "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName()
-            + " with REMOVE for " + message.getResourceName() + " processed");
-  }
-
-  private void wait(AtomicBoolean completed) {
-    _completed = completed;
-    while (true) {
-      try {
-        if (_completed.get()) {
-          break;
-        }
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
index 4f44efcd6..12810f80d 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
@@ -2,11 +2,11 @@ package org.apache.helix.gateway.service;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
-import org.apache.helix.gateway.mock.MockApplication;
+import 
org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
+import org.apache.helix.manager.zk.ZKHelixManager;
+
 
 public class HelixGatewayService {
   final private Map<String, Map<String, HelixManager>> _participantsMap;
@@ -28,15 +28,13 @@ public class HelixGatewayService {
     System.out.println("Starting Helix Gateway Service");
   }
 
-  public void registerParticipant(MockApplication mockApplication) {
-    HelixManager manager = 
_participantsMap.computeIfAbsent(mockApplication.getClusterName(),
-        k -> new 
ConcurrentHashMap<>()).computeIfAbsent(mockApplication.getInstanceName(),
-        k -> 
HelixManagerFactory.getZKHelixManager(mockApplication.getClusterName(),
-            mockApplication.getInstanceName(), InstanceType.PARTICIPANT, 
_zkAddress));
-    manager.getStateMachineEngine().registerStateModelFactory("OnlineOffline",
-        new HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
+  public void registerParticipant() {
+    // TODO: create participant manager and add to _participantsMap
+    HelixManager manager = new ZKHelixManager("clusterName", "instanceName", 
InstanceType.PARTICIPANT, _zkAddress);
+    manager.getStateMachineEngine()
+        .registerStateModelFactory("OnlineOffline", new 
HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
     try {
-      _clusterManager.addChannel(mockApplication);
+      _clusterManager.addChannel();
       manager.connect();
     } catch (Exception e) {
       throw new RuntimeException(e);
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
new file mode 100644
index 000000000..5c95feb38
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
@@ -0,0 +1,64 @@
+package org.apache.helix.gateway.statemodel;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.gateway.service.ClusterManager;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+
+public class HelixGatewayOnlineOfflineStateModel extends StateModel {
+  private boolean _firstTime = true;
+  private ClusterManager _clusterManager;
+
+  private String _resourceName;
+  private String _partitionKey;
+
+  private AtomicBoolean _completed;
+
+  public HelixGatewayOnlineOfflineStateModel(String resourceName, String 
partitionKey,
+      ClusterManager clusterManager) {
+    _resourceName = resourceName;
+    _partitionKey = partitionKey;
+    _clusterManager = clusterManager;
+  }
+
+  public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
+    if (_firstTime) {
+      wait(_clusterManager.sendMessage());
+      System.out.println(
+          "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with ADD for "
+              + message.getResourceName() + " processed");
+      _firstTime = false;
+    }
+    wait(_clusterManager.sendMessage());
+    System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
+        + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() 
+ " processed");
+  }
+
+  public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
+    wait(_clusterManager.sendMessage());
+    System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
+        + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() 
+ " processed");
+  }
+
+  public void onBecomeDroppedFromOffline(Message message, NotificationContext 
context) {
+    wait(_clusterManager.sendMessage());
+    System.out.println(
+        "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with REMOVE for "
+            + message.getResourceName() + " processed");
+  }
+
+  private void wait(AtomicBoolean completed) {
+    _completed = completed;
+    while (true) {
+      try {
+        if (_completed.get()) {
+          break;
+        }
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
similarity index 85%
rename from 
helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java
rename to 
helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
index 71570ef15..5db789112 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayOnlineOfflineStateModelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
@@ -1,5 +1,6 @@
-package org.apache.helix.gateway.service;
+package org.apache.helix.gateway.statemodel;
 
+import org.apache.helix.gateway.service.ClusterManager;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
 public class HelixGatewayOnlineOfflineStateModelFactory extends 
StateModelFactory<HelixGatewayOnlineOfflineStateModel> {

Reply via email to