[HELIX-280] Full auto rebalancer should check resource tag first, rb=14931
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/486acd48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/486acd48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/486acd48 Branch: refs/heads/master Commit: 486acd48d818961d6bdf085fe978f01739f37f3f Parents: 90faf91 Author: Kanak Biscuitwala <[email protected]> Authored: Tue Oct 29 18:29:32 2013 -0700 Committer: Kanak Biscuitwala <[email protected]> Committed: Tue Oct 29 18:32:06 2013 -0700 ---------------------------------------------------------------------- .../controller/rebalancer/AutoRebalancer.java | 21 +++- .../integration/TestFullAutoNodeTagging.java | 130 +++++++++++++++++++ 2 files changed, 146 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/486acd48/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java index 946dd5e..68f5a5a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AutoRebalancer.java @@ -100,11 +100,22 @@ public class AutoRebalancer implements Rebalancer { } } } - } - if (taggedNodes.size() > 0) { - if (LOG.isInfoEnabled()) { - LOG.info("found the following instances with tag " + currentIdealState.getResourceName() - + " " + taggedLiveNodes); + if (!taggedLiveNodes.isEmpty()) { + // live nodes exist that have this tag + if (LOG.isInfoEnabled()) { + LOG.info("found the following participants with tag " + currentIdealState.getInstanceGroupTag() + " for " +
resource.getResourceName() + ": " + taggedLiveNodes); + } + } else if (taggedNodes.isEmpty()) { + // no live nodes and no configured nodes have this tag + LOG.warn("Resource " + resource.getResourceName() + " has tag " + currentIdealState.getInstanceGroupTag() + " but no configured participants have this tag"); + } else { + // configured nodes have this tag, but no live nodes have this tag + LOG.warn("Resource " + resource.getResourceName() + " has tag " + currentIdealState.getInstanceGroupTag() + " but no live participants have this tag"); } allNodes = new ArrayList<String>(taggedNodes); liveNodes = new ArrayList<String>(taggedLiveNodes); http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/486acd48/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java index d815c80..e0c8b6f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestFullAutoNodeTagging.java @@ -20,6 +20,7 @@ package org.apache.helix.integration; */ import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Set; @@ -36,6 +37,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; @@ -58,6 +60,65 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase { private static final Logger LOG = Logger.getLogger(TestFullAutoNodeTagging.class); /** + *
Ensure that no assignments happen when there are no tagged nodes, but the resource is tagged + */ + @Test + public void testResourceTaggedFirst() throws Exception { + final int NUM_PARTICIPANTS = 10; + final int NUM_PARTITIONS = 4; + final int NUM_REPLICAS = 2; + final String RESOURCE_NAME = "TestDB0"; + final String TAG = "ASSIGNABLE"; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging + true); // do rebalance + + // tag the resource + HelixAdmin helixAdmin = new ZKHelixAdmin(ZK_ADDR); + IdealState idealState = helixAdmin.getResourceIdealState(clusterName, RESOURCE_NAME); + idealState.setInstanceGroupTag(TAG); + helixAdmin.setResourceIdealState(clusterName, RESOURCE_NAME, idealState); + + // start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + Thread.sleep(1000); + boolean result = + ClusterStateVerifier.verifyByZkCallback(new EmptyZkVerifier(clusterName, RESOURCE_NAME)); + Assert.assertTrue(result, "External view and current state must be empty");
+ + // cleanup + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + participants[i].syncStop(); + } + controller.syncStop(); + } + + /** * Basic test for tagging behavior. 10 participants, of which 4 are tagged. Launch all 10, * checking external view every time a tagged node is started. Then shut down all 10, checking * external view every time a tagged node is killed. @@ -311,4 +372,73 @@ public class TestFullAutoNodeTagging extends ZkUnitTestBase { return _clusterName; } } + + /** + * Ensures that external view and current state are empty; a missing external view or current state counts as empty + */ + private static class EmptyZkVerifier implements ZkVerifier { + private final String _clusterName; + private final String _resourceName; + private final ZkClient _zkClient; + + /** + * Instantiate the verifier + * @param clusterName the cluster to verify + * @param resourceName the resource to verify + */ + public EmptyZkVerifier(String clusterName, String resourceName) { + _clusterName = clusterName; + _resourceName = resourceName; + _zkClient = ZKClientPool.getZkClient(ZK_ADDR); + } + + @Override + public boolean verify() { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); + + // verify external view empty; getProperty returns null when no external view exists, which has nothing to check + if (externalView != null) { + for (String partition : externalView.getPartitionSet()) { + Map<String, String> stateMap = externalView.getStateMap(partition); + if (stateMap != null && !stateMap.isEmpty()) { + LOG.error("External view not empty for " + partition); + return false; + } + } + } + + // verify current state empty + List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances()); + for (String participant : liveParticipants) { + List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant)); + for (String sessionId : sessionIds) { + CurrentState currentState =
accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName)); + if (currentState == null) { + // no current state recorded for this resource in this session, so nothing to check + continue; + } + Map<String, String> partitionStateMap = currentState.getPartitionStateMap(); + if (partitionStateMap != null && !partitionStateMap.isEmpty()) { + LOG.error("Current state not empty for " + participant); + return false; + } + } + } + return true; + } + + @Override + public ZkClient getZkClient() { + return _zkClient; + } + + @Override + public String getClusterName() { + return _clusterName; + } + } }
