Updated Branches:
  refs/heads/master a85c5f569 -> ab5c9b091

[HELIX-19] Allow instance to auto-join a cluster

Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/ab5c9b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/ab5c9b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/ab5c9b09

Branch: refs/heads/master
Commit: ab5c9b0912da5bff5af06d27e79fe8761f50d7bd
Parents: a85c5f5
Author: slu2011 <[email protected]>
Authored: Tue Apr 2 15:54:11 2013 -0700
Committer: slu2011 <[email protected]>
Committed: Tue Apr 2 15:54:11 2013 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java    |   40 +++++++-
 .../helix/integration/TestInstanceAutoJoin.java    |   76 +++++++++++++++
 2 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ab5c9b09/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4fe430a..67f9354 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -65,9 +65,12 @@ import 
org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.ConfigScope;
 import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.participant.DistClusterControllerElection;
@@ -117,6 +120,7 @@ public class ZKHelixManager implements HelixManager
   public static final int                     FLAPPING_TIME_WINDIOW   = 
300000; // Default to 300 sec
   public static final int                     MAX_DISCONNECT_THRESHOLD = 5;
   LiveInstanceInfoProvider                    _liveInstanceInfoProvider = null;
+  public static final String                  ALLOW_PARTICIPANT_AUTO_JOIN = 
"allowParticipantAutoJoin";
 
   public ZKHelixManager(String clusterName,
                         String instanceName,
@@ -682,11 +686,43 @@ public class ZKHelixManager implements HelixManager
       throw new HelixException("Initial cluster structure is not set up for 
cluster:"
           + _clusterName);
     }
-
+    // Read cluster config and see if instance can auto join the cluster
+    boolean autoJoin = false;
+    try
+    {
+      ConfigScope scope =
+          new ConfigScopeBuilder().forCluster(getClusterName())
+                                  .build();
+      autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, 
ALLOW_PARTICIPANT_AUTO_JOIN));
+      logger.info("Auto joining " + _clusterName +" is true");
+    }
+    catch(Exception e)
+    {
+    }
     if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, 
_instanceType))
     {
-      throw new HelixException("Initial cluster structure is not set up for 
instance:"
+      if(!autoJoin)
+      {
+        throw new HelixException("Initial cluster structure is not set up for 
instance:"
           + _instanceName + " instanceType:" + _instanceType);
+      }
+      else
+      {
+        logger.info("Auto joining instance " + _instanceName);
+        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
+        String hostName = _instanceName;
+        String port = "";
+        int lastPos = _instanceName.lastIndexOf("_");
+        if (lastPos > 0)
+        {
+          hostName = _instanceName.substring(0, lastPos);
+          port = _instanceName.substring(lastPos + 1);
+        }
+        instanceConfig.setHostName(hostName);
+        instanceConfig.setPort(port);
+        instanceConfig.setInstanceEnabled(true);
+        getClusterManagmentTool().addInstance(_clusterName, instanceConfig);
+      }
     }
 
     if (_instanceType == InstanceType.PARTICIPANT

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/ab5c9b09/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
new file mode 100644
index 0000000..97bf770
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestInstanceAutoJoin.java
@@ -0,0 +1,76 @@
+package org.apache.helix.integration;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.TestHelper;
+import org.apache.helix.TestHelper.StartCMResult;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.model.ConfigScope;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.builder.ConfigScopeBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class TestInstanceAutoJoin extends ZkStandAloneCMTestBase
+{
+  String db2 = TEST_DB+"2";
+  @Test
+  public void testInstanceAutoJoin() throws Exception
+  {
+    String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0);
+    HelixManager manager = _startCMResultMap.get(instanceName)._manager;
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, 60, "OnlineOffline", 
IdealStateModeProperty.AUTO_REBALANCE+"");
+    
+
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, 1);
+    String instance2 = "localhost_279699";
+    StartCMResult result = TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, 
instance2);
+    
+    
+    Thread.sleep(500);
+    Assert.assertFalse(result._thread.isAlive());
+    Assert.assertTrue(null == 
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().liveInstance(instance2)));
+
+    ConfigScope scope =
+        new ConfigScopeBuilder().forCluster(CLUSTER_NAME)
+                                .build();
+    
+    manager.getConfigAccessor().set(scope, 
ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true");
+    
+    result =
+        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+    
+    StartCMResult result2 =
+        TestHelper.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instance2);
+    
+    Thread.sleep(500);
+    Assert.assertTrue(result._thread.isAlive() || result2._thread.isAlive());
+    Assert.assertTrue(null != 
manager.getHelixDataAccessor().getProperty(accessor.keyBuilder().liveInstance(instance2)));
+    
+    result._manager.disconnect();
+    result2._manager.disconnect();
+    result._thread.interrupt();
+    result2._thread.interrupt();
+  }
+}

Reply via email to