[HELIX-633] AutoRebalancer should ignore disabled instances, and all partitions on disabled instances should be dropped in FULL_AUTO rebalance mode
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/bc0aa76a Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/bc0aa76a Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/bc0aa76a Branch: refs/heads/helix-0.6.x Commit: bc0aa76a9de6243928e53e1a1d01e7502ff8267c Parents: f5ac8f8 Author: Lei Xia <[email protected]> Authored: Tue May 31 19:17:39 2016 -0700 Committer: Lei Xia <[email protected]> Committed: Mon Sep 12 10:06:33 2016 -0700 ---------------------------------------------------------------------- .../controller/rebalancer/AutoRebalancer.java | 3 + .../util/ConstraintBasedAssignment.java | 22 +-- .../controller/stages/ClusterDataCache.java | 16 +++ .../TestAutoRebalanceWithDisabledInstance.java | 142 +++++++++++++++++++ .../integration/TestStateTransitionTimeout.java | 28 ---- .../integration/ZkStandAloneCMTestBase.java | 2 + .../mock/participant/MockMSStateModel.java | 65 +-------- 7 files changed, 180 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index a8d83a2..e47297f 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -82,6 +82,9 @@ public class AutoRebalancer implements Rebalancer, MappingCalculator { stateCountMap = stateCount(stateModelDef, liveInstance.size(), Integer.parseInt(replicas)); List<String> liveNodes = new ArrayList<String>(liveInstance.keySet()); List<String> allNodes = new 
ArrayList<String>(clusterData.getInstanceConfigMap().keySet()); + allNodes.removeAll(clusterData.getDisabledInstances()); + liveNodes.retainAll(allNodes); + Map<String, Map<String, String>> currentMapping = currentMapping(currentStateOutput, resourceName, partitions, stateCountMap); http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java index a520803..9366bcf 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java @@ -75,24 +75,26 @@ public class ConstraintBasedAssignment { boolean isResourceEnabled) { Map<String, String> instanceStateMap = new HashMap<String, String>(); - // if the ideal state is deleted, instancePreferenceList will be empty and - // we should drop all resources. if (currentStateMap != null) { for (String instance : currentStateMap.keySet()) { - if ((instancePreferenceList == null || !instancePreferenceList.contains(instance)) - && !disabledInstancesForPartition.contains(instance)) { - // if dropped (whether disabled or not), transit to DROPPED + if (instancePreferenceList == null || !instancePreferenceList.contains(instance)) { + // The partition is dropped from preference list. + // Transit to DROPPED no matter the instance is disabled or not. 
instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); - } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals( - HelixDefinedState.ERROR.name())) - && (disabledInstancesForPartition.contains(instance) || !isResourceEnabled)) { + } else { // if disabled and not in ERROR state, transit to initial-state (e.g. OFFLINE) - instanceStateMap.put(instance, stateModelDef.getInitialState()); + if (disabledInstancesForPartition.contains(instance) || !isResourceEnabled) { + if (currentStateMap.get(instance) == null || !currentStateMap.get(instance) + .equals(HelixDefinedState.ERROR.name())) { + instanceStateMap.put(instance, stateModelDef.getInitialState()); + } + } } } } - // ideal state is deleted + // if the ideal state is deleted, instancePreferenceList will be empty and + // we should drop all resources. if (instancePreferenceList == null) { return instanceStateMap; } http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index b77ce0d..cb5bda8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -390,6 +390,22 @@ public class ClusterDataCache { return disabledInstancesSet; } + + /** + * This method allows one to fetch the set of nodes that are disabled + * @return + */ + public Set<String> getDisabledInstances() { + Set<String> disabledInstancesSet = new HashSet<String>(); + for (String instance : _instanceConfigMap.keySet()) { + InstanceConfig config = _instanceConfigMap.get(instance); + if (config.getInstanceEnabled() == false) { + 
disabledInstancesSet.add(instance); + } + } + return disabledInstancesSet; + } + /** * Returns the number of replicas for a given resource. * @param resourceName http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java new file mode 100644 index 0000000..84eca6b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAutoRebalanceWithDisabledInstance.java @@ -0,0 +1,142 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixAdmin; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.tools.ClusterStateVerifier; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class TestAutoRebalanceWithDisabledInstance extends ZkStandAloneCMTestBase { + private static String TEST_DB_2 = "TestDB2"; + + @BeforeClass + @Override + public void beforeClass() throws Exception { + super.beforeClass(); + _setupTool.addResourceToCluster(CLUSTER_NAME, TEST_DB_2, _PARTITIONS, STATE_MODEL, + RebalanceMode.FULL_AUTO + ""); + _setupTool.rebalanceResource(CLUSTER_NAME, TEST_DB_2, _replica); + + Thread.sleep(200); + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @Test() + public void testDisableEnableInstanceAutoRebalance() throws Exception { + String disabledInstance = _participants[0].getInstanceName(); + + Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, + disabledInstance); + Assert.assertFalse(assignedPartitions.isEmpty()); + Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, + disabledInstance); + Assert.assertFalse(currentPartitions.isEmpty()); + + // disable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, false); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertTrue(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, 
disabledInstance); + Assert.assertTrue(currentPartitions.isEmpty()); + + //enable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, disabledInstance, true); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertFalse(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, disabledInstance); + Assert.assertFalse(currentPartitions.isEmpty()); + } + + @Test() + public void testAddDisabledInstanceAutoRebalance() throws Exception { + // add disabled instance. + String nodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + NODE_NR); + _setupTool.addInstanceToCluster(CLUSTER_NAME, nodeName); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, nodeName); + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, false); + + participant.syncStart(); + + Thread.sleep(400); + Set<String> assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertTrue(assignedPartitions.isEmpty()); + Set<String> currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, + nodeName); + Assert.assertTrue(currentPartitions.isEmpty()); + + //enable instance + _setupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, nodeName, true); + Thread.sleep(400); + assignedPartitions = getPartitionsAssignedtoInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertFalse(assignedPartitions.isEmpty()); + currentPartitions = getCurrentPartitionsOnInstance(CLUSTER_NAME, TEST_DB_2, nodeName); + Assert.assertFalse(currentPartitions.isEmpty()); + } + + private Set<String> getPartitionsAssignedtoInstance(String cluster, String dbName, String instance) { + HelixAdmin admin = _setupTool.getClusterManagementTool(); + Set<String> partitionSet = new HashSet<String>(); + IdealState is = admin.getResourceIdealState(cluster, dbName); + 
for (String partition : is.getRecord().getListFields().keySet()) { + List<String> assignments = is.getRecord().getListField(partition); + for (String ins : assignments) { + if (ins.equals(instance)) { + partitionSet.add(partition); + } + } + } + + return partitionSet; + } + + private Set<String> getCurrentPartitionsOnInstance(String cluster, String dbName, String instance) { + HelixAdmin admin = _setupTool.getClusterManagementTool(); + Set<String> partitionSet = new HashSet<String>(); + + ExternalView ev = admin.getResourceExternalView(cluster, dbName); + for (String partition : ev.getRecord().getMapFields().keySet()) { + Map<String, String> assignments = ev.getRecord().getMapField(partition); + for (String ins : assignments.keySet()) { + if (ins.equals(instance)) { + partitionSet.add(partition); + } + } + } + return partitionSet; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java index 443d484..fb534fd 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStateTransitionTimeout.java @@ -99,14 +99,6 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { _sleep = sleep; } - @Override - @Transition(to = "SLAVE", from = "OFFLINE") - public void onBecomeSlaveFromOffline(Message message, NotificationContext context) { - LOG.info("Become SLAVE from OFFLINE"); - - } - - @Override @Transition(to = "MASTER", from = "SLAVE") public void onBecomeMasterFromSlave(Message message, NotificationContext context) throws InterruptedException { @@ -117,26 +109,6 @@ public class TestStateTransitionTimeout 
extends ZkStandAloneCMTestBase { } @Override - @Transition(to = "SLAVE", from = "MASTER") - public void onBecomeSlaveFromMaster(Message message, NotificationContext context) { - LOG.info("Become SLAVE from MASTER"); - } - - @Override - @Transition(to = "OFFLINE", from = "SLAVE") - public void onBecomeOfflineFromSlave(Message message, NotificationContext context) { - LOG.info("Become OFFLINE from SLAVE"); - - } - - @Override - @Transition(to = "DROPPED", from = "OFFLINE") - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { - LOG.info("Become DROPPED from OFFLINE"); - - } - - @Override public void rollbackOnError(Message message, NotificationContext context, StateTransitionError error) { _error = error; http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java index 5d169d5..f694618 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java @@ -91,6 +91,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase { ClusterStateVerifier .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + result = ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); http://git-wip-us.apache.org/repos/asf/helix/blob/bc0aa76a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java 
b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java index 61733ba..7d90063 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java +++ b/helix-core/src/test/java/org/apache/helix/mock/participant/MockMSStateModel.java @@ -43,67 +43,12 @@ public class MockMSStateModel extends StateModel { _transition = transition; } - // overwrite default error->dropped transition - @Transition(to = "DROPPED", from = "ERROR") - public void onBecomeDroppedFromError(Message message, NotificationContext context) + @Transition(to = "*", from = "*") + public void generalTransitionHandle(Message message, NotificationContext context) throws InterruptedException { - LOG.info("Become DROPPED from ERROR"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "SLAVE", from = "OFFLINE") - public void onBecomeSlaveFromOffline(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become SLAVE from OFFLINE"); - if (_transition != null) { - _transition.doTransition(message, context); - - } - } - - @Transition(to = "MASTER", from = "SLAVE") - public void onBecomeMasterFromSlave(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become MASTER from SLAVE"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "SLAVE", from = "MASTER") - public void onBecomeSlaveFromMaster(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become SLAVE from MASTER"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "OFFLINE", from = "SLAVE") - public void onBecomeOfflineFromSlave(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become OFFLINE from SLAVE"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - 
@Transition(to = "DROPPED", from = "OFFLINE") - public void onBecomeDroppedFromOffline(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become DROPPED from OFFLINE"); - if (_transition != null) { - _transition.doTransition(message, context); - } - } - - @Transition(to = "OFFLINE", from = "ERROR") - public void onBecomeOfflineFromError(Message message, NotificationContext context) - throws InterruptedException { - LOG.info("Become OFFLINE from ERROR"); - // System.err.println("Become OFFLINE from ERROR"); + LOG.info(String + .format("Resource %s partition %s becomes %s from %s", message.getResourceName(), + message.getPartitionName(), message.getToState(), message.getFromState())); if (_transition != null) { _transition.doTransition(message, context); }
