Updated Branches: refs/heads/helix-0.6.2-release c92428023 -> 06ca975d1
[HELIX-345] Speed up the controller pipeline, rb=16407 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/06ca975d Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/06ca975d Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/06ca975d Branch: refs/heads/helix-0.6.2-release Commit: 06ca975d11a2fbea9c91390e7c3419bcd2d6f704 Parents: c924280 Author: Kanak Biscuitwala <[email protected]> Authored: Wed Jan 29 16:43:07 2014 -0800 Committer: Kanak Biscuitwala <[email protected]> Committed: Wed Jan 29 16:43:07 2014 -0800 ---------------------------------------------------------------------- .../controller/GenericHelixController.java | 40 +++++ .../controller/stages/ClusterDataCache.java | 171 +++++++++++++++++-- .../controller/stages/ReadClusterDataStage.java | 12 +- .../controller/stages/TaskAssignmentStage.java | 5 + .../src/test/java/org/apache/helix/Mocks.java | 2 +- .../stages/TestRebalancePipeline.java | 21 +-- .../TestReelectedPipelineCorrectness.java | 151 ++++++++++++++++ 7 files changed, 370 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 7e28399..6db82fc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -46,6 +46,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.controller.pipeline.Pipeline; import org.apache.helix.controller.pipeline.PipelineRegistry; import 
org.apache.helix.controller.stages.BestPossibleStateCalcStage; +import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.ClusterEvent; import org.apache.helix.controller.stages.ClusterEventBlockingQueue; import org.apache.helix.controller.stages.CompatibilityCheckStage; @@ -117,6 +118,11 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC int _timerPeriod = Integer.MAX_VALUE; /** + * A cache maintained across pipelines + */ + private ClusterDataCache _cache; + + /** * Default constructor that creates a default pipeline registry. This is sufficient in * most cases, but if there is a some thing specific needed use another constructor * where in you can pass a pipeline registry @@ -134,6 +140,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC @Override public void run() { + _cache.requireFullRefresh(); NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); ClusterEvent event = new ClusterEvent("periodicalRebalance"); @@ -237,6 +244,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC _eventQueue = new ClusterEventBlockingQueue(); _eventThread = new ClusterEventProcessor(); _eventThread.start(); + _cache = new ClusterDataCache(); } /** @@ -282,6 +290,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC } } + // add the cache + event.addAttribute("ClusterDataCache", _cache); + List<Pipeline> pipelines = _registry.getPipelinesForEvent(event.getName()); if (pipelines == null || pipelines.size() == 0) { logger.info("No pipeline to run for event:" + event.getName()); @@ -324,6 +335,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) { logger.info("START: 
GenericClusterController.onStateChange()"); + if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + _cache.requireFullRefresh(); + } ClusterEvent event = new ClusterEvent("currentStateChange"); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("instanceName", instanceName); @@ -347,6 +361,9 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) { logger.info("START: GenericClusterController.onMessage()"); + if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + _cache.requireFullRefresh(); + } ClusterEvent event = new ClusterEvent("messageChange"); event.addAttribute("helixmanager", changeContext.getManager()); @@ -366,10 +383,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC public void onLiveInstanceChange(List<LiveInstance> liveInstances, NotificationContext changeContext) { logger.info("START: Generic GenericClusterController.onLiveInstanceChange()"); + if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + _cache.requireFullRefresh(); + } if (liveInstances == null) { liveInstances = Collections.emptyList(); } + _cache.setLiveInstances(liveInstances); + // Go though the live instance list and make sure that we are observing them // accordingly. The action is done regardless of the paused flag. 
if (changeContext.getType() == NotificationContext.Type.INIT @@ -409,6 +431,14 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC @Override public void onIdealStateChange(List<IdealState> idealStates, NotificationContext changeContext) { logger.info("START: Generic GenericClusterController.onIdealStateChange()"); + if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + _cache.requireFullRefresh(); + } + + if (idealStates == null) { + idealStates = Collections.emptyList(); + } + _cache.setIdealStates(idealStates); ClusterEvent event = new ClusterEvent("idealStateChange"); event.addAttribute("helixmanager", changeContext.getManager()); event.addAttribute("changeContext", changeContext); @@ -425,6 +455,15 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC @Override public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) { logger.info("START: GenericClusterController.onConfigChange()"); + if (changeContext == null || changeContext.getType() != Type.CALLBACK) { + _cache.requireFullRefresh(); + } + + if (configs == null) { + configs = Collections.emptyList(); + } + _cache.setInstanceConfigs(configs); + ClusterEvent event = new ClusterEvent("configChange"); event.addAttribute("changeContext", changeContext); event.addAttribute("helixmanager", changeContext.getManager()); @@ -436,6 +475,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC @Override public void onControllerChange(NotificationContext changeContext) { logger.info("START: GenericClusterController.onControllerChange()"); + _cache.requireFullRefresh(); if (changeContext != null && changeContext.getType() == Type.FINALIZE) { logger.info("GenericClusterController.onControllerChange() FINALIZE"); return; http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 5c0a94a..5d38151 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -22,12 +22,15 @@ package org.apache.helix.controller.stages; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.helix.HelixConstants.StateModelToken; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; @@ -39,7 +42,9 @@ import org.apache.helix.model.Message; import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; /** * Reads the data from the cluster using data accessor. 
This output ClusterData which @@ -58,6 +63,11 @@ public class ClusterDataCache { Map<String, Map<String, Message>> _messageMap; Map<String, Map<String, String>> _idealStateRuleMap; + // maintain a cache of participant messages across pipeline runs + Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); + + boolean _init = true; + // Map<String, Map<String, HealthStat>> _healthStatMap; // private HealthStat _globalStats; // DON'T THINK I WILL USE THIS ANYMORE // private PersistentStats _persistentStats; @@ -72,38 +82,110 @@ public class ClusterDataCache { * @param accessor * @return */ - public boolean refresh(HelixDataAccessor accessor) { + public synchronized boolean refresh(HelixDataAccessor accessor) { + LOG.info("START: ClusterDataCache.refresh()"); + long startTime = System.currentTimeMillis(); + Builder keyBuilder = accessor.keyBuilder(); - _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates()); - _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); - for (LiveInstance instance : _liveInstanceMap.values()) { - LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); + if (_init) { + _idealStateMap = accessor.getChildValuesMap(keyBuilder.idealStates()); + _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); + _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + } + + if (LOG.isTraceEnabled()) { + for (LiveInstance instance : _liveInstanceMap.values()) { + LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); + } } _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs()); - _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints()); Map<String, Map<String, Message>> msgMap = new HashMap<String, Map<String, Message>>(); + List<PropertyKey> newMessageKeys = Lists.newLinkedList(); + long 
purgeSum = 0; for (String instanceName : _liveInstanceMap.keySet()) { - Map<String, Message> map = accessor.getChildValuesMap(keyBuilder.messages(instanceName)); - msgMap.put(instanceName, map); + // get the cache + Map<String, Message> cachedMap = _messageCache.get(instanceName); + if (cachedMap == null) { + cachedMap = Maps.newHashMap(); + _messageCache.put(instanceName, cachedMap); + } + msgMap.put(instanceName, cachedMap); + + // get the current names + Set<String> messageNames = + Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName))); + + long purgeStart = System.currentTimeMillis(); + // clear stale names + Iterator<String> cachedNamesIter = cachedMap.keySet().iterator(); + while (cachedNamesIter.hasNext()) { + String messageName = cachedNamesIter.next(); + if (!messageNames.contains(messageName)) { + cachedNamesIter.remove(); + } + } + long purgeEnd = System.currentTimeMillis(); + purgeSum += purgeEnd - purgeStart; + + // get the keys for the new messages + for (String messageName : messageNames) { + if (!cachedMap.containsKey(messageName)) { + newMessageKeys.add(keyBuilder.message(instanceName, messageName)); + } + } + } + + // get the new messages + if (newMessageKeys.size() > 0) { + List<Message> newMessages = accessor.getProperty(newMessageKeys); + for (Message message : newMessages) { + if (message != null) { + Map<String, Message> cachedMap = _messageCache.get(message.getTgtName()); + cachedMap.put(message.getId(), message); + } + } } _messageMap = Collections.unmodifiableMap(msgMap); + LOG.debug("Purge took: " + purgeSum); + List<PropertyKey> currentStateKeys = Lists.newLinkedList(); Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<String, Map<String, Map<String, CurrentState>>>(); for (String instanceName : _liveInstanceMap.keySet()) { LiveInstance liveInstance = _liveInstanceMap.get(instanceName); String sessionId = liveInstance.getSessionId(); - if (!allCurStateMap.containsKey(instanceName)) { - 
allCurStateMap.put(instanceName, new HashMap<String, Map<String, CurrentState>>()); + List<String> currentStateNames = + accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); + for (String currentStateName : currentStateNames) { + currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName)); + } + + // ensure an empty current state map for all live instances and sessions + Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(instanceName); + if (instanceCurStateMap == null) { + instanceCurStateMap = Maps.newHashMap(); + allCurStateMap.put(instanceName, instanceCurStateMap); + } + Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId); + if (sessionCurStateMap == null) { + sessionCurStateMap = Maps.newHashMap(); + instanceCurStateMap.put(sessionId, sessionCurStateMap); + } + } + List<CurrentState> currentStates = accessor.getProperty(currentStateKeys); + Iterator<PropertyKey> csKeyIter = currentStateKeys.iterator(); + for (CurrentState currentState : currentStates) { + PropertyKey key = csKeyIter.next(); + String[] params = key.getParams(); + if (currentState != null && params.length >= 4) { + Map<String, Map<String, CurrentState>> instanceCurStateMap = allCurStateMap.get(params[1]); + Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(params[2]); + sessionCurStateMap.put(params[3], currentState); } - Map<String, Map<String, CurrentState>> curStateMap = allCurStateMap.get(instanceName); - Map<String, CurrentState> map = - accessor.getChildValuesMap(keyBuilder.currentStates(instanceName, sessionId)); - curStateMap.put(sessionId, map); } for (String instance : allCurStateMap.keySet()) { @@ -130,6 +212,18 @@ public class ClusterDataCache { } } + long endTime = System.currentTimeMillis(); + LOG.info("END: ClusterDataCache.refresh(), took " + (endTime - startTime) + " ms"); + + if (LOG.isDebugEnabled()) { + int numPaths = + _liveInstanceMap.size() + 
_idealStateMap.size() + _stateModelDefMap.size() + + _instanceConfigMap.size() + _constraintMap.size() + newMessageKeys.size() + + currentStateKeys.size(); + LOG.debug("Paths read: " + numPaths); + } + + _init = false; return true; } @@ -141,6 +235,14 @@ public class ClusterDataCache { return _idealStateMap; } + public synchronized void setIdealStates(List<IdealState> idealStates) { + Map<String, IdealState> idealStateMap = Maps.newHashMap(); + for (IdealState idealState : idealStates) { + idealStateMap.put(idealState.getId(), idealState); + } + _idealStateMap = idealStateMap; + } + public Map<String, Map<String, String>> getIdealStateRules() { return _idealStateRuleMap; } @@ -153,6 +255,14 @@ public class ClusterDataCache { return _liveInstanceMap; } + public synchronized void setLiveInstances(List<LiveInstance> liveInstances) { + Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap(); + for (LiveInstance liveInstance : liveInstances) { + liveInstanceMap.put(liveInstance.getId(), liveInstance); + } + _liveInstanceMap = liveInstanceMap; + } + /** * Provides the current state of the node for a given session id, * the sessionid can be got from LiveInstance @@ -161,6 +271,10 @@ public class ClusterDataCache { * @return */ public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) { + if (!_currentStateMap.containsKey(instanceName) + || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) { + return Collections.emptyMap(); + } return _currentStateMap.get(instanceName).get(clientSessionId); } @@ -178,6 +292,20 @@ public class ClusterDataCache { } } + public void cacheMessages(List<Message> messages) { + for (Message message : messages) { + String instanceName = message.getTgtName(); + Map<String, Message> instMsgMap = null; + if (_messageCache.containsKey(instanceName)) { + instMsgMap = _messageCache.get(instanceName); + } else { + instMsgMap = Maps.newHashMap(); + _messageCache.put(instanceName, instMsgMap); + } + 
instMsgMap.put(message.getId(), message); + } + } + // public HealthStat getGlobalStats() // { // return _globalStats; @@ -236,6 +364,14 @@ public class ClusterDataCache { return _instanceConfigMap; } + public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) { + Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap(); + for (InstanceConfig instanceConfig : instanceConfigs) { + instanceConfigMap.put(instanceConfig.getId(), instanceConfig); + } + _instanceConfigMap = instanceConfigMap; + } + /** * Some partitions might be disabled on specific nodes. * This method allows one to fetch the set of nodes where a given partition is disabled @@ -295,6 +431,13 @@ public class ClusterDataCache { } /** + * Indicate that a full read should be done on the next refresh + */ + public synchronized void requireFullRefresh() { + _init = true; + } + + /** * toString method to print the entire cluster state */ @Override http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java index f077d29..0a7414a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java @@ -29,11 +29,8 @@ import org.apache.log4j.Logger; public class ReadClusterDataStage extends AbstractBaseStage { private static final Logger logger = Logger.getLogger(ReadClusterDataStage.class.getName()); - ClusterDataCache _cache; - public ReadClusterDataStage() { - _cache = new ClusterDataCache(); - } + private ClusterDataCache _cache = null; @Override public void process(ClusterEvent event) throws Exception { @@ -44,6 +41,13 
@@ public class ReadClusterDataStage extends AbstractBaseStage { if (manager == null) { throw new StageException("HelixManager attribute value is null"); } + + ClusterDataCache cache = event.getAttribute("ClusterDataCache"); + if (cache == null && _cache == null) { + cache = new ClusterDataCache(); + } + _cache = cache; + HelixDataAccessor dataAccessor = manager.getHelixDataAccessor(); _cache.refresh(dataAccessor); http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java index 192a645..5772385 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java @@ -74,6 +74,11 @@ public class TaskAssignmentStage extends AbstractBaseStage { manager.getProperties()); sendMessages(dataAccessor, outputMessages); + long cacheStart = System.currentTimeMillis(); + cache.cacheMessages(outputMessages); + long cacheEnd = System.currentTimeMillis(); + logger.debug("Caching messages took " + (cacheEnd - cacheStart) + " ms"); + long endTime = System.currentTimeMillis(); logger.info("END TaskAssignmentStage.process(). 
took: " + (endTime - startTime) + " ms"); http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/Mocks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java index df893bb..9e2452b 100644 --- a/helix-core/src/test/java/org/apache/helix/Mocks.java +++ b/helix-core/src/test/java/org/apache/helix/Mocks.java @@ -603,7 +603,7 @@ public class Mocks { String[] keySplit = key.split("\\/"); String[] pathSplit = path.split("\\/"); if (keySplit.length > pathSplit.length) { - child.add(keySplit[pathSplit.length + 1]); + child.add(keySplit[pathSplit.length]); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index 1a11be3..452a683 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -25,30 +25,20 @@ import java.util.List; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey.Builder; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.ZkUnitTestBase; -import org.apache.helix.PropertyKey.Builder; import org.apache.helix.controller.pipeline.Pipeline; -import org.apache.helix.controller.stages.AttributeName; -import org.apache.helix.controller.stages.BestPossibleStateCalcStage; -import org.apache.helix.controller.stages.ClusterEvent; -import 
org.apache.helix.controller.stages.CurrentStateComputationStage; -import org.apache.helix.controller.stages.MessageGenerationPhase; -import org.apache.helix.controller.stages.MessageSelectionStage; -import org.apache.helix.controller.stages.MessageSelectionStageOutput; -import org.apache.helix.controller.stages.MessageThrottleStage; -import org.apache.helix.controller.stages.ReadClusterDataStage; -import org.apache.helix.controller.stages.ResourceComputationStage; -import org.apache.helix.controller.stages.TaskAssignmentStage; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.IdealState; import org.apache.helix.model.Message; -import org.apache.helix.model.Partition; import org.apache.helix.model.Message.Attributes; +import org.apache.helix.model.Partition; import org.apache.log4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; @@ -217,6 +207,9 @@ public class TestRebalancePipeline extends ZkUnitTestBase { ClusterEvent event = new ClusterEvent("testEvent"); event.addAttribute("helixmanager", manager); + ClusterDataCache cache = new ClusterDataCache(); + event.addAttribute("ClusterDataCache", cache); + final String resourceName = "testResource_pending"; String[] resourceGroups = new String[] { resourceName @@ -267,6 +260,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { // message, make sure controller should not send O->DROPPEDN until O->S is done HelixAdmin admin = new ZKHelixAdmin(_gZkClient); admin.dropResource(clusterName, resourceName); + List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates()); + cache.setIdealStates(idealStates); runPipeline(event, dataRefresh); runPipeline(event, rebalancePipeline); 
http://git-wip-us.apache.org/repos/asf/helix/blob/06ca975d/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java new file mode 100644 index 0000000..78927f9 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestReelectedPipelineCorrectness.java @@ -0,0 +1,151 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.Date; + +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.integration.manager.ClusterDistributedController; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * The controller pipeline will only update ideal states, live instances, and instance configs + * when they change. However, if a controller loses leadership and subsequently regains it, we need + * to ensure that the controller can verify its cache. That's what this test is for. 
+ */ +public class TestReelectedPipelineCorrectness extends ZkUnitTestBase { + @Test + public void testReelection() throws Exception { + final int NUM_CONTROLLERS = 2; + final int NUM_PARTICIPANTS = 4; + final int NUM_PARTITIONS = 8; + final int NUM_REPLICAS = 2; + + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + + // Set up cluster + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + NUM_PARTITIONS, // partitions per resource + NUM_PARTICIPANTS, // number of nodes + NUM_REPLICAS, // replicas + "MasterSlave", RebalanceMode.FULL_AUTO, true); // do rebalance + + // configure distributed controllers + String controllerCluster = clusterName + "_controllers"; + setupTool.addCluster(controllerCluster, true); + for (int i = 0; i < NUM_CONTROLLERS; i++) { + setupTool.addInstanceToCluster(controllerCluster, "controller_" + i); + } + setupTool.activateCluster(clusterName, controllerCluster, true); + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS]; + for (int i = 0; i < NUM_PARTICIPANTS; i++) { + final String instanceName = "localhost_" + (12918 + i); + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + // start controllers + ClusterDistributedController[] controllers = new ClusterDistributedController[NUM_CONTROLLERS]; + for (int i = 0; i < NUM_CONTROLLERS; i++) { + controllers[i] = + new ClusterDistributedController(ZK_ADDR, controllerCluster, "controller_" + i); + controllers[i].syncStart(); + } + Thread.sleep(1000); + + // Ensure a balanced cluster + boolean result = 
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Disable the leader, resulting in a leader election + HelixDataAccessor accessor = participants[0].getHelixDataAccessor(); + LiveInstance leader = accessor.getProperty(accessor.keyBuilder().controllerLeader()); + String leaderId = leader.getId(); + String standbyId = (leaderId.equals("controller_0")) ? "controller_1" : "controller_0"; + HelixAdmin admin = setupTool.getClusterManagementTool(); + admin.enableInstance(controllerCluster, leaderId, false); + + // Stop a participant to make sure that the leader election worked + Thread.sleep(500); + participants[0].syncStop(); + Thread.sleep(500); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // Disable the original standby (leaving 0 active controllers) and kill another participant + admin.enableInstance(controllerCluster, standbyId, false); + Thread.sleep(500); + participants[1].syncStop(); + + // Also change the ideal state + IdealState idealState = admin.getResourceIdealState(clusterName, "TestDB0"); + idealState.setMaxPartitionsPerInstance(1); + admin.setResourceIdealState(clusterName, "TestDB0", idealState); + Thread.sleep(500); + + // Also disable an instance in the main cluster + admin.enableInstance(clusterName, "localhost_12920", false); + + // Re-enable the original leader + admin.enableInstance(controllerCluster, leaderId, true); + + // Now check that both the ideal state and the live instances are adhered to by the rebalance + Thread.sleep(500); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // cleanup + for (int i = 0; i < NUM_CONTROLLERS; i++) { + controllers[i].syncStop(); + } + for (int i = 2; i < NUM_PARTICIPANTS; i++) { + participants[i].syncStop(); + } + + 
System.out.println("STOP " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +}
