This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit c6f99b61da74ce17d79889a89e7b0afdd4e0b4e1 Author: Zachary Pinto <zapi...@linkedin.com> AuthorDate: Thu Sep 12 14:13:13 2024 -0700 Expose setting gateway service channel to allow external managment of the lifecycle of the channel. (#2913) Expose setting gateway service channel to allow external managment of the lifecycle of the channel. --- .../channel/HelixGatewayServiceGrpcService.java | 3 +- .../gateway/service/GatewayServiceManager.java | 45 ++++++++++++++++------ .../apache/helix/gateway/util/PollChannelUtil.java | 1 - .../service/TestGatewayServiceConnection.java | 10 ++++- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java index 9ed06ca23..a9c1ca4a2 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/channel/HelixGatewayServiceGrpcService.java @@ -60,7 +60,7 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli private final GatewayServiceManager _manager; // A fine grain lock register on instance level - private final PerKeyLockRegistry _lockRegistry; + private final PerKeyLockRegistry _lockRegistry = new PerKeyLockRegistry();; private final GatewayServiceChannelConfig _config; @@ -69,7 +69,6 @@ public class HelixGatewayServiceGrpcService extends HelixGatewayServiceGrpc.Heli public HelixGatewayServiceGrpcService(GatewayServiceManager manager, GatewayServiceChannelConfig config) { _manager = manager; _config = config; - _lockRegistry = new PerKeyLockRegistry(); } /** diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java index 221c2aeac..94f5783f0 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/service/GatewayServiceManager.java @@ -57,27 +57,44 @@ public class GatewayServiceManager { private final ExecutorService _participantStateTransitionResultUpdator; // link to grpc service - private final HelixGatewayServiceChannel _gatewayServiceChannel; + private HelixGatewayServiceChannel _gatewayServiceChannel; // a per key executor for connection event. All event for the same instance will be executed in sequence. // It is used to ensure for each instance, the connect/disconnect event won't start until the previous one is done. private final PerKeyBlockingExecutor _connectionEventProcessor; - private final GatewayServiceChannelConfig _gatewayServiceChannelConfig; - private final Map<String, GatewayCurrentStateCache> _currentStateCacheMap; - public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) { + public GatewayServiceManager(String zkAddress) { _helixGatewayParticipantMap = new ConcurrentHashMap<>(); _zkAddress = zkAddress; _participantStateTransitionResultUpdator = Executors.newSingleThreadExecutor(); - _gatewayServiceChannel = HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, this); _connectionEventProcessor = new PerKeyBlockingExecutor(CONNECTION_EVENT_THREAD_POOL_SIZE); // todo: make it configurable - _gatewayServiceChannelConfig = gatewayServiceChannelConfig; _currentStateCacheMap = new HashMap<>(); } + public GatewayServiceManager(String zkAddress, GatewayServiceChannelConfig gatewayServiceChannelConfig) { + this(zkAddress); + _gatewayServiceChannel = HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, this); + } + + /** + * Set the gateway service channel. This can only be called once. + * The channel is used to send state transition message to the participant. + * + * @param channel the gateway service channel + * @throws IllegalStateException if the channel is already set + */ + public void setGatewayServiceChannel(HelixGatewayServiceChannel channel) { + if (_gatewayServiceChannel != null) { + _gatewayServiceChannel.stop(); + return; + } + throw new IllegalStateException( + "Gateway service channel is already set, it can only be set once."); + } + /** * Process the event from Grpc service and dispatch to async executor for processing. * @@ -85,7 +102,7 @@ public class GatewayServiceManager { */ public void onGatewayServiceEvent(GatewayServiceEvent event) { if (event.getEventType().equals(GatewayServiceEventType.UPDATE)) { - _participantStateTransitionResultUpdator.submit(new shardStateUpdator(event)); + _participantStateTransitionResultUpdator.submit(new ShardStateUpdator(event)); } else { _connectionEventProcessor.offerEvent(event.getInstanceName(), new ParticipantConnectionProcessor(event)); } @@ -117,11 +134,11 @@ public class GatewayServiceManager { /** * Update in memory shard state */ - class shardStateUpdator implements Runnable { + class ShardStateUpdator implements Runnable { private final GatewayServiceEvent _event; - private shardStateUpdator(GatewayServiceEvent event) { + private ShardStateUpdator(GatewayServiceEvent event) { _event = event; } @@ -162,15 +179,19 @@ public class GatewayServiceManager { } } + public void stopManager() { + _connectionEventProcessor.shutdown(); + _participantStateTransitionResultUpdator.shutdown(); + _helixGatewayParticipantMap.clear(); + } + public void startService() throws IOException { _gatewayServiceChannel.start(); } public void stopService() { _gatewayServiceChannel.stop(); - _connectionEventProcessor.shutdown(); - _participantStateTransitionResultUpdator.shutdown(); - _helixGatewayParticipantMap.clear(); + stopManager(); } private void createHelixGatewayParticipant(String clusterName, String instanceName, diff --git a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java index 69c9c219d..26de6db0f 100644 --- a/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java +++ b/helix-gateway/src/main/java/org/apache/helix/gateway/util/PollChannelUtil.java @@ -30,7 +30,6 @@ import io.grpc.health.v1.HealthGrpc; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.tuple.ImmutablePair; diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java index f3221a12c..8253444a8 100644 --- a/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/service/TestGatewayServiceConnection.java @@ -25,9 +25,12 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + +import org.apache.helix.gateway.api.service.HelixGatewayServiceChannel; import org.apache.helix.gateway.base.HelixGatewayTestBase; import org.apache.helix.gateway.channel.GatewayServiceChannelConfig; import org.apache.helix.gateway.api.constant.GatewayServiceEventType; +import org.apache.helix.gateway.channel.HelixGatewayServiceChannelFactory; import org.testng.Assert; import org.testng.annotations.Test; import proto.org.apache.helix.gateway.HelixGatewayServiceGrpc; @@ -40,7 +43,7 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { CountDownLatch disconnectLatch = new CountDownLatch(1); @Test - public void TestLivenessDetection() throws IOException, InterruptedException { + public void testLivenessDetection() throws IOException, InterruptedException { // start the gateway service GatewayServiceChannelConfig config = new GatewayServiceChannelConfig.GatewayServiceProcessorConfigBuilder().setGrpcServerPort(50051).build(); @@ -136,7 +139,10 @@ public class TestGatewayServiceConnection extends HelixGatewayTestBase { class DummyGatewayServiceManager extends GatewayServiceManager { public DummyGatewayServiceManager(GatewayServiceChannelConfig gatewayServiceChannelConfig) { - super("dummyZkAddress", gatewayServiceChannelConfig); + super("dummyZkAddress"); + this.setGatewayServiceChannel( + HelixGatewayServiceChannelFactory.createServiceChannel(gatewayServiceChannelConfig, + this)); } @Override