Repository: helix Updated Branches: refs/heads/master 03e8580ec -> 7ba8c5197
Batch API Implementation This rb includes: 1. The batch API definition and implementation, keeping the old API backward compatible. 2. Server-side changes to the logic that determines which instances are disabled, for both the batch API and the old API. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/31cec911 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/31cec911 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/31cec911 Branch: refs/heads/master Commit: 31cec9114b77ee37f79da137ed914c7b330780f3 Parents: 3f34a8e Author: Junkai Xue <[email protected]> Authored: Wed Oct 11 18:18:20 2017 -0700 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:30:10 2018 -0800 ---------------------------------------------------------------------- .../main/java/org/apache/helix/HelixAdmin.java | 29 ++ .../controller/GenericHelixController.java | 1 + .../rebalancer/DelayedAutoRebalancer.java | 17 +- .../rebalancer/topology/Topology.java | 6 +- .../controller/stages/ClusterDataCache.java | 7 +- .../controller/stages/ReadClusterDataStage.java | 5 +- .../manager/zk/ControllerManagerHelper.java | 1 + .../apache/helix/manager/zk/ZKHelixAdmin.java | 307 ++++++++++++------- .../org/apache/helix/model/ClusterConfig.java | 18 +- .../org/apache/helix/model/InstanceConfig.java | 18 ++ .../org/apache/helix/tools/ClusterSetup.java | 9 +- .../integration/TestBatchEnableInstances.java | 110 +++++++ .../integration/TestZkCallbackHandlerLeak.java | 16 +- .../manager/TestConsecutiveZkSessionExpiry.java | 2 +- .../TestDistributedControllerManager.java | 2 +- .../manager/TestZkCallbackHandlerLeak.java | 10 +- .../org/apache/helix/mock/MockHelixAdmin.java | 14 + .../helix/task/TaskSynchronizedTestBase.java | 1 + .../rest/server/resources/InstanceAccessor.java | 6 +- 19 files changed, 447 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/HelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java index 7438ee9..652ab7a 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java @@ -217,6 +217,15 @@ public interface HelixAdmin { void enableInstance(String clusterName, String instanceName, boolean enabled); /** + * Batch enable/disable instances in a cluster + * By default, all the instances are enabled + * @param clusterName + * @param instances + * @param enabled + */ + void enableInstance(String clusterName, List<String> instances, boolean enabled); + + /** * Disable or enable a resource * @param clusterName * @param resourceName @@ -479,6 +488,26 @@ public interface HelixAdmin { */ void enableBatchMessageMode(String clusterName, String resourceName, boolean enabled); + + + /** + * Get batch disabled instance map (disabled instance -> disabled time) in a cluster. It will + * include disabled instances and instances in disabled zones + * @param clusterName + * @return + */ + Map<String, String> getBatchDisabledInstances(String clusterName); + + /** + * Get list of instances by domain for a cluster + * + * Example : domain could be "helixZoneId=1,rackId=3". All the instances domain contains these + * two domains will be selected. 
+ * @param clusterName + * @return + */ + List<String> getInstancesByDomain(String clusterName, String domain); + /** * Release resources */ http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index 2f7037b..c182ada 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -268,6 +268,7 @@ public class GenericHelixController implements IdealStateChangeListener, registry.register(ClusterEventType.CurrentStateChange, dataRefresh, rebalancePipeline, externalViewPipeline); registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, rebalancePipeline); registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, rebalancePipeline); + registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, rebalancePipeline); registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, liveInstancePipeline, rebalancePipeline, externalViewPipeline); registry.register(ClusterEventType.MessageChange, dataRefresh, rebalancePipeline); http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java index 2dcad52..a44aa11 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java @@ -210,7 +210,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { long currentTime = System.currentTimeMillis(); for (String ins : offlineOrDisabledInstances) { long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, - instanceConfigMap.get(ins)); + instanceConfigMap.get(ins), clusterConfig); InstanceConfig instanceConfig = instanceConfigMap.get(ins); if (inactiveTime > currentTime && instanceConfig != null && instanceConfig .isDelayRebalanceEnabled()) { @@ -237,7 +237,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { // calculate the closest future rebalance time for (String ins : offlineOrDisabledInstances) { long inactiveTime = getInactiveTime(ins, liveNodes, instanceOfflineTimeMap.get(ins), delay, - instanceConfigMap.get(ins)); + instanceConfigMap.get(ins), clusterConfig); if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < nextRebalanceTime) { nextRebalanceTime = inactiveTime; } @@ -265,7 +265,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { * @return */ private long getInactiveTime(String instance, Set<String> liveInstances, Long offlineTime, - long delay, InstanceConfig instanceConfig) { + long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) { long inactiveTime = Long.MAX_VALUE; // check the time instance went offline. @@ -276,8 +276,17 @@ public class DelayedAutoRebalancer extends AbstractRebalancer { } // check the time instance got disabled. 
- if (!instanceConfig.getInstanceEnabled()) { + if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instance))) { long disabledTime = instanceConfig.getInstanceEnabledTime(); + if (clusterConfig.getDisabledInstances() != null && clusterConfig.getDisabledInstances() + .containsKey(instance)) { + // Update batch disable time + long batchDisableTime = Long.parseLong(clusterConfig.getDisabledInstances().get(instance)); + if (disabledTime == -1 || disabledTime > batchDisableTime) { + disabledTime = batchDisableTime; + } + } if (disabledTime > 0 && disabledTime + delay < inactiveTime) { inactiveTime = disabledTime + delay; } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 8350bd8..bf00e0e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -229,7 +229,8 @@ public class Topology { String zone = config.getZoneId(); if (zone == null) { // we have the hierarchy style of domain id for instance. - if (config.getInstanceEnabled()) { + if (config.getInstanceEnabled() && (_clusterConfig.getDisabledInstances() == null + || !_clusterConfig.getDisabledInstances().containsKey(ins))) { // if enabled instance missing ZONE_ID information, fails the rebalance. 
throw new HelixException(String .format("ZONE_ID for instance %s is not set, failed the topology-aware placement!", @@ -274,7 +275,8 @@ public class Topology { } String domain = insConfig.getDomain(); if (domain == null) { - if (insConfig.getInstanceEnabled()) { + if (insConfig.getInstanceEnabled() && (_clusterConfig.getDisabledInstances() == null + || !_clusterConfig.getDisabledInstances().containsKey(ins))) { // if enabled instance missing domain information, fails the rebalance. throw new HelixException(String .format("Domain for instance %s is not set, failed the topology-aware placement!", http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 97e91f6..8999ed7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -701,7 +701,8 @@ public class ClusterDataCache { Set<String> disabledInstancesSet = new HashSet<String>(); for (String instance : _instanceConfigMap.keySet()) { InstanceConfig config = _instanceConfigMap.get(instance); - if (config.getInstanceEnabled() == false + if (config.getInstanceEnabled() == false || (_clusterConfig.getDisabledInstances() != null + && _clusterConfig.getDisabledInstances().containsKey(instance)) || config.getInstanceEnabledForPartition(resource, partition) == false) { disabledInstancesSet.add(instance); } @@ -718,7 +719,9 @@ public class ClusterDataCache { Set<String> disabledInstancesSet = new HashSet<>(); for (String instance : _instanceConfigMap.keySet()) { InstanceConfig config = _instanceConfigMap.get(instance); - if (!config.getInstanceEnabled()) { + if 
(!config.getInstanceEnabled() + || (_clusterConfig.getDisabledInstances() != null && _clusterConfig.getDisabledInstances() + .containsKey(instance))) { disabledInstancesSet.add(instance); } } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java index 9313157..9361249 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java @@ -29,6 +29,7 @@ import org.apache.helix.HelixManager; import org.apache.helix.controller.GenericHelixController; import org.apache.helix.controller.pipeline.AbstractBaseStage; import org.apache.helix.controller.pipeline.StageException; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; @@ -60,6 +61,7 @@ public class ReadClusterDataStage extends AbstractBaseStage { HelixDataAccessor dataAccessor = manager.getHelixDataAccessor(); _cache.refresh(dataAccessor); + final ClusterConfig clusterConfig = cache.getClusterConfig(); if (!_cache.isTaskCache()) { final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); @@ -83,7 +85,8 @@ public class ReadClusterDataStage extends AbstractBaseStage { if (liveInstanceMap.containsKey(instanceName)) { liveInstanceSet.add(instanceName); } - if (!config.getInstanceEnabled()) { + if (!config.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instanceName))) { 
disabledInstanceSet.add(instanceName); } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java index a29ef78..554b09a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java @@ -80,6 +80,7 @@ public class ControllerManagerHelper { */ _manager.addInstanceConfigChangeListener(controller); _manager.addResourceConfigChangeListener(controller); + _manager.addClusterfigChangeListener(controller); _manager.addLiveInstanceChangeListener(controller); _manager.addIdealStateChangeListener(controller); _manager.addControllerListener(controller); http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index f5897af..c3fa9e9 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -33,11 +33,14 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.exception.ZkNoNodeException; +import org.apache.commons.math.stat.clustering.Cluster; import org.apache.helix.AccessOption; import 
org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; @@ -74,7 +77,6 @@ import org.apache.helix.util.RebalanceUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ZKHelixAdmin implements HelixAdmin { public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec"; private final ZkClient _zkClient; @@ -123,19 +125,20 @@ public class ZKHelixAdmin implements HelixAdmin { String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); if (!_zkClient.exists(instanceConfigPath)) { - throw new HelixException("Node " + instanceName + " does not exist in config for cluster " - + clusterName); + throw new HelixException( + "Node " + instanceName + " does not exist in config for cluster " + clusterName); } String instancePath = PropertyPathBuilder.instance(clusterName, instanceName); if (!_zkClient.exists(instancePath)) { - throw new HelixException("Node " + instanceName + " does not exist in instances for cluster " - + clusterName); + throw new HelixException( + "Node " + instanceName + " does not exist in instances for cluster " + clusterName); } String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, instanceName); if (_zkClient.exists(liveInstancePath)) { - throw new HelixException("Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop."); + throw new HelixException( + "Node " + instanceName + " is still alive for cluster " + clusterName + ", can't drop."); } // delete config path @@ -150,8 +153,8 @@ public class ZKHelixAdmin implements HelixAdmin { public InstanceConfig getInstanceConfig(String clusterName, String instanceName) { String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName); if (!_zkClient.exists(instanceConfigPath)) { - throw new HelixException("instance" + instanceName + " does not exist in cluster " - + clusterName); + throw new HelixException( + "instance" + instanceName + " does not exist in 
cluster " + clusterName); } HelixDataAccessor accessor = @@ -161,7 +164,8 @@ public class ZKHelixAdmin implements HelixAdmin { return accessor.getProperty(keyBuilder.instanceConfig(instanceName)); } - @Override public boolean setInstanceConfig(String clusterName, String instanceName, + @Override + public boolean setInstanceConfig(String clusterName, String instanceName, InstanceConfig newInstanceConfig) { String instanceConfigPath = PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName, HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString(), instanceName); @@ -188,27 +192,22 @@ public class ZKHelixAdmin implements HelixAdmin { @Override public void enableInstance(final String clusterName, final String instanceName, final boolean enabled) { - String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); - - BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); - if (!baseAccessor.exists(path, 0)) { - throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName - + ", instance config does not exist"); - } - - baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - if (currentData == null) { - throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName - + ", participant config is null"); - } + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + enableSingleInstance(clusterName, instanceName, enabled, baseAccessor); + enableBatchInstances(clusterName, Collections.singletonList(instanceName), enabled, + baseAccessor); + } - InstanceConfig config = new InstanceConfig(currentData); - config.setInstanceEnabled(enabled); - return config.getRecord(); + @Override + public void enableInstance(String clusterName, List<String> instances, + boolean enabled) { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + if (enabled) { + for (String instance : 
instances) { + enableSingleInstance(clusterName, instance, enabled, baseAccessor); } - }, AccessOption.PERSISTENT); + } + enableBatchInstances(clusterName, instances, enabled, baseAccessor); } @Override @@ -221,7 +220,8 @@ public class ZKHelixAdmin implements HelixAdmin { + ", ideal-state does not exist"); } baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { + @Override + public ZNRecord update(ZNRecord currentData) { if (currentData == null) { throw new HelixException( "Cluster: " + clusterName + ", resource: " + resourceName + ", ideal-state is null"); @@ -260,22 +260,22 @@ public class ZKHelixAdmin implements HelixAdmin { if (idealStateRecord == null) { // throw new HelixException("Cluster: " + clusterName + ", resource: " + resourceName // + ", ideal state does not exist"); - logger.warn("Disable partitions: " + partitionNames + " but Cluster: " + clusterName - + ", resource: " + resourceName - + " does not exists. probably disable it during ERROR->DROPPED transtition"); - + logger.warn( + "Disable partitions: " + partitionNames + " but Cluster: " + clusterName + ", resource: " + + resourceName + + " does not exists. probably disable it during ERROR->DROPPED transtition"); } else { // check partitions exist. 
warn if not IdealState idealState = new IdealState(idealStateRecord); for (String partitionName : partitionNames) { - if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && idealState - .getPreferenceList(partitionName) == null) - || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED && idealState - .getPreferenceList(partitionName) == null) - || (idealState.getRebalanceMode() == RebalanceMode.TASK && idealState - .getPreferenceList(partitionName) == null) - || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && idealState - .getInstanceStateMap(partitionName) == null)) { + if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO + && idealState.getPreferenceList(partitionName) == null) || ( + idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED + && idealState.getPreferenceList(partitionName) == null) || ( + idealState.getRebalanceMode() == RebalanceMode.TASK + && idealState.getPreferenceList(partitionName) == null) || ( + idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED + && idealState.getInstanceStateMap(partitionName) == null)) { logger.warn("Cluster: " + clusterName + ", resource: " + resourceName + ", partition: " + partitionName + ", partition does not exist in ideal state"); } @@ -285,7 +285,8 @@ public class ZKHelixAdmin implements HelixAdmin { // update participantConfig // could not use ZNRecordUpdater since it doesn't do listField merge/subtract baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { + @Override + public ZNRecord update(ZNRecord currentData) { if (currentData == null) { throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName + ", participant config is null"); @@ -339,15 +340,17 @@ public class ZKHelixAdmin implements HelixAdmin { // check the instance is alive LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); if (liveInstance == null) { - throw new 
HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because " + instanceName + " is not alive"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because " + instanceName + " is not alive"); } // check resource group exists IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); if (idealState == null) { - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because " + resourceName + " is not added"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because " + resourceName + " is not added"); } // check partition exists in resource group @@ -355,14 +358,16 @@ public class ZKHelixAdmin implements HelixAdmin { if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) { Set<String> partitions = new HashSet<String>(idealState.getRecord().getMapFields().keySet()); if (!partitions.containsAll(resetPartitionNames)) { - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because not all " + partitionNames + " exist"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because not all " + partitionNames + " exist"); } } else { Set<String> partitions = new HashSet<String>(idealState.getRecord().getListFields().keySet()); if (!partitions.containsAll(resetPartitionNames)) { - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because not all " + partitionNames + " exist"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because not all " + partitionNames + " exist"); } } @@ -372,8 +377,9 @@ 
public class ZKHelixAdmin implements HelixAdmin { accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); for (String partitionName : resetPartitionNames) { if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) { - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because not all " + partitionNames + " are in ERROR state"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because not all " + partitionNames + " are in ERROR state"); } } @@ -381,22 +387,23 @@ public class ZKHelixAdmin implements HelixAdmin { String stateModelDef = idealState.getStateModelDefRef(); StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); if (stateModel == null) { - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because " + stateModelDef + " is NOT found"); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because " + stateModelDef + " is NOT found"); } // check there is no pending messages for the partitions exist List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName)); for (Message message : messages) { - if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) - || !sessionId.equals(message.getTgtSessionId()) - || !resourceName.equals(message.getResourceName()) + if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId + .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) || !resetPartitionNames.contains(message.getPartitionName())) { continue; } - throw new HelixException("Can't reset state for " + resourceName + "/" + partitionNames - + " on " + instanceName + ", because a pending 
message exists: " + message); + throw new HelixException( + "Can't reset state for " + resourceName + "/" + partitionNames + " on " + instanceName + + ", because a pending message exists: " + message); } String adminName = null; @@ -455,8 +462,8 @@ public class ZKHelixAdmin implements HelixAdmin { for (String partitionName : stateMap.keySet()) { Map<String, String> instanceStateMap = stateMap.get(partitionName); - if (instanceStateMap.containsKey(instanceName) - && instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString())) { + if (instanceStateMap.containsKey(instanceName) && instanceStateMap.get(instanceName) + .equals(HelixDefinedState.ERROR.toString())) { resetPartitionNames.add(partitionName); } } @@ -629,12 +636,13 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void addResource(String clusterName, String resourceName, IdealState idealstate) { + public void addResource(String clusterName, String resourceName, + IdealState idealstate) { String stateModelRef = idealstate.getStateModelDefRef(); String stateModelDefPath = PropertyPathBuilder.stateModelDef(clusterName, stateModelRef); if (!_zkClient.exists(stateModelDefPath)) { - throw new HelixException("State model " + stateModelRef - + " not found in the cluster STATEMODELDEFS path"); + throw new HelixException( + "State model " + stateModelRef + " not found in the cluster STATEMODELDEFS path"); } String idealStatePath = PropertyPathBuilder.idealState(clusterName); @@ -652,7 +660,6 @@ public class ZKHelixAdmin implements HelixAdmin { String stateModelRef, String rebalancerMode, int bucketSize) { addResource(clusterName, resourceName, partitions, stateModelRef, rebalancerMode, bucketSize, -1); - } @Override @@ -733,7 +740,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void setResourceIdealState(String clusterName, String resourceName, IdealState idealState) { + public void setResourceIdealState(String clusterName, String resourceName, + 
IdealState idealState) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); Builder keyBuilder = accessor.keyBuilder(); @@ -765,8 +773,8 @@ public class ZKHelixAdmin implements HelixAdmin { String stateModelPath = stateModelDefPath + "/" + stateModelDef; if (_zkClient.exists(stateModelPath)) { if (recreateIfExists) { - logger.info("Operation.State Model directory exists:" + stateModelPath + - ", remove and recreate."); + logger.info( + "Operation.State Model directory exists:" + stateModelPath + ", remove and recreate."); _zkClient.deleteRecursive(stateModelPath); } else { logger.info("Skip the operation. State Model directory exists:" + stateModelPath); @@ -796,7 +804,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public StateModelDefinition getStateModelDef(String clusterName, String stateModelName) { + public StateModelDefinition getStateModelDef(String clusterName, + String stateModelName) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); Builder keyBuilder = accessor.keyBuilder(); @@ -884,8 +893,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void rebalance(String clusterName, String resourceName, int replica, String keyPrefix, - String group) { + public void rebalance(String clusterName, String resourceName, int replica, + String keyPrefix, String group) { List<String> instanceNames = new LinkedList<String>(); if (keyPrefix == null || keyPrefix.length() == 0) { keyPrefix = resourceName; @@ -904,7 +913,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void rebalance(String clusterName, String resourceName, int replica, List<String> instances) { + public void rebalance(String clusterName, String resourceName, int replica, + List<String> instances) { rebalance(clusterName, resourceName, replica, resourceName, instances, ""); } @@ -966,9 +976,9 @@ public class 
ZKHelixAdmin implements HelixAdmin { } if (idealState.getRebalanceMode() != RebalanceMode.FULL_AUTO && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) { - ZNRecord newIdealState = - DefaultIdealStateCalculator.calculateIdealState(instanceNames, partitions, replica, - keyPrefix, masterStateValue, slaveStateValue); + ZNRecord newIdealState = DefaultIdealStateCalculator + .calculateIdealState(instanceNames, partitions, replica, keyPrefix, masterStateValue, + slaveStateValue); // for now keep mapField in SEMI_AUTO mode and remove listField in CUSTOMIZED mode if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) { @@ -990,8 +1000,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void addIdealState(String clusterName, String resourceName, String idealStateFile) - throws IOException { + public void addIdealState(String clusterName, String resourceName, + String idealStateFile) throws IOException { ZNRecord idealStateRecord = (ZNRecord) (new ZNRecordSerializer().deserialize(readFile(idealStateFile))); if (idealStateRecord.getId() == null || !idealStateRecord.getId().equals(resourceName)) { @@ -1042,10 +1052,11 @@ public class ZKHelixAdmin implements HelixAdmin { String path = keyBuilder.constraint(constraintType.toString()).getPath(); baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override public ZNRecord update(ZNRecord currentData) { - ClusterConstraints constraints = currentData == null ? - new ClusterConstraints(constraintType) : - new ClusterConstraints(currentData); + @Override + public ZNRecord update(ZNRecord currentData) { + ClusterConstraints constraints = currentData == null + ? 
new ClusterConstraints(constraintType) + : new ClusterConstraints(currentData); constraints.addConstraintItem(constraintId, constraintItem); return constraints.getRecord(); @@ -1076,7 +1087,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public ClusterConstraints getConstraints(String clusterName, ConstraintType constraintType) { + public ClusterConstraints getConstraints(String clusterName, + ConstraintType constraintType) { HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); @@ -1085,16 +1097,18 @@ public class ZKHelixAdmin implements HelixAdmin { } /** - * Takes the existing idealstate as input and computes newIdealState such that - * the partition movement is minimized. The partitions are redistributed among the instances - * provided. + * Takes the existing idealstate as input and computes newIdealState such that the partition + * movement is minimized. The partitions are redistributed among the instances provided. 
+ * * @param clusterName * @param currentIdealState * @param instanceNames + * * @return */ @Override - public void rebalance(String clusterName, IdealState currentIdealState, List<String> instanceNames) { + public void rebalance(String clusterName, IdealState currentIdealState, + List<String> instanceNames) { Set<String> activeInstances = new HashSet<String>(); for (String partition : currentIdealState.getPartitionSet()) { activeInstances.addAll(currentIdealState.getRecord().getListField(partition)); @@ -1109,14 +1123,14 @@ public class ZKHelixAdmin implements HelixAdmin { this.getStateModelDef(clusterName, currentIdealState.getStateModelDefRef()); if (stateModDef == null) { - throw new HelixException("cannot find state model: " - + currentIdealState.getStateModelDefRef()); + throw new HelixException( + "cannot find state model: " + currentIdealState.getStateModelDefRef()); } String[] states = RebalanceUtil.parseStates(clusterName, stateModDef); - ZNRecord newIdealStateRecord = - DefaultIdealStateCalculator.convertToZNRecord(balancedRecord, - currentIdealState.getResourceName(), states[0], states[1]); + ZNRecord newIdealStateRecord = DefaultIdealStateCalculator + .convertToZNRecord(balancedRecord, currentIdealState.getResourceName(), states[0], + states[1]); Set<String> partitionSet = new HashSet<String>(); partitionSet.addAll(newIdealStateRecord.getMapFields().keySet()); partitionSet.addAll(newIdealStateRecord.getListFields().keySet()); @@ -1129,12 +1143,12 @@ public class ZKHelixAdmin implements HelixAdmin { if (partition.equals(originPartitionName)) { continue; } - newIdealStateRecord.getMapFields().put(originPartitionName, - newIdealStateRecord.getMapField(partition)); + newIdealStateRecord.getMapFields() + .put(originPartitionName, newIdealStateRecord.getMapField(partition)); newIdealStateRecord.getMapFields().remove(partition); - newIdealStateRecord.getListFields().put(originPartitionName, - newIdealStateRecord.getListField(partition)); + 
newIdealStateRecord.getListFields() + .put(originPartitionName, newIdealStateRecord.getListField(partition)); newIdealStateRecord.getListFields().remove(partition); } } @@ -1151,8 +1165,8 @@ public class ZKHelixAdmin implements HelixAdmin { } if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { - throw new HelixException("cluster " + clusterName + " instance " + instanceName - + " is not setup yet"); + throw new HelixException( + "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); @@ -1170,8 +1184,8 @@ public class ZKHelixAdmin implements HelixAdmin { } if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { - throw new HelixException("cluster " + clusterName + " instance " + instanceName - + " is not setup yet"); + throw new HelixException( + "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); @@ -1189,8 +1203,8 @@ public class ZKHelixAdmin implements HelixAdmin { } if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, InstanceType.PARTICIPANT)) { - throw new HelixException("cluster " + clusterName + " instance " + instanceName - + " is not setup yet"); + throw new HelixException( + "cluster " + clusterName + " instance " + instanceName + " is not setup yet"); } HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); @@ -1214,7 +1228,8 @@ public class ZKHelixAdmin implements HelixAdmin { } @Override - public void enableBatchMessageMode(String clusterName, String resourceName, boolean enabled) { + public void enableBatchMessageMode(String clusterName, String resourceName, + boolean enabled) { // TODO: Change IdealState to ResourceConfig 
when configs are migrated to ResourceConfig IdealState idealState = getResourceIdealState(clusterName, resourceName); if (idealState == null) { @@ -1226,8 +1241,94 @@ public class ZKHelixAdmin implements HelixAdmin { setResourceIdealState(clusterName, resourceName, idealState); } + private void enableSingleInstance(final String clusterName, final String instanceName, + final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) { + String path = PropertyPathBuilder.instanceConfig(clusterName, instanceName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException("Cluster " + clusterName + ", instance: " + instanceName + + ", instance config does not exist"); + } + + baseAccessor.update(path, new DataUpdater<ZNRecord>() + + { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ", instance: " + instanceName + + ", participant config is null"); + } + + InstanceConfig config = new InstanceConfig(currentData); + config.setInstanceEnabled(enabled); + return config.getRecord(); + } + }, AccessOption.PERSISTENT); + } + + private void enableBatchInstances(final String clusterName, final List<String> instances, + final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) { + + String path = PropertyPathBuilder.clusterConfig(clusterName); + + if (!baseAccessor.exists(path, 0)) { + throw new HelixException("Cluster " + clusterName + ": cluster config does not exist"); + } + + baseAccessor.update(path, new DataUpdater<ZNRecord>() { + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + throw new HelixException("Cluster: " + clusterName + ": cluster config is null"); + } + + ClusterConfig clusterConfig = new ClusterConfig(currentData); + Map<String, String> disabledInstances = new TreeMap<>(); + if (clusterConfig.getDisabledInstances() != null) { + disabledInstances.putAll(clusterConfig.getDisabledInstances()); + } + + if 
(enabled) { + disabledInstances.keySet().removeAll(instances); + } else { + for (String disabledInstance : instances) { + if (!disabledInstances.containsKey(disabledInstance)) { + disabledInstances.put(disabledInstance, String.valueOf(System.currentTimeMillis())); + } + } + } + clusterConfig.setDisabledInstances(disabledInstances); + + return clusterConfig.getRecord(); + } + }, AccessOption.PERSISTENT); + } + + @Override + public Map<String, String> getBatchDisabledInstances(String clusterName) { + ConfigAccessor accessor = new ConfigAccessor(_zkClient); + return accessor.getClusterConfig(clusterName).getDisabledInstances(); + } + @Override - public void close() { + public List<String> getInstancesByDomain(String clusterName, String domain) { + List<String> instances = new ArrayList<>(); + String path = PropertyPathBuilder.instanceConfig(clusterName); + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient); + List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0); + for (ZNRecord record : znRecords) { + if (record != null) { + InstanceConfig instanceConfig = new InstanceConfig(record); + if (instanceConfig.isInstanceInDomain(domain)) { + instances.add(instanceConfig.getInstanceName()); + } + } + } + return instances; + } + + @Override public void close() { if (_zkClient != null) { _zkClient.close(); } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index 6f74728..2a97145 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -60,7 +60,8 @@ public class ClusterConfig extends HelixProperty { MAX_PARTITIONS_PER_INSTANCE, 
MAX_OFFLINE_INSTANCES_ALLOWED, TARGET_EXTERNALVIEW_ENABLED, - ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE // Controller won't execute load balance state transition if the number of partitons that need recovery exceeds this limitation + ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute load balance state transition if the number of partitons that need recovery exceeds this limitation + DISABLED_INSTANCES } private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40; private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE = 0; // By default, no load balance if any error partition @@ -474,6 +475,21 @@ public class ClusterConfig extends HelixProperty { _record.setIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(), errorPartitionThreshold); } + /** + * Set the disabled instance list + * @param disabledInstances + */ + public void setDisabledInstances(Map<String, String> disabledInstances) { + _record.setMapField(ClusterConfigProperty.DISABLED_INSTANCES.name(), disabledInstances); + } + + /** + * Get current disabled instance map of <instance, disabledTimeStamp> + * @return + */ + public Map<String, String> getDisabledInstances() { + return _record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name()); + } /** * Get IdealState rules defined in the cluster config. 
http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java index e88f37d..4343006 100644 --- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java @@ -20,6 +20,7 @@ package org.apache.helix.model; */ import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; import org.apache.helix.ZNRecord; import org.apache.helix.util.HelixUtil; @@ -426,6 +428,22 @@ public class InstanceConfig extends HelixProperty { } } + public boolean isInstanceInDomain(String domain) { + if (domain == null) { + throw new HelixException("Invalid input for domain."); + } + + if (_record.getSimpleField(InstanceConfigProperty.DOMAIN.name()) == null) { + return false; + } + + Set<String> domainSet = new HashSet<>(Arrays.asList(domain.split(","))); + Set<String> instanceDomains = new HashSet<>( + Arrays.asList(_record.getSimpleField(InstanceConfigProperty.DOMAIN.name()).split(","))); + domainSet.removeAll(instanceDomains); + return domainSet.size() == 0; + } + /** * Whether the delay rebalance is enabled for this instance. * By default, it is enable if the field is not set. 
http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 30483f6..5736169 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -46,6 +46,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; import org.apache.helix.model.BuiltInStateModelDefinitions; @@ -217,8 +218,10 @@ public class ClusterSetup { throw new HelixException(error); } + ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); // ensure node is disabled, otherwise fail - if (config.getInstanceEnabled()) { + if (config.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null + || !clusterConfig.getDisabledInstances().containsKey(instanceId))) { String error = "Node " + instanceId + " is enabled, cannot drop"; _logger.warn(error); throw new HelixException(error); @@ -245,8 +248,10 @@ public class ClusterSetup { throw new HelixException(error); } + ClusterConfig clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); // ensure old instance is disabled, otherwise fail - if (oldConfig.getInstanceEnabled()) { + if (oldConfig.getInstanceEnabled() && (clusterConfig.getDisabledInstances() == null + || !clusterConfig.getDisabledInstances().containsKey(oldInstanceName))) { String error = "Old instance " + oldInstanceName + " is enabled, it need to be disabled and 
turned off"; _logger.warn(error); http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java new file mode 100644 index 0000000..eab5c32 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java @@ -0,0 +1,110 @@ +package org.apache.helix.integration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.integration.task.TaskTestBase; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ExternalView; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestBatchEnableInstances extends TaskTestBase { + private ConfigAccessor _accessor; + + @BeforeClass + public void beforeClass() throws Exception { + _numDbs = 1; + _numReplicas = 3; + _numNodes = 5; + _numParitions = 4; + super.beforeClass(); + _accessor = new ConfigAccessor(_gZkClient); + } + + @Test + public void testOldEnableDisable() throws InterruptedException { + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false); + Thread.sleep(2000); + + ExternalView externalView = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { + 
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName())); + } + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true); + } + + @Test + public void testBatchEnableDisable() throws InterruptedException { + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, + Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), + false); + Thread.sleep(2000); + + ExternalView externalView = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { + Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName())); + Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName())); + } + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, + Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), + true); + } + + @Test + public void testOldDisableBatchEnable() throws InterruptedException { + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), false); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, + Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), + true); + Thread.sleep(2000); + + ExternalView externalView = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + int numOfFirstHost = 0; + for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { + if (stateMap.keySet().contains(_participants[0].getInstanceName())) { + numOfFirstHost++; + } + } + 
Assert.assertTrue(numOfFirstHost > 0); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true); + } + + @Test + public void testBatchDisableOldEnable() throws InterruptedException { + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, + Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), + false); + _gSetupTool.getClusterManagementTool() + .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), true); + Thread.sleep(2000); + + ExternalView externalView = _gSetupTool.getClusterManagementTool() + .getResourceExternalView(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB); + Assert.assertEquals(externalView.getRecord().getMapFields().size(), _numParitions); + int numOfFirstHost = 0; + for (Map<String, String> stateMap : externalView.getRecord().getMapFields().values()) { + if (stateMap.keySet().contains(_participants[0].getInstanceName())) { + numOfFirstHost++; + } + Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName())); + } + Assert.assertTrue(numOfFirstHost > 0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, + Arrays.asList(_participants[0].getInstanceName(), _participants[1].getInstanceName()), + true); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java index 2de3a10..04c3ed4 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java @@ -90,10 +90,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // 
controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { @@ -115,7 +115,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // printHandlers(participantManagerToExpire); int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManagerToExpire.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 10, + Assert.assertEquals(controllerHandlerNb, 11, "HelixController should have 10 (5+2n) callback handlers for 2 (n) participant"); Assert.assertEquals(particHandlerNb, 1, "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback handlers"); @@ -145,10 +145,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { @@ -241,7 +241,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, 10, + Assert.assertEquals(controllerHandlerNb, 11, "HelixController should have 10 (6+2n) callback handlers for 2 participant, but was " + controllerHandlerNb + ", 
" + printHandlers(controller)); Assert.assertEquals(particHandlerNb, 1, @@ -273,10 +273,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java index 176fe33..a30de78 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java @@ -244,7 +244,7 @@ public class TestConsecutiveZkSessionExpiry extends ZkUnitTestBase { Assert .assertEquals( handlers.size(), - 1, + 2, "Distributed controller should have 2 handler (message) after lose leadership, but was " + handlers.size()); http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java index ea00888..142cd1f 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java @@ -147,7 +147,7 @@ public class TestDistributedControllerManager extends ZkIntegrationTestBase { Assert .assertEquals( handlers.size(), - 1, + 2, "Distributed controller should have 1 handler (message) after lose leadership, but was " + handlers.size()); } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java index 00f95b0..ec2dd1b 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java @@ -96,7 +96,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { return watchPaths.size() == (7 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers."); // check participant zk-watchers final MockParticipantManager participantManagerToExpire = participants[0]; @@ -152,7 +152,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { return watchPaths.size() == (7 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry."); // check participant 
zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { @@ -237,7 +237,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { int controllerHandlerNb = controller.getHandlers().size(); int particHandlerNb = participantManager.getHandlers().size(); - Assert.assertEquals(controllerHandlerNb, (6 + 2 * n), + Assert.assertEquals(controllerHandlerNb, (7 + 2 * n), "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " + controllerHandlerNb + ", " + TestHelper.printHandlers(controller)); Assert.assertEquals(particHandlerNb, 1, @@ -270,10 +270,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { // controller should have 5 + 2n + m + (m+2)n zk-watchers // where n is number of nodes and m is number of resources - return watchPaths.size() == (7 + 5 * n); + return watchPaths.size() == (8 + 5 * n); } }, 500); - Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers after session expiry."); + Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry."); // check participant zk-watchers result = TestHelper.verify(new TestHelper.Verifier() { http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java index beeb3cf..8679007 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java @@ -219,6 +219,11 @@ public class MockHelixAdmin implements HelixAdmin { } + @Override public void enableInstance(String clusterName, List<String> instances, + boolean enabled) { + + } + @Override public void enableResource(String clusterName, String resourceName, boolean enabled) { } @@ -366,6 +371,15 @@ public 
class MockHelixAdmin implements HelixAdmin { } + @Override + public Map<String, String> getBatchDisabledInstances(String clusterName) { + return null; + } + + @Override public List<String> getInstancesByDomain(String clusterName, String domain) { + return null; + } + @Override public void close() { } http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java index 4109fa3..e8c7b0d 100644 --- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java @@ -110,6 +110,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase { } protected void setupParticipants() { + _participants = new MockParticipantManager[_numNodes]; for (int i = 0; i < _numNodes; i++) { String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i); _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java index 4fe8060..5b25c3e 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java @@ -36,6 +36,7 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import 
org.apache.helix.ZNRecord; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.CurrentState; import org.apache.helix.model.Error; import org.apache.helix.model.HealthStat; @@ -91,13 +92,14 @@ public class InstanceAccessor extends AbstractResource { } List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances()); - + ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig()); for (String instanceName : instances) { InstanceConfig instanceConfig = accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName)); if (instanceConfig != null) { - if (!instanceConfig.getInstanceEnabled()) { + if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null + && clusterConfig.getDisabledInstances().containsKey(instanceName))) { disabledNode.add(JsonNodeFactory.instance.textNode(instanceName)); }
