[HELIX-236] Create a hierarchical cluster snapshot to replace ClusterDataCache
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/0b67257a Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/0b67257a Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/0b67257a Branch: refs/heads/master Commit: 0b67257ac579042a80a30c180d51d3ec2aa8860f Parents: 24fd868 Author: zzhang <[email protected]> Authored: Wed Sep 25 15:42:26 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Nov 6 13:17:34 2013 -0800 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 1 + .../helix/tools/ClusterStateVerifier.java | 14 +++----- .../helix/integration/TestAutoRebalance.java | 29 ++++++++++------ .../TestAutoRebalancePartitionLimit.java | 35 ++++++++++---------- .../helix/integration/TestBatchMessage.java | 11 +++--- .../TestCustomizedIdealStateRebalancer.java | 33 +++++++++--------- 6 files changed, 65 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index f50b95c..2c4d8e1 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -42,6 +42,7 @@ import org.apache.log4j.Logger; * Reads the data from the cluster using data accessor. This output ClusterData which * provides useful methods to search/lookup properties */ +@Deprecated public class ClusterDataCache { Map<String, LiveInstance> _liveInstanceMap; http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index d4c1691..3370c99 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -227,21 +227,15 @@ public class ClusterStateVerifier { Map<String, Map<String, String>> errStates, String clusterName) { try { Builder keyBuilder = accessor.keyBuilder(); - // read cluster once and do verification - // TODO: stop using ClusterDataCache - ClusterDataCache cache = new ClusterDataCache(); - cache.refresh(accessor); - - Map<String, IdealState> idealStates = cache.getIdealStates(); - if (idealStates == null) // || idealStates.isEmpty()) - { + + Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates()); + if (idealStates == null) { // ideal state is null because ideal state is dropped idealStates = Collections.emptyMap(); } Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews()); - if (extViews == null) // || extViews.isEmpty()) - { + if (extViews == null) { extViews = Collections.emptyMap(); } http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java index e837626..0731475 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalance.java @@ -40,6 +40,9 @@ import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; @@ -264,22 +267,26 @@ public class TestAutoRebalance extends ZkStandAloneCMTestBaseWithPropertyServerC } int numberOfPartitions = idealState.getRecord().getListFields().size(); - ClusterDataCache cache = new ClusterDataCache(); - cache.refresh(accessor); - State masterValue = - cache - .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify()) - .getStatesPriorityList().get(0); - int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas()); - String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag(); + String stateModelDefName = idealState.getStateModelDefId().stringify(); + StateModelDefinition stateModelDef = + accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName)); + State masterValue = stateModelDef.getStatesPriorityList().get(0); + int replicas = Integer.parseInt(idealState.getReplicas()); + + String instanceGroupTag = idealState.getInstanceGroupTag(); + int instances = 0; - for (String liveInstanceName : cache.getLiveInstances().keySet()) { - if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) { + Map<String, LiveInstance> liveInstanceMap = + accessor.getChildValuesMap(keyBuilder.liveInstances()); + Map<String, InstanceConfig> instanceConfigMap = + accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + for (String liveInstanceName : liveInstanceMap.keySet()) { + if (instanceConfigMap.get(liveInstanceName).containsTag(instanceGroupTag)) { instances++; } } if (instances == 0) { - instances = cache.getLiveInstances().size(); + instances = liveInstanceMap.size(); } ExternalView ev = accessor.getProperty(keyBuilder.externalView(_resourceName)); if (ev == null) { http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java index 2446b67..19535ff 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalancePartitionLimit.java @@ -31,13 +31,15 @@ import org.apache.helix.TestHelper.StartCMResult; import org.apache.helix.ZNRecord; import org.apache.helix.api.State; import org.apache.helix.controller.HelixControllerMain; -import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.ClusterSetup; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier; @@ -50,6 +52,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP private static final Logger LOG = Logger.getLogger(TestAutoRebalancePartitionLimit.class .getName()); + @Override @BeforeClass public void beforeClass() throws Exception { // Logger.getRootLogger().setLevel(Level.INFO); @@ -122,7 +125,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP // kill 1 node String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 0); _startCMResultMap.get(instanceName)._manager.disconnect(); - Thread.currentThread().sleep(1000); + Thread.sleep(1000); _startCMResultMap.get(instanceName)._thread.interrupt(); // verifyBalanceExternalView(); @@ -137,7 +140,7 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + 1); _startCMResultMap.get(instanceName)._manager.disconnect(); - Thread.currentThread().sleep(1000); + Thread.sleep(1000); _startCMResultMap.get(instanceName)._thread.interrupt(); // verifyBalanceExternalView(); @@ -223,22 +226,20 @@ public class TestAutoRebalancePartitionLimit extends ZkStandAloneCMTestBaseWithP @Override public boolean verify() { HelixDataAccessor accessor = - new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor(_client)); + new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client)); Builder keyBuilder = accessor.keyBuilder(); - int numberOfPartitions = - accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields() - .size(); - ClusterDataCache cache = new ClusterDataCache(); - cache.refresh(accessor); - State masterValue = - cache - .getStateModelDef(cache.getIdealState(_resourceName).getStateModelDefId().stringify()) - .getStatesPriorityList().get(0); - int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas()); + IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName)); + int numberOfPartitions = idealState.getRecord().getListFields().size(); + String stateModelDefName = idealState.getStateModelDefId().stringify(); + StateModelDefinition stateModelDef = + accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName)); + State masterValue = stateModelDef.getStatesPriorityList().get(0); + Map<String, LiveInstance> liveInstanceMap = + accessor.getChildValuesMap(keyBuilder.liveInstances()); + int replicas = Integer.parseInt(idealState.getReplicas()); return verifyBalanceExternalView(accessor.getProperty(keyBuilder.externalView(_resourceName)) - .getRecord(), numberOfPartitions, masterValue.toString(), replicas, cache - .getLiveInstances().size(), cache.getIdealState(_resourceName) - .getMaxPartitionsPerInstance()); + .getRecord(), numberOfPartitions, masterValue.toString(), replicas, + liveInstanceMap.size(), idealState.getMaxPartitionsPerInstance()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java index 833f85e..ede4e12 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessage.java @@ -226,14 +226,17 @@ public class TestBatchMessage extends ZkIntegrationTestBase { idealState.setBatchMessageMode(true); accessor.setProperty(keyBuilder.idealState("TestDB0"), idealState); + final String hostToFail = "localhost_12921"; + final String partitionToFail = "TestDB0_4"; + TestHelper .startController(clusterName, "controller_0", ZK_ADDR, HelixControllerMain.STANDALONE); for (int i = 0; i < n; i++) { String instanceName = "localhost_" + (12918 + i); - if (i == 1) { + if (instanceName.equals(hostToFail)) { Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>(); - errPartitions.put("SLAVE-MASTER", TestHelper.setOf("TestDB0_4")); + errPartitions.put("SLAVE-MASTER", TestHelper.setOf(partitionToFail)); participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, new ErrTransition(errPartitions)); @@ -245,14 +248,14 @@ public class TestBatchMessage extends ZkIntegrationTestBase { Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>(); errStates.put("TestDB0", new HashMap<String, String>()); - errStates.get("TestDB0").put("TestDB0_4", "localhost_12919"); + errStates.get("TestDB0").put(partitionToFail, hostToFail); boolean result = ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( ZK_ADDR, clusterName, errStates)); Assert.assertTrue(result); Map<String, Set<String>> errorStateMap = new HashMap<String, Set<String>>(); - errorStateMap.put("TestDB0_4", TestHelper.setOf("localhost_12919")); + errorStateMap.put(partitionToFail, TestHelper.setOf(hostToFail)); // verify "TestDB0_4", "localhost_12919" is in ERROR state TestHelper.verifyState(clusterName, ZK_ADDR, errorStateMap, "ERROR"); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/0b67257a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java index 90c53d6..a5001ed 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestCustomizedIdealStateRebalancer.java @@ -35,7 +35,6 @@ import org.apache.helix.api.State; import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext; import org.apache.helix.controller.rebalancer.context.Rebalancer; import org.apache.helix.controller.rebalancer.context.RebalancerConfig; -import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.ResourceCurrentState; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; @@ -44,6 +43,8 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.IdealStateProperty; import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.tools.ClusterStateVerifier; @@ -142,26 +143,26 @@ public class TestCustomizedIdealStateRebalancer extends HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, new ZkBaseDataAccessor<ZNRecord>(_client)); Builder keyBuilder = accessor.keyBuilder(); - int numberOfPartitions = - accessor.getProperty(keyBuilder.idealState(_resourceName)).getRecord().getListFields() - .size(); - ClusterDataCache cache = new ClusterDataCache(); - cache.refresh(accessor); - State masterValue = - cache - .getStateModelDef( - cache.getIdealState(_resourceName).getStateModelDefId().stringify()) - .getStatesPriorityList().get(0); - int replicas = Integer.parseInt(cache.getIdealState(_resourceName).getReplicas()); - String instanceGroupTag = cache.getIdealState(_resourceName).getInstanceGroupTag(); + IdealState idealState = accessor.getProperty(keyBuilder.idealState(_resourceName)); + int numberOfPartitions = idealState.getRecord().getListFields().size(); + String stateModelDefName = idealState.getStateModelDefId().stringify(); + StateModelDefinition stateModelDef = + accessor.getProperty(keyBuilder.stateModelDef(stateModelDefName)); + State masterValue = stateModelDef.getStatesPriorityList().get(0); + int replicas = Integer.parseInt(idealState.getReplicas()); + String instanceGroupTag = idealState.getInstanceGroupTag(); int instances = 0; - for (String liveInstanceName : cache.getLiveInstances().keySet()) { - if (cache.getInstanceConfigMap().get(liveInstanceName).containsTag(instanceGroupTag)) { + Map<String, LiveInstance> liveInstanceMap = + accessor.getChildValuesMap(keyBuilder.liveInstances()); + Map<String, InstanceConfig> instanceCfgMap = + accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + for (String liveInstanceName : liveInstanceMap.keySet()) { + if (instanceCfgMap.get(liveInstanceName).containsTag(instanceGroupTag)) { instances++; } } if (instances == 0) { - instances = cache.getLiveInstances().size(); + instances = liveInstanceMap.size(); } ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); return verifyBalanceExternalView(externalView.getRecord(), numberOfPartitions,
