Updated Branches: refs/heads/master a85c5f569 -> ab5c9b091
[HELIX-19]Allow instance to autojoin a cluster Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/ab5c9b09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/ab5c9b09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/ab5c9b09 Branch: refs/heads/master Commit: ab5c9b0912da5bff5af06d27e79fe8761f50d7bd Parents: a85c5f5 Author: slu2011 <[email protected]> Authored: Tue Apr 2 15:54:11 2013 -0700 Committer: slu2011 <[email protected]> Committed: Tue Apr 2 15:54:11 2013 -0700 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 40 +++++++- .../helix/integration/TestInstanceAutoJoin.java | 76 +++++++++++++++ 2 files changed, 114 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ab5c9b09/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 4fe430a..67f9354 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -65,9 +65,12 @@ import org.apache.helix.healthcheck.ParticipantHealthReportCollector; import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.messaging.handling.MessageHandlerFactory; +import org.apache.helix.model.ConfigScope; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message.MessageType; +import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.monitoring.ZKPathDataDumpTask; import org.apache.helix.participant.DistClusterControllerElection; @@ -117,6 +120,7 @@ public class ZKHelixManager implements HelixManager public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec public static final int MAX_DISCONNECT_THRESHOLD = 5; LiveInstanceInfoProvider _liveInstanceInfoProvider = null; + public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin"; public ZKHelixManager(String clusterName, String instanceName, @@ -682,11 +686,43 @@ public class ZKHelixManager implements HelixManager throw new HelixException("Initial cluster structure is not set up for cluster:" + _clusterName); } - + // Read cluster config and see if instance can auto join the cluster + boolean autoJoin = false; + try + { + ConfigScope scope = + new ConfigScopeBuilder().forCluster(getClusterName()) + .build(); + autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN)); + logger.info("Auto joining " + _clusterName +" is true"); + } + catch(Exception e) + { + } if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType)) { - throw new HelixException("Initial cluster structure is not set up for instance:" + if(!autoJoin) + { + throw new HelixException("Initial cluster structure is not set up for instance:" + _instanceName + " instanceType:" + _instanceType); + } + else + { + logger.info("Auto joining instance " + _instanceName); + InstanceConfig instanceConfig = new InstanceConfig(_instanceName); + String hostName = _instanceName; + String port = ""; + int lastPos = _instanceName.lastIndexOf("_"); + if (lastPos > 0) + { + hostName = _instanceName.substring(0, lastPos); + port = _instanceName.substring(lastPos + 1); + } + instanceConfig.setHostName(hostName); + instanceConfig.setPort(port); + instanceConfig.setInstanceEnabled(true); + getClusterManagmentTool().addInstance(_clusterName, instanceConfig); + } } if (_instanceType == InstanceType.PARTICIPANT http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ab5c9b09/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java new file mode 100644 index 0000000..97bf770 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java @@ -0,0 +1,76 @@ +package org.apache.helix.integration; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.TestHelper; +import org.apache.helix.TestHelper.StartCMResult; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.model.ConfigScope; +import org.apache.helix.model.IdealState.IdealStateModeProperty; +import org.apache.helix.model.builder.ConfigScopeBuilder; +import org.testng.Assert; +import org.testng.annotations.Test; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase +{ + String db2 = TEST_DB+"2"; + @Test + public void testInstanceAutoJoin() throws Exception + { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0); + HelixManager manager = _startCMResultMap.get(instanceName)._manager; + HelixDataAccessor accessor = manager.getHelixDataAccessor(); + + _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE+""); + + + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 1); + String instance2 = "localhost_279699"; + StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2); + + + Thread.sleep(500); + Assert.assertFalse(result._thread.isAlive()); + Assert.assertTrue(null == manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().liveInstance(instance2))); + + ConfigScope scope = + new ConfigScopeBuilder().forCluster(CLUSTER_NAME) + .build(); + + manager.getConfigAccessor().set(scope, ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"); + + result = + TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2); + + StartCMResult result2 = + TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2); + + Thread.sleep(500); + Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive()); + Assert.assertTrue(null != manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().liveInstance(instance2))); + + result._manager.disconnect(); + result2._manager.disconnect(); + result._thread.interrupt(); + result2._thread.interrupt(); + } +}
