This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/helix-gateway-service by this 
push:
     new cf6a202c5 Gateway service - service structure dummy class  (#2840)
cf6a202c5 is described below

commit cf6a202c5ef7ce0819c11938eeeab37263d02307
Author: xyuanlu <xyua...@gmail.com>
AuthorDate: Mon Jul 22 17:28:17 2024 -0700

    Gateway service - service structure dummy class  (#2840)
    
    Gateway service - service structure dummy class
---
 .../org/apache/helix/gateway/HelixGatewayMain.java | 15 +++++
 .../grpcservice/HelixGatewayServiceService.java    | 35 +++++++++++-
 .../helix/gateway/service/ClusterManager.java      | 33 -----------
 .../gateway/service/GatewayServiceManager.java     | 65 ++++++++++++++++++++++
 .../service/GatewayServiceManagerFactory.java      | 11 ++++
 .../helix/gateway/service/HelixGatewayService.java | 52 +++++++++++++----
 .../service/HelixGatewayServiceProcessor.java      | 11 ++++
 .../helix/gateway/service/ReplicaStateTracker.java |  2 +-
 .../service/StateTransitionMessageTranslator.java  |  4 --
 .../HelixGatewayOnlineOfflineStateModel.java       | 16 +++---
 ...HelixGatewayOnlineOfflineStateModelFactory.java |  7 ++-
 .../util/StateTransitionMessageTranslateUtil.java  | 17 ++++++
 .../src/main/proto/HelixGatewayService.proto       | 17 +++---
 13 files changed, 216 insertions(+), 69 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java 
b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
new file mode 100644
index 000000000..c08eb618d
--- /dev/null
+++ b/helix-gateway/src/main/java/org/apache/helix/gateway/HelixGatewayMain.java
@@ -0,0 +1,15 @@
+package org.apache.helix.gateway;
+
+/**
+ * Main class for Helix Gateway.
+ * It starts the Helix Gateway grpc service.
+ */
+public final class HelixGatewayMain {
+
+  private HelixGatewayMain() {
+  }
+
+  public static void main(String[] args) throws InterruptedException {
+
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
index 286cf2001..237f1f272 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/grpcservice/HelixGatewayServiceService.java
@@ -1,10 +1,23 @@
 package org.apache.helix.gateway.grpcservice;
 
 import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.service.HelixGatewayServiceProcessor;
 import proto.org.apache.helix.gateway.*;
 import proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.*;
 
-public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase {
+import java.util.Map;
+
+
+/**
+ * Helix Gateway Service GRPC UI implementation.
+ */
+public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGatewayServiceImplBase
+    implements HelixGatewayServiceProcessor {
+
+  Map<String, StreamObserver<TransitionMessage>> _observerMap =
+      new ConcurrentHashMap<String, StreamObserver<TransitionMessage>>();
 
   @Override
   public 
StreamObserver<proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage>
 report(
@@ -16,6 +29,12 @@ public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGat
       public void onNext(ShardStateMessage request) {
         // called when a client sends a message
         //....
+        String instanceName = request.getInstanceName();
+        if (!_observerMap.containsKey(instanceName)) {
+          // _observerMap is keyed by instance name: register the stream on first contact
+          updateObserver(instanceName, responseObserver);
+        }
+        // process the message
       }
 
       @Override
@@ -31,4 +50,18 @@ public class HelixGatewayServiceService extends 
HelixGatewayServiceGrpc.HelixGat
       }
     };
   }
+
+  @Override
+  public boolean sendStateTransitionMessage(String instanceName) {
+    return false;
+  }
+
+  @Override
+  public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent 
event) {
+
+  }
+
+  public void updateObserver(String instanceName, 
StreamObserver<TransitionMessage> streamObserver) {
+    _observerMap.put(instanceName, streamObserver);
+  }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
deleted file mode 100644
index b96694f8b..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ClusterManager.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package org.apache.helix.gateway.service;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-
-public class ClusterManager {
-  private Map<String, Map<String, AtomicBoolean>> _flagMap;
-  private Lock _lock = new ReentrantLock();
-
-  // event queue
-  // state tracker, call tracker.update
-
-  public ClusterManager() {
-    _flagMap = new ConcurrentHashMap<>();
-  }
-
-  public void addChannel() {
-  }
-
-  public void removeChannel(String instanceName) {
-    _flagMap.remove(instanceName);
-  }
-
-  public AtomicBoolean sendMessage() {
-    AtomicBoolean flag = new AtomicBoolean(false);
-    return flag;
-  }
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
new file mode 100644
index 000000000..ee803b9cc
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -0,0 +1,65 @@
+package org.apache.helix.gateway.service;
+
+import java.util.Map;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.helix.gateway.grpcservice.HelixGatewayServiceService;
+
+
+/**
+ * A top layer class that sends/receives messages to/from the Grpc end point, and 
dispatches them to the corresponding gateway services.
+ *  1. get event from Grpc service
+ *  2. Maintain a gateway service registry, one gateway service maps to one 
Helix cluster
+ *  3. On init connect, create the participant manager
+ *  4. For ST reply message, update the tracker
+ */
+
+public class GatewayServiceManager {
+
+  HelixGatewayServiceService _helixGatewayServiceService;
+
+  HelixGatewayServiceProcessor _helixGatewayServiceProcessor;
+
+  Map<String, HelixGatewayService> _helixGatewayServiceMap;
+
+  // TODO: add a thread pool for init
+  //       and a single-threaded thread pool for update
+
+  public enum EventType {
+    CONNECT,    // init connection to gateway service
+    UPDATE,  // update state transition result
+    DISCONNECT // shutdown connection to gateway service.
+  }
+
+  // Static nested so events can be created without a GatewayServiceManager instance.
+  public static class GateWayServiceEvent {
+    // event type
+    EventType eventType;
+    // event data
+    String clusterName;
+    String participantName;
+    // todo: add more fields
+  }
+
+  public GatewayServiceManager() {
+    _helixGatewayServiceMap = new ConcurrentHashMap<>();
+  }
+
+  public AtomicBoolean sendTransitionRequestToApplicationInstance() {
+
+    return null;
+  }
+
+  public void updateShardState() {
+
+  }
+
+  public void newParticipantConnecting() {
+
+  }
+
+  public void participantDisconnected() {
+
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
new file mode 100644
index 000000000..4bab8f0b7
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManagerFactory.java
@@ -0,0 +1,11 @@
+package org.apache.helix.gateway.service;
+
+/**
+ * Factory class to create GatewayServiceManager
+ */
+public class GatewayServiceManagerFactory {
+
+  public GatewayServiceManager createGatewayServiceManager() {
+    return new GatewayServiceManager();
+  }
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
index 12810f80d..49f1bbf2c 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayService.java
@@ -1,27 +1,35 @@
 package org.apache.helix.gateway.service;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import 
org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModelFactory;
 import org.apache.helix.manager.zk.ZKHelixManager;
 
 
+/**
+ * A service object for each Helix cluster.
+ * This service object manages the Helix participants in the cluster.
+ */
 public class HelixGatewayService {
   final private Map<String, Map<String, HelixManager>> _participantsMap;
 
   final private String _zkAddress;
-  private final ClusterManager _clusterManager;
-
-  public HelixGatewayService(String zkAddress) {
+  private final GatewayServiceManager _gatewayServiceManager;
+  private Map<String, Map<String, AtomicBoolean>> _flagMap;
+  public HelixGatewayService(GatewayServiceManager gatewayServiceManager, 
String zkAddress) {
     _participantsMap = new ConcurrentHashMap<>();
     _zkAddress = zkAddress;
-    _clusterManager = new ClusterManager();
+    _gatewayServiceManager = gatewayServiceManager;
+    _flagMap = new ConcurrentHashMap<>();
   }
 
-  public ClusterManager getClusterManager() {
-    return _clusterManager;
+  public GatewayServiceManager getClusterManager() {
+    return _gatewayServiceManager;
   }
 
   public void start() {
@@ -32,9 +40,8 @@ public class HelixGatewayService {
     // TODO: create participant manager and add to _participantsMap
     HelixManager manager = new ZKHelixManager("clusterName", "instanceName", 
InstanceType.PARTICIPANT, _zkAddress);
     manager.getStateMachineEngine()
-        .registerStateModelFactory("OnlineOffline", new 
HelixGatewayOnlineOfflineStateModelFactory(_clusterManager));
+        .registerStateModelFactory("OnlineOffline", new 
HelixGatewayOnlineOfflineStateModelFactory(_gatewayServiceManager));
     try {
-      _clusterManager.addChannel();
       manager.connect();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -45,11 +52,36 @@ public class HelixGatewayService {
     HelixManager manager = 
_participantsMap.get(clusterName).remove(participantName);
     if (manager != null) {
       manager.disconnect();
-      _clusterManager.removeChannel(participantName);
+      removeChannel(participantName);
     }
   }
 
+  public void addChannel() {
+   // _flagMap.computeIfAbsent(mockApplication.getInstanceName(), k -> new 
ConcurrentHashMap<>());
+  }
+
+  public void removeChannel(String instanceName) {
+    _flagMap.remove(instanceName);
+  }
+
+  public AtomicBoolean sendMessage() {
+      AtomicBoolean flag = new AtomicBoolean(false);
+      return flag;
+  }
+
+  public void receiveSTResponse() {
+     // AtomicBoolean flag = 
_flagMap.get(instanceName).remove(response.getMessageId());
+  }
+
+  public void newParticipantConnecting(){
+
+  }
+
+  public void participantDisconnected(){
+
+  }
+
   public void stop() {
-    System.out.println("Stoping Helix Gateway Service");
+    System.out.println("Stopping Helix Gateway Service");
   }
 }
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
new file mode 100644
index 000000000..d419a71b5
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/HelixGatewayServiceProcessor.java
@@ -0,0 +1,11 @@
+package org.apache.helix.gateway.service;
+
+/**
+ * Translate from/to GRPC function call to Helix Gateway Service event.
+ */
+public interface HelixGatewayServiceProcessor {
+
+  public boolean sendStateTransitionMessage(String instanceName);
+
+  public void sendEventToManager(GatewayServiceManager.GateWayServiceEvent 
event);
+}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
index 1e7c16e38..3df3b00db 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/ReplicaStateTracker.java
@@ -2,7 +2,7 @@ package org.apache.helix.gateway.service;
 
 public class ReplicaStateTracker {
 
-  boolean compareTargetState(){
+  boolean compareTargetState() {
     return true;
   }
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java
deleted file mode 100644
index ae2dbfe94..000000000
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/StateTransitionMessageTranslator.java
+++ /dev/null
@@ -1,4 +0,0 @@
-package org.apache.helix.gateway.service;
-
-public class StateTransitionMessageTranslator {
-}
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
index 5c95feb38..1f2846c6c 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModel.java
@@ -2,13 +2,13 @@ package org.apache.helix.gateway.statemodel;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.helix.NotificationContext;
-import org.apache.helix.gateway.service.ClusterManager;
+import org.apache.helix.gateway.service.GatewayServiceManager;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
 
 public class HelixGatewayOnlineOfflineStateModel extends StateModel {
   private boolean _firstTime = true;
-  private ClusterManager _clusterManager;
+  private GatewayServiceManager _gatewayServiceManager;
 
   private String _resourceName;
   private String _partitionKey;
@@ -16,33 +16,33 @@ public class HelixGatewayOnlineOfflineStateModel extends 
StateModel {
   private AtomicBoolean _completed;
 
   public HelixGatewayOnlineOfflineStateModel(String resourceName, String 
partitionKey,
-      ClusterManager clusterManager) {
+      GatewayServiceManager gatewayServiceManager) {
     _resourceName = resourceName;
     _partitionKey = partitionKey;
-    _clusterManager = clusterManager;
+    _gatewayServiceManager = gatewayServiceManager;
   }
 
   public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
     if (_firstTime) {
-      wait(_clusterManager.sendMessage());
+      
wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
       System.out.println(
           "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with ADD for "
               + message.getResourceName() + " processed");
       _firstTime = false;
     }
-    wait(_clusterManager.sendMessage());
+    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
     System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
         + " with CHANGE_ROLE_OFFLINE_ONLINE for " + message.getResourceName() 
+ " processed");
   }
 
   public void onBecomeOfflineFromOnline(Message message, NotificationContext 
context) {
-    wait(_clusterManager.sendMessage());
+    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
     System.out.println("Message for " + message.getPartitionName() + " 
instance " + message.getTgtName()
         + " with CHANGE_ROLE_ONLINE_OFFLINE for " + message.getResourceName() 
+ " processed");
   }
 
   public void onBecomeDroppedFromOffline(Message message, NotificationContext 
context) {
-    wait(_clusterManager.sendMessage());
+    wait(_gatewayServiceManager.sendTransitionRequestToApplicationInstance());
     System.out.println(
         "Message for " + message.getPartitionName() + " instance " + 
message.getTgtName() + " with REMOVE for "
             + message.getResourceName() + " processed");
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
index 5db789112..b7e06051e 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/statemodel/HelixGatewayOnlineOfflineStateModelFactory.java
@@ -1,12 +1,13 @@
 package org.apache.helix.gateway.statemodel;
 
-import org.apache.helix.gateway.service.ClusterManager;
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import org.apache.helix.gateway.statemodel.HelixGatewayOnlineOfflineStateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 
 public class HelixGatewayOnlineOfflineStateModelFactory extends 
StateModelFactory<HelixGatewayOnlineOfflineStateModel> {
-  private ClusterManager _clusterManager;
+  private GatewayServiceManager _clusterManager;
 
-  public HelixGatewayOnlineOfflineStateModelFactory(ClusterManager 
clusterManager) {
+  public HelixGatewayOnlineOfflineStateModelFactory(GatewayServiceManager 
clusterManager) {
     _clusterManager = clusterManager;
   }
 
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
new file mode 100644
index 000000000..7f0c592c2
--- /dev/null
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/StateTransitionMessageTranslateUtil.java
@@ -0,0 +1,17 @@
+package org.apache.helix.gateway.util;
+
+import org.apache.helix.gateway.service.GatewayServiceManager;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.TransitionMessage;
+import 
proto.org.apache.helix.gateway.HelixGatewayServiceOuterClass.ShardStateMessage;
+
+
+public final class StateTransitionMessageTranslateUtil {
+
+  public static TransitionMessage translateSTMsgToProto() {
+    return null;
+  }
+
+  public static GatewayServiceManager.GateWayServiceEvent 
translateProtoToSTMsg(ShardStateMessage message) {
+    return null;
+  }
+}
diff --git a/helix-gateway/src/main/proto/HelixGatewayService.proto 
b/helix-gateway/src/main/proto/HelixGatewayService.proto
index c82431123..dbf488b5a 100644
--- a/helix-gateway/src/main/proto/HelixGatewayService.proto
+++ b/helix-gateway/src/main/proto/HelixGatewayService.proto
@@ -26,26 +26,25 @@ message SingleShardTransitionStatus {
   optional string currentState = 3;   // If it failed, what is the current 
state it should reported as.
 }
 
-// resource has list of replica
-
 message SingleResourceState {
-  string resource = 1;
-  repeated SingleShardState SingleReplicaState= 2;
+  string resource = 1;             // name of the resource
+  repeated SingleShardState SingleShardState = 2;    // State of each shard
 
 }
 
 message SingleShardState {
-  string shardaName = 1;
-  string currentState = 2;
+  string shardName = 1;       // Name of the shard
+  string currentState = 2;     // Current state of the shard
 }
 
 //
 message ShardStateMessage{
-  repeated SingleShardTransitionStatus replicaStateST = 1;
-  repeated SingleShardState shardState = 2;
+  string instanceName = 1;         // Name of the application instance
+  string clusterName = 2;          // Name of the cluster to connect to
+  repeated SingleShardTransitionStatus shardStatus = 3;    // state transition 
result for a shard
+  repeated SingleShardState shardState = 4;        // State of each shard, 
only reported upon init connection
 }
 
-
 service HelixGatewayService {
   rpc report(stream ShardStateMessage) returns (stream TransitionMessage) {}
 }

Reply via email to