This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch helix-0.9.x in repository https://gitbox.apache.org/repos/asf/helix.git
commit fec3f40725ea11b92620e5b7182864b324f484df Author: zhangmeng916 <[email protected]> AuthorDate: Fri Feb 7 15:41:36 2020 -0800 Modify participant manager to add cluster auto registration logic (#695) Modify participant logic to add logic for instance auto registration to a Helix cluster. Auto registration differs from the existing auto join in that auto registration will retrieve the fault domain information for the instance and populate it under instanceConfig. With auto registration on, users do not need to manually populate instance information in ZooKeeper. --- .../helix/manager/zk/ParticipantManager.java | 100 ++++++++++++++++----- .../main/java/org/apache/helix/util/HelixUtil.java | 19 ++++ .../manager/MockParticipantManager.java | 10 ++- 3 files changed, 107 insertions(+), 22 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index f25f29e..7ae62b8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -20,6 +20,8 @@ package org.apache.helix.manager.zk; */ import java.lang.management.ManagementFactory; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,6 +33,7 @@ import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixCloudProperty; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerProperty; @@ -40,6 +43,8 @@ import org.apache.helix.PreConnectCallback; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; import org.apache.helix.ZNRecordBucketizer; +import 
org.apache.helix.api.cloud.CloudInstanceInformation; +import org.apache.helix.api.cloud.CloudInstanceInformationProcessor; import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.model.CurrentState; @@ -52,6 +57,7 @@ import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory; +import org.apache.helix.util.HelixUtil; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +67,7 @@ import org.slf4j.LoggerFactory; */ public class ParticipantManager { private static Logger LOG = LoggerFactory.getLogger(ParticipantManager.class); + private static final String CLOUD_PROCESSOR_PATH_PREFIX = "org.apache.helix.cloud."; final HelixZkClient _zkclient; final HelixManager _manager; @@ -107,7 +114,7 @@ public class ParticipantManager { } /** - * Handle new session for a participang. + * Handle new session for a participant. 
* @throws Exception */ public void handleNewSession() throws Exception { @@ -132,18 +139,31 @@ public class ParticipantManager { } private void joinCluster() { - // Read cluster config and see if instance can auto join the cluster + // Read cluster config and see if an instance can auto join or auto register to the cluster boolean autoJoin = false; + boolean autoRegistration = false; + + // Read "allowParticipantAutoJoin" flag to see if an instance can auto join to the cluster try { - HelixConfigScope scope = - new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster( - _manager.getClusterName()).build(); - autoJoin = - Boolean.parseBoolean(_configAccessor.get(scope, - ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)); + HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER) + .forCluster(_manager.getClusterName()).build(); + autoJoin = Boolean + .parseBoolean(_configAccessor.get(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)); LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin); } catch (Exception e) { - // autoJoin is false + LOG.info("auto join is false for cluster" + _clusterName); + } + + // Read cloud config and see if an instance can auto register to the cluster + // Difference between auto join and auto registration is that the latter will also populate the + // domain information in instance config + try { + autoRegistration = + Boolean.valueOf(_helixManagerProperty.getHelixCloudProperty().getCloudEnabled()); + LOG.info("instance: " + _instanceName + " auto-register " + _clusterName + " is " + + autoRegistration); + } catch (Exception e) { + LOG.info("auto registration is false for cluster" + _clusterName); } if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) { @@ -151,23 +171,61 @@ public class ParticipantManager { throw new HelixException("Initial cluster structure is not set up for instance: " + _instanceName + ", instanceType: " + 
_instanceType); } else { - LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName); - InstanceConfig instanceConfig = new InstanceConfig(_instanceName); - String hostName = _instanceName; - String port = ""; - int lastPos = _instanceName.lastIndexOf("_"); - if (lastPos > 0) { - hostName = _instanceName.substring(0, lastPos); - port = _instanceName.substring(lastPos + 1); + if (!autoRegistration) { + LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName); + _helixAdmin.addInstance(_clusterName, HelixUtil.composeInstanceConfig(_instanceName)); + } else { + LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName); + CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation(); + String domain = cloudInstanceInformation + .get(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()); + + // Disable the verification for now + /*String cloudIdInRemote = cloudInstanceInformation + .get(CloudInstanceInformation.CloudInstanceField.INSTANCE_SET_NAME.name()); + String cloudIdInConfig = _configAccessor.getCloudConfig(_clusterName).getCloudID(); + + // validate that the instance is auto registering to the correct cluster + if (!cloudIdInRemote.equals(cloudIdInConfig)) { + throw new IllegalArgumentException(String.format( + "cloudId in config: %s is not consistent with cloudId from remote: %s. 
The instance is auto registering to a wrong cluster.", + cloudIdInConfig, cloudIdInRemote)); + }*/ + + InstanceConfig instanceConfig = HelixUtil.composeInstanceConfig(_instanceName); + instanceConfig.setDomain(domain); + _helixAdmin.addInstance(_clusterName, instanceConfig); } - instanceConfig.setHostName(hostName); - instanceConfig.setPort(port); - instanceConfig.setInstanceEnabled(true); - _helixAdmin.addInstance(_clusterName, instanceConfig); } } } + private CloudInstanceInformation getCloudInstanceInformation() { + String cloudInstanceInformationProcessorName = + _helixManagerProperty.getHelixCloudProperty().getCloudInfoProcessorName(); + try { + // fetch cloud instance information for the instance + String cloudInstanceInformationProcessorClassName = CLOUD_PROCESSOR_PATH_PREFIX + + _helixManagerProperty.getHelixCloudProperty().getCloudProvider().toLowerCase() + "." + + cloudInstanceInformationProcessorName; + Class processorClass = Class.forName(cloudInstanceInformationProcessorClassName); + Constructor constructor = processorClass.getConstructor(HelixCloudProperty.class); + CloudInstanceInformationProcessor processor = (CloudInstanceInformationProcessor) constructor + .newInstance(_helixManagerProperty.getHelixCloudProperty()); + List<String> responses = processor.fetchCloudInstanceInformation(); + + // parse cloud instance information for the participant + CloudInstanceInformation cloudInstanceInformation = + processor.parseCloudInstanceInformation(responses); + return cloudInstanceInformation; + } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException + | IllegalAccessException | InvocationTargetException ex) { + throw new HelixException( + "Failed to create a new instance for the class: " + cloudInstanceInformationProcessorName, + ex); + } + } + private void createLiveInstance() { String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath(); LiveInstance liveInstance = new LiveInstance(_instanceName); diff --git 
a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java index 5e7f7b4..de8fcf6 100644 --- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java @@ -290,4 +290,23 @@ public final class HelixUtil { return propertyDefaultValue; } + /** + * Compose the config for an instance + * @param instanceName + * @return InstanceConfig + */ + public static InstanceConfig composeInstanceConfig(String instanceName) { + InstanceConfig instanceConfig = new InstanceConfig(instanceName); + String hostName = instanceName; + String port = ""; + int lastPos = instanceName.lastIndexOf("_"); + if (lastPos > 0) { + hostName = instanceName.substring(0, lastPos); + port = instanceName.substring(lastPos + 1); + } + instanceConfig.setHostName(hostName); + instanceConfig.setPort(port); + instanceConfig.setInstanceEnabled(true); + return instanceConfig; + } } diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index d1677c6..c2a446e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -21,6 +21,7 @@ package org.apache.helix.integration.manager; import java.util.List; import java.util.concurrent.CountDownLatch; +import org.apache.helix.HelixCloudProperty; import org.apache.helix.InstanceType; import org.apache.helix.manager.zk.CallbackHandler; import org.apache.helix.manager.zk.ZKHelixManager; @@ -47,6 +48,7 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, protected MockMSModelFactory _msModelFactory; protected DummyLeaderStandbyStateModelFactory _lsModelFactory; protected DummyOnlineOfflineStateModelFactory _ofModelFactory; + 
protected HelixCloudProperty _helixCloudProperty; public MockParticipantManager(String zkAddr, String clusterName, String instanceName) { this(zkAddr, clusterName, instanceName, 10); @@ -54,11 +56,17 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, public MockParticipantManager(String zkAddr, String clusterName, String instanceName, int transDelay) { + this(zkAddr, clusterName, instanceName, transDelay, null); + } + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName, + int transDelay, HelixCloudProperty helixCloudProperty) { super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr); _transDelay = transDelay; _msModelFactory = new MockMSModelFactory(null); _lsModelFactory = new DummyLeaderStandbyStateModelFactory(_transDelay); _ofModelFactory = new DummyOnlineOfflineStateModelFactory(_transDelay); + _helixCloudProperty = helixCloudProperty; } public void setTransition(MockTransition transition) { @@ -135,4 +143,4 @@ public class MockParticipantManager extends ZKHelixManager implements Runnable, public List<CallbackHandler> getHandlers() { return _handlers; } -} +} \ No newline at end of file
