This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git

commit c6f99b61da74ce17d79889a89e7b0afdd4e0b4e1
Author: Zachary Pinto <zapi...@linkedin.com>
AuthorDate: Thu Sep 12 14:13:13 2024 -0700

    Expose setting gateway service channel to allow external managment of the 
lifecycle of the channel. (#2913)
    
    Expose setting gateway service channel to allow external managment of the 
lifecycle of the channel.
---
 .../channel/HelixGatewayServiceGrpcService.java    |  3 +-
 .../gateway/service/GatewayServiceManager.java     | 45 ++++++++++++++++------
 .../apache/helix/gateway/util/PollChannelUtil.java |  1 -
 .../service/TestGatewayServiceConnection.java      | 10 ++++-
 4 files changed, 42 insertions(+), 17 deletions(-)

diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
index 9ed06ca23..a9c1ca4a2 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java
@@ -60,7 +60,7 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   private final GatewayServiceManager _manager;
 
   // A fine grain lock register on instance level
-  private final PerKeyLockRegistry _lockRegistry;
+  private final PerKeyLockRegistry _lockRegistry = new PerKeyLockRegistry();;
 
   private final GatewayServiceChannelConfig _config;
 
@@ -69,7 +69,6 @@ public class HelixGatewayServiceGrpcService extends 
HelixGatewayServiceGrpc.Heli
   public HelixGatewayServiceGrpcService(GatewayServiceManager manager, 
GatewayServiceChannelConfig config) {
     _manager = manager;
     _config = config;
-    _lockRegistry = new PerKeyLockRegistry();
   }
 
   /**
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
index 221c2aeac..94f5783f0 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java
@@ -57,27 +57,44 @@ public class GatewayServiceManager {
   private final ExecutorService _participantStateTransitionResultUpdator;
 
   // link to grpc service
-  private final HelixGatewayServiceChannel _gatewayServiceChannel;
+  private HelixGatewayServiceChannel _gatewayServiceChannel;
 
   // a per key executor for connection event. All event for the same instance 
will be executed in sequence.
   // It is used to ensure for each instance, the connect/disconnect event 
won't start until the previous one is done.
   private final PerKeyBlockingExecutor _connectionEventProcessor;
 
-  private final GatewayServiceChannelConfig _gatewayServiceChannelConfig;
-
   private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap;
 
-  public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
+  public GatewayServiceManager(String zkAddress) {
     _helixGatewayParticipantMap = new ConcurrentHashMap<>();
     _zkAddress = zkAddress;
     _participantStateTransitionResultUpdator = 
Executors.newSingleThreadExecutor();
-    _gatewayServiceChannel = 
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
 this);
     _connectionEventProcessor =
         new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // 
todo: make it configurable
-    _gatewayServiceChannelConfig = gatewayServiceChannelConfig;
     _currentStateCacheMap = new HashMap<>();
   }
 
+  public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
+    this(zkAddress);
+    _gatewayServiceChannel = 
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
 this);
+  }
+
+  /**
+   * Set the gateway service channel. This can only be called once.
+   * The channel is used to send state transition message to the participant.
+   *
+   * @param channel the gateway service channel
+   * @throws IllegalStateException if the channel is already set
+   */
+  public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) {
+    if (_gatewayServiceChannel != null) {
+      _gatewayServiceChannel.stop();
+      return;
+    }
+    throw new IllegalStateException(
+        "Gateway service channel is already set, it can only be set once.");
+  }
+
   /**
    * Process the event from Grpc service and dispatch to async executor for 
processing.
    *
@@ -85,7 +102,7 @@ public class GatewayServiceManager {
    */
   public void onGatewayServiceEvent(GatewayServiceEvent event) {
     if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) {
-      _participantStateTransitionResultUpdator.submit(new 
shardStateUpdator(event));
+      _participantStateTransitionResultUpdator.submit(new 
ShardStateUpdator(event));
     } else {
       _connectionEventProcessor.offerEvent(event.getInstanceName(), new 
ParticipantConnectionProcessor(event));
     }
@@ -117,11 +134,11 @@ public class GatewayServiceManager {
   /**
    * Update in memory shard state
    */
-  class shardStateUpdator implements Runnable {
+  class ShardStateUpdator implements Runnable {
 
     private final GatewayServiceEvent _event;
 
-    private shardStateUpdator(GatewayServiceEvent event) {
+    private ShardStateUpdator(GatewayServiceEvent event) {
       _event = event;
     }
 
@@ -162,15 +179,19 @@ public class GatewayServiceManager {
     }
   }
 
+  public void stopManager() {
+    _connectionEventProcessor.shutdown();
+    _participantStateTransitionResultUpdator.shutdown();
+    _helixGatewayParticipantMap.clear();
+  }
+
   public void startService() throws IOException {
     _gatewayServiceChannel.start();
   }
 
   public void stopService() {
     _gatewayServiceChannel.stop();
-    _connectionEventProcessor.shutdown();
-    _participantStateTransitionResultUpdator.shutdown();
-    _helixGatewayParticipantMap.clear();
+    stopManager();
   }
 
   private void createHelixGatewayParticipant(String clusterName, String 
instanceName,
diff --git 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
index 69c9c219d..26de6db0f 100644
--- 
a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
+++ 
b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java
@@ -30,7 +30,6 @@ import io.grpc.health.v1.HealthGrpc;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.lang3.tuple.ImmutablePair;
diff --git 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
index f3221a12c..8253444a8 100644
--- 
a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
+++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java
@@ -25,9 +25,12 @@ import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel;
 import org.apache.helix.gateway.base.HelixGatewayTestBase;
 import org.apache.helix.gateway.channel.GatewayServiceChannelConfig;
 import org.apache.helix.gateway.api.constant.GatewayServiceEventType;
+import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc;
@@ -40,7 +43,7 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
   CountDownLatch disconnectLatch = new CountDownLatch(1);
 
   @Test
-  public void TestLivenessDetection() throws IOException, InterruptedException 
{
+  public void testLivenessDetection() throws IOException, InterruptedException 
{
     // start the gateway service
     GatewayServiceChannelConfig config =
         new 
GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build();
@@ -136,7 +139,10 @@ public class TestGatewayServiceConnection extends 
HelixGatewayTestBase {
   class DummyGatewayServiceManager extends GatewayServiceManager {
 
     public DummyGatewayServiceManager(GatewayServiceChannelConfig 
gatewayServiceChannelConfig) {
-      super("dummyZkAddress", gatewayServiceChannelConfig);
+      super("dummyZkAddress");
+      this.setGatewayServiceChannel(
+          
HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig,
+              this));
     }
 
     @Override

Reply via email to