Updated Branches: refs/heads/helix-0.6.1.5-release a97981813 -> fa7597e2d
HELIX-200: helix controller send ERROR->DROPPED transition infinitely, rb=13462 Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/fa7597e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/fa7597e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/fa7597e2 Branch: refs/heads/helix-0.6.1.5-release Commit: fa7597e2dd19cdf56884a085a5a94dd33679cd4a Parents: a979818 Author: zzhang <[email protected]> Authored: Tue Aug 13 13:33:15 2013 -0700 Committer: zzhang <[email protected]> Committed: Tue Aug 13 13:33:15 2013 -0700 ---------------------------------------------------------------------- .../stages/BestPossibleStateCalcStage.java | 95 ++++--- .../helix/tools/ClusterStateVerifier.java | 34 ++- .../cluster-manager-version.properties | 1 + .../integration/TestDropErrorPartition.java | 257 +++++++++++++++++++ 4 files changed, 346 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index aca0e74..f1f9c43 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -32,6 +32,7 @@ import java.util.TreeMap; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerProperties; import org.apache.helix.ZNRecord; import 
org.apache.helix.HelixConstants.StateModelToken; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -49,7 +50,7 @@ import org.apache.log4j.Logger; /** * For partition compute best possible (instance,state) pair based on * IdealState,StateModel,LiveInstance - * + * */ // TODO: refactor this public class BestPossibleStateCalcStage extends AbstractBaseStage @@ -131,7 +132,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage currentStateOutput); } - + for (Partition partition : resource.getPartitions()) { Map<String, String> currentStateMap = @@ -150,20 +151,22 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage stateModelDef, idealStateMap, currentStateMap, - disabledInstancesForPartition); + disabledInstancesForPartition, + manager.getProperties()); } else // both AUTO and AUTO_REBALANCE mode { List<String> instancePreferenceList = getPreferenceList(cache, partition, idealState, stateModelDef); - + bestStateForPartition = computeAutoBestStateForPartition(cache, stateModelDef, instancePreferenceList, currentStateMap, - disabledInstancesForPartition); + disabledInstancesForPartition, + manager.getProperties()); } output.setState(resourceName, partition, bestStateForPartition); } @@ -175,7 +178,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage * Compute best state for resource in AUTO_REBALANCE ideal state mode. 
the algorithm * will make sure that the master partition are evenly distributed; Also when instances * are added / removed, the amount of diff in master partitions are minimized - * + * * @param cache * @param idealState * @param instancePreferenceList @@ -191,7 +194,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage String topStateValue = stateModelDef.getStatesPriorityList().get(0); Set<String> liveInstances = cache._liveInstanceMap.keySet(); Set<String> taggedInstances = new HashSet<String>(); - + // If there are instances tagged with resource name, use only those instances if(idealState.getInstanceGroupTag() != null) { @@ -275,7 +278,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage /** * Given the current master assignment map and the partitions not hosted, generate an * evenly distributed partition assignment map - * + * * @param masterAssignmentMap * current master assignment map * @param orphanPartitions @@ -342,7 +345,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage /** * Generate full preference list from the master assignment map evenly distribute the * slave partitions mastered on a host to other hosts - * + * * @param masterAssignmentMap * current master assignment map * @param orphanPartitions @@ -387,8 +390,36 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage } /** + * Whether the participant's version supports the error->dropped transition + */ + private boolean isDropErrorSupported(HelixManagerProperties properties, + ClusterDataCache cache, + String instance) + { + if (properties == null) + { + return false; + } + + LiveInstance liveInstance = cache.getLiveInstances().get(instance); + String participantVersion = null; + if (liveInstance != null) { + participantVersion = liveInstance.getHelixVersion(); + } + + return properties.isFeatureSupported("drop_error_partition", participantVersion); + } + + private boolean isNotError(Map<String, String> currentStateMap, String instance) + { +
return currentStateMap == null + || currentStateMap.get(instance) == null + || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()); + } + + /** * compute best state for resource in AUTO ideal state mode - * + * * @param cache * @param stateModelDef * @param instancePreferenceList @@ -401,7 +432,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage StateModelDefinition stateModelDef, List<String> instancePreferenceList, Map<String, String> currentStateMap, - Set<String> disabledInstancesForPartition) + Set<String> disabledInstancesForPartition, + HelixManagerProperties properties) { Map<String, String> instanceStateMap = new HashMap<String, String>(); @@ -411,15 +443,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { for (String instance : currentStateMap.keySet()) { + boolean isDropErrorSupported = isDropErrorSupported(properties, cache, instance); + boolean isNotError = isNotError(currentStateMap, instance); + if ((instancePreferenceList == null || !instancePreferenceList.contains(instance)) && !disabledInstancesForPartition.contains(instance)) { - // if dropped and not disabled, transit to DROPPED - instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + if (isDropErrorSupported || isNotError) + { + // if dropped and not disabled, transit to DROPPED + instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + } } - else if ( (currentStateMap.get(instance) == null - || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString())) - && disabledInstancesForPartition.contains(instance)) + else if ( isNotError && disabledInstancesForPartition.contains(instance)) { // if disabled and not in ERROR state, transit to initial-state (e.g. 
OFFLINE) instanceStateMap.put(instance, stateModelDef.getInitialState()); @@ -470,9 +506,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { String instanceName = instancePreferenceList.get(i); - boolean notInErrorState = currentStateMap == null - || currentStateMap.get(instanceName) == null - || !currentStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString()); + boolean notInErrorState = isNotError(currentStateMap, instanceName); if (liveInstancesMap.containsKey(instanceName) && !assigned[i] && notInErrorState && !disabledInstancesForPartition.contains(instanceName)) @@ -493,7 +527,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage /** * compute best state for resource in CUSTOMIZED ideal state mode - * + * * @param cache * @param stateModelDef * @param idealStateMap @@ -505,7 +539,8 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage StateModelDefinition stateModelDef, Map<String, String> idealStateMap, Map<String, String> currentStateMap, - Set<String> disabledInstancesForPartition) + Set<String> disabledInstancesForPartition, + HelixManagerProperties properties) { Map<String, String> instanceStateMap = new HashMap<String, String>(); @@ -515,15 +550,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage { for (String instance : currentStateMap.keySet()) { + boolean isDropErrorSupported = isDropErrorSupported(properties, cache, instance); + boolean isNotError = isNotError(currentStateMap, instance); + if ((idealStateMap == null || !idealStateMap.containsKey(instance)) && !disabledInstancesForPartition.contains(instance)) { - // if dropped and not disabled, transit to DROPPED - instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + if (isDropErrorSupported || isNotError) + { + // if dropped and not disabled, transit to DROPPED + instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); + } } - else if ( (currentStateMap.get(instance) == null - || 
!currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString())) - && disabledInstancesForPartition.contains(instance)) + else if (isNotError && disabledInstancesForPartition.contains(instance)) { // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) instanceStateMap.put(instance, stateModelDef.getInitialState()); @@ -540,9 +579,7 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage Map<String, LiveInstance> liveInstancesMap = cache.getLiveInstances(); for (String instance : idealStateMap.keySet()) { - boolean notInErrorState = currentStateMap == null - || currentStateMap.get(instance) == null - || !currentStateMap.get(instance).equals(HelixDefinedState.ERROR.toString()); + boolean notInErrorState = isNotError(currentStateMap, instance); if (liveInstancesMap.containsKey(instance) && notInErrorState && !disabledInstancesForPartition.contains(instance)) http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index eaada16..6c55a56 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -41,6 +41,8 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixDefinedState; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; import org.apache.helix.PropertyPathConfig; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; @@ -55,6 +57,7 @@ import org.apache.helix.controller.stages.ClusterEvent; import 
org.apache.helix.controller.stages.CurrentStateComputationStage; import org.apache.helix.controller.stages.ResourceComputationStage; import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.model.ExternalView; @@ -179,7 +182,10 @@ public class ClusterStateVerifier new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient)); - return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates); + return ClusterStateVerifier.verifyBestPossAndExtView(zkAddr, + clusterName, + accessor, + errStates); } catch (Exception e) { @@ -259,7 +265,9 @@ public class ClusterStateVerifier } - static boolean verifyBestPossAndExtView(HelixDataAccessor accessor, + static boolean verifyBestPossAndExtView(String zkAddr, + String clusterName, + HelixDataAccessor accessor, Map<String, Map<String, String>> errStates) { try @@ -292,10 +300,10 @@ public class ClusterStateVerifier idealStates.put(resource, new IdealState(resource)); } } - + // calculate best possible state BestPossibleStateOutput bestPossOutput = - ClusterStateVerifier.calcBestPossState(cache); + ClusterStateVerifier.calcBestPossState(zkAddr, clusterName, cache); Map<String, Map<Partition, Map<String, String>>> bestPossStateMap = bestPossOutput.getStateMap(); // set error states @@ -307,7 +315,7 @@ public class ClusterStateVerifier for (String partitionName : partErrStates.keySet()) { String instanceName = partErrStates.get(partitionName); - + if (!bestPossStateMap.containsKey(resourceName)) { bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>()); } @@ -319,10 +327,10 @@ public class ClusterStateVerifier } } } - + // System.out.println("stateMap: " + bestPossStateMap); - + for (String resourceName : idealStates.keySet()) { ExternalView extView = extViews.get(resourceName); @@ -355,7 +363,7 @@ public 
class ClusterStateVerifier if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) { insIter.remove(); - } + } } } } @@ -370,7 +378,7 @@ public class ClusterStateVerifier LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size (" + bestPossStateSize + ") for resource: " + resourceName); - + // System.err.println("exterView size (" + extViewSize // + ") is different from bestPossState size (" + bestPossStateSize // + ") for resource: " + resourceName); @@ -395,7 +403,7 @@ public class ClusterStateVerifier { LOG.info("externalView is different from bestPossibleState for partition:" + partition); - + // System.err.println("externalView is different from bestPossibleState for partition: " // + partition + ", actual: " + evInstanceStateMap + ", bestPoss: " + bpInstanceStateMap); return false; @@ -477,16 +485,18 @@ public class ClusterStateVerifier /** * calculate the best possible state note that DROPPED states are not checked since when * kick off the BestPossibleStateCalcStage we are providing an empty current state map - * + * * @param cache * @return * @throws Exception */ - static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception + static BestPossibleStateOutput calcBestPossState(String zkAddr, String clusterName, ClusterDataCache cache) throws Exception { ClusterEvent event = new ClusterEvent("sampleEvent"); event.addAttribute("ClusterDataCache", cache); + HelixManager manager = new ZKHelixManager(clusterName, "verifier", InstanceType.SPECTATOR, zkAddr); + event.addAttribute("helixmanager", manager); ResourceComputationStage rcState = new ResourceComputationStage(); CurrentStateComputationStage csStage = new CurrentStateComputationStage(); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/main/resources/cluster-manager-version.properties ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/resources/cluster-manager-version.properties b/helix-core/src/main/resources/cluster-manager-version.properties index 8a6db4c..c715beb 100644 --- a/helix-core/src/main/resources/cluster-manager-version.properties +++ b/helix-core/src/main/resources/cluster-manager-version.properties @@ -19,5 +19,6 @@ clustermanager.version=${project.version} +minimum_supported_version.drop_error_partition=0.6.1 minimum_supported_version.batch_message=0.6.1 minimum_supported_version.participant=0.4 http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/fa7597e2/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java b/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java new file mode 100644 index 0000000..6366c6f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestDropErrorPartition.java @@ -0,0 +1,257 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.TestHelper; +import org.apache.helix.TestHelper.Verifier; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.mock.controller.ClusterController; +import org.apache.helix.mock.participant.ErrTransition; +import org.apache.helix.mock.participant.MockParticipant; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestDropErrorPartition extends ZkUnitTestBase +{ + @Test + public void testDropErrorSupported() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 2; + + System.out.println("START " + clusterName + " at " + + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, + ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 8, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + // create data accessor and key builder + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + Builder keyBuilder = accessor.keyBuilder(); + + // start participants + MockParticipant[]
participants = new MockParticipant[n]; + for (int i = 0; i < n; i++) + { + String instanceName = "localhost_" + (12918 + i); + + if (i == 0) + { + Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>(); + errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0")); + participants[i] = + new MockParticipant(clusterName, + instanceName, + ZK_ADDR, + new ErrTransition(errPartitions)); + + } else + { + participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + } + participants[i].syncStart(); + } + + ClusterController controller = + new ClusterController(clusterName, "controller_0", ZK_ADDR); + controller.syncStart(); + + Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>(); + errStates.put("TestDB0", new HashMap<String, String>()); + errStates.get("TestDB0").put("TestDB0_0", "localhost_12918"); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName, + errStates)); + + Assert.assertTrue(result); + + // drop resource + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.dropResource(clusterName, "TestDB0"); + + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + + // check dropping partition in ERROR state + PropertyKey key = keyBuilder.externalView("TestDB0"); + ExternalView externalView = accessor.getProperty(key); + Assert.assertNotNull(externalView); + Assert.assertEquals(externalView.getPartitionSet().size(), 0); + + // clean up + // wait for all zk callbacks done + Thread.sleep(1000); + controller.syncStop(); + for (int i = 0; i < n; i++) + { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + + new Date(System.currentTimeMillis())); + } + + @Test + public void testDropErrorNotSupported() throws Exception + { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = 
TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 2; + + System.out.println("START " + clusterName + " at " + + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, + ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 8, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", + true); // do rebalance + + // create data accessor and key builder + final ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient)); + final Builder keyBuilder = accessor.keyBuilder(); + + // start participants + MockParticipant[] participants = new MockParticipant[n]; + for (int i = 0; i < n; i++) + { + String instanceName = "localhost_" + (12918 + i); + + if (i == 0) + { + Map<String, Set<String>> errPartitions = new HashMap<String, Set<String>>(); + errPartitions.put("OFFLINE-SLAVE", TestHelper.setOf("TestDB0_0")); + participants[i] = + new MockParticipant(clusterName, + instanceName, + ZK_ADDR, + new ErrTransition(errPartitions)); + + } else + { + participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); + } + participants[i].syncStart(); + + // change version to < 0.6.1 + PropertyKey key = keyBuilder.liveInstance(instanceName); + LiveInstance liveInstance = accessor.getProperty(key); + liveInstance.setHelixVersion("0.5.32"); + accessor.setProperty(key, liveInstance); + } + + ClusterController controller = + new ClusterController(clusterName, "controller_0", ZK_ADDR); + controller.syncStart(); + + Map<String, Map<String, String>> errStates = new HashMap<String, Map<String, String>>(); + errStates.put("TestDB0", new HashMap<String, String>()); + errStates.get("TestDB0").put("TestDB0_0", "localhost_12918"); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new
BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName, + errStates)); + + Assert.assertTrue(result); + + // drop resource + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.dropResource(clusterName, "TestDB0"); + + // check not dropping partition in ERROR state + result = TestHelper.verify(new Verifier() + { + + @Override + public boolean verify() throws Exception + { + PropertyKey key = keyBuilder.externalView("TestDB0"); + ExternalView externalView = accessor.getProperty(key); + if (externalView == null) + { + return false; + } + + if (externalView.getPartitionSet().size() != 1) + { + return false; + } + + if (!externalView.getStateMap("TestDB0_0").get("localhost_12918").equals("ERROR")) + { + return false; + } + + return true; + } + }, 3000); + Assert.assertTrue(result); + + // clean up + // wait for all zk callbacks done + Thread.sleep(1000); + controller.syncStop(); + for (int i = 0; i < n; i++) + { + participants[i].syncStop(); + } + + System.out.println("END " + clusterName + " at " + + new Date(System.currentTimeMillis())); + } +}
