This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 8cacca12cf302bef08d40ba1345169f0e36e7ffd Author: Kai Sun <[email protected]> AuthorDate: Mon May 20 09:43:36 2019 -0700 Title: Helix-1842: test activating a cluster to super cluster with default to FULL_AUTO Description: This is a follow up of previous diff at rb 1666833. In the previous diff, we did not really trigger the logic such that the participant (controllers) in supercluster will monitor the added cluster. We fixed it in this diff. Also, we enhanced the test cases to tear down orphaned threads. The following is the original description about this task: Current v2 rest api, adding a cluster to supercluster will put the new IdealState to SEMI_AUTO. In this change, we make the default as following: REBALANCE_MODE = FULL_AUTO, replicas = 3, REBALANCER = DelayedAutoRebalancer, REBALANCE_STRATEGY = CrushEdRebalanceStrategy. Also, we will indeed make the replicas of controller to be 3, instead of all of the controllers as currently implemented. RB=1671448 G=helix-reviewers A=hulee Signed-off-by: Hunter Lee <[email protected]> --- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 1 + .../helix/rest/server/AbstractTestClass.java | 21 +++++- .../helix/rest/server/TestClusterAccessor.java | 79 ++++++++++++++++++++-- 3 files changed, 92 insertions(+), 9 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index a8dfcc9..dfb8ba8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -1073,6 +1073,7 @@ public class ZKHelixAdmin implements HelixAdmin { idealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); // TODO: Give user an option, say from RestAPI to config the number of replicas. 
idealState.setReplicas(Integer.toString(DEFAULT_SUPERCLUSTER_REPLICA)); + idealState.getRecord().setListField(clusterName, new ArrayList<String>()); List<String> controllers = getInstancesInCluster(grandCluster); if (controllers.size() == 0) { diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java index e5c42a0..347be89 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/AbstractTestClass.java @@ -126,7 +126,8 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { protected static Map<String, Set<String>> _liveInstancesMap = new HashMap<>(); protected static Map<String, Set<String>> _resourcesMap = new HashMap<>(); protected static Map<String, Map<String, Workflow>> _workflowMap = new HashMap<>(); - + protected static List<ClusterControllerManager> _clusterControllerManagers = new ArrayList<>(); + protected static List<MockParticipantManager> _mockParticipantManagers = new ArrayList<>(); protected static MockAuditLogger _auditLogger = new MockAuditLogger(); protected static HelixRestServer _helixRestServer; @@ -254,6 +255,19 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { @AfterSuite public void afterSuite() throws Exception { + // tear down orphan-ed threads + for (ClusterControllerManager cm : _clusterControllerManagers) { + if (cm != null && cm.isConnected()) { + cm.syncStop(); + } + } + + for (MockParticipantManager mm: _mockParticipantManagers) { + if (mm != null && mm.isConnected()) { + mm.syncStop(); + } + } + ZKClientPool.reset(); if (_gZkClient != null) { _gZkClient.close(); @@ -298,7 +312,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { _instancesMap.put(cluster, instances); _liveInstancesMap.put(cluster, liveInstances); _resourcesMap.put(cluster, resources); - 
startController(cluster); + _clusterControllerManagers.add(startController(cluster)); } preSetupForParallelInstancesStoppableTest(STOPPABLE_CLUSTER, STOPPABLE_INSTANCES); } @@ -356,6 +370,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { stateMachineEngine.registerStateModelFactory("Task", new TaskStateModelFactory(participant, taskFactoryReg)); participant.syncStart(); + _mockParticipantManagers.add(participant); liveInstances.add(instance); if (++i > numLiveinstances) { break; @@ -532,7 +547,7 @@ public class AbstractTestClass extends JerseyTestNg.ContainerPerClassTest { // Start participant startInstances(clusterName, new TreeSet<>(instances), 3); createResources(clusterName, 1); - startController(clusterName); + _clusterControllerManagers.add(startController(clusterName)); _clusters.add(STOPPABLE_CLUSTER); _workflowMap.put(STOPPABLE_CLUSTER, createWorkflows(STOPPABLE_CLUSTER, 3)); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index b8254c6..d07c55f 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -20,6 +20,7 @@ package org.apache.helix.rest.server; */ import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -37,11 +38,14 @@ import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.integration.manager.ClusterDistributedController; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKUtil; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; 
import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.MaintenanceSignal; import org.apache.helix.rest.common.HelixRestNamespace; import org.apache.helix.rest.server.auditlog.AuditLog; @@ -49,7 +53,10 @@ import org.apache.helix.rest.server.resources.AbstractResource; import org.apache.helix.rest.server.resources.AbstractResource.Command; import org.apache.helix.rest.server.resources.helix.ClusterAccessor; import org.apache.helix.rest.server.util.JerseyUriRequestBuilder; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.annotate.JsonTypeInfo; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import org.testng.Assert; @@ -396,22 +403,82 @@ public class TestClusterAccessor extends AbstractTestClass { } @Test() - public void testActivateSuperCluster() { + public void testActivateSuperCluster() throws Exception { System.out.println("Start test :" + TestHelper.getTestMethodName()); - String cluster = "TestCluster_0"; - post("clusters/" + cluster, - ImmutableMap.of("command", "activate", "superCluster", "superCluster"), + final String ACTIVATE_SUPER_CLUSTER = "RestSuperClusterActivationTest_SuperCluster"; + final String ACTIVATE_NORM_CLUSTER = "RestSuperClusterActivationTest_NormalCluster"; + + // create testCluster + _gSetupTool.addCluster(ACTIVATE_NORM_CLUSTER, true); + ClusterConfig clusterConfig = new ClusterConfig(ACTIVATE_NORM_CLUSTER); + clusterConfig.setFaultZoneType("helixZoneId"); + _configAccessor.setClusterConfig(ACTIVATE_NORM_CLUSTER, clusterConfig); + Set<String> resources = createResourceConfigs(ACTIVATE_NORM_CLUSTER, 8); + + // create superCluster + _gSetupTool.addCluster(ACTIVATE_SUPER_CLUSTER,true); + ClusterConfig superClusterConfig = new 
ClusterConfig(ACTIVATE_SUPER_CLUSTER); + _configAccessor.setClusterConfig(ACTIVATE_SUPER_CLUSTER, superClusterConfig); + Set<String> instances = createInstances(ACTIVATE_SUPER_CLUSTER, 4); + List<ClusterDistributedController> clusterDistributedControllers = new ArrayList<>(); + for (String instance : instances) { + ClusterDistributedController controllerParticipant = + new ClusterDistributedController(ZK_ADDR, ACTIVATE_SUPER_CLUSTER, instance); + clusterDistributedControllers.add(controllerParticipant); + controllerParticipant.syncStart(); + } + + post("clusters/" + ACTIVATE_NORM_CLUSTER, + ImmutableMap.of("command", "activate", "superCluster", ACTIVATE_SUPER_CLUSTER), Entity.entity("", MediaType.APPLICATION_JSON_TYPE), Response.Status.OK .getStatusCode()); - HelixDataAccessor accessor = new ZKHelixDataAccessor(_superCluster, _baseAccessor); + HelixDataAccessor accessor = new ZKHelixDataAccessor(ACTIVATE_SUPER_CLUSTER, _baseAccessor); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - IdealState idealState = accessor.getProperty(keyBuilder.idealStates(cluster)); + final HelixDataAccessor normalAccessor = new ZKHelixDataAccessor(ACTIVATE_NORM_CLUSTER, _baseAccessor); + final PropertyKey.Builder normKeyBuilder = normalAccessor.keyBuilder(); + + boolean result = TestHelper.verify(new TestHelper.Verifier() { + @Override + public boolean verify() { + LiveInstance leader = normalAccessor.getProperty(normKeyBuilder.controllerLeader()); + return leader != null; + } + }, 12000); + Assert.assertTrue(result); + + BestPossibleExternalViewVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(ACTIVATE_SUPER_CLUSTER).setZkAddr(ZK_ADDR) + .setZkClient(_gZkClient).build(); + Assert.assertTrue(verifier.verifyByPolling()); + + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(ACTIVATE_NORM_CLUSTER)); Assert.assertEquals(idealState.getRebalanceMode(), IdealState.RebalanceMode.FULL_AUTO); Assert.assertEquals(idealState.getRebalancerClassName(), 
DelayedAutoRebalancer.class.getName()); Assert.assertEquals(idealState.getRebalanceStrategy(), CrushEdRebalanceStrategy.class.getName()); // Note, set expected replicas value to 3, as the same value of DEFAULT_SUPERCLUSTER_REPLICA in ClusterAccessor. Assert.assertEquals(idealState.getReplicas(), "3"); + + + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(ACTIVATE_NORM_CLUSTER)); + Map<String, String> extViewMapping = externalView.getRecord().getMapField(ACTIVATE_NORM_CLUSTER); + String superClusterleader = null; + for (Map.Entry<String, String> entry: extViewMapping.entrySet()) { + if (entry.getValue().equals("LEADER")) { + superClusterleader = entry.getKey(); + } + } + LiveInstance leader = normalAccessor.getProperty(normKeyBuilder.controllerLeader()); + Assert.assertEquals(leader.getId(), superClusterleader); + + // clean up by tearing down controllers and delete clusters + for (ClusterDistributedController dc: clusterDistributedControllers) { + if (dc != null && dc.isConnected()) { + dc.syncStop(); + } + } + _gSetupTool.deleteCluster(ACTIVATE_NORM_CLUSTER); + _gSetupTool.deleteCluster(ACTIVATE_SUPER_CLUSTER); } private ClusterConfig getClusterConfigFromRest(String cluster) throws IOException {
