Repository: helix
Updated Branches:
  refs/heads/master 03e8580ec -> 7ba8c5197


Batch API Implementation

This RB includes:
1. The batch API definition and its implementation, keeping the old API 
backward compatible.
2. A change to the server logic that determines which instances are disabled, 
for both the batch API and the old API.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/31cec911
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/31cec911
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/31cec911

Branch: refs/heads/master
Commit: 31cec9114b77ee37f79da137ed914c7b330780f3
Parents: 3f34a8e
Author: Junkai Xue <[email protected]>
Authored: Wed Oct 11 18:18:20 2017 -0700
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:30:10 2018 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/helix/HelixAdmin.java  |  29 ++
 .../controller/GenericHelixController.java      |   1 +
 .../rebalancer/DelayedAutoRebalancer.java       |  17 +-
 .../rebalancer/topology/Topology.java           |   6 +-
 .../controller/stages/ClusterDataCache.java     |   7 +-
 .../controller/stages/ReadClusterDataStage.java |   5 +-
 .../manager/zk/ControllerManagerHelper.java     |   1 +
 .../apache/helix/manager/zk/ZKHelixAdmin.java   | 307 ++++++++++++-------
 .../org/apache/helix/model/ClusterConfig.java   |  18 +-
 .../org/apache/helix/model/InstanceConfig.java  |  18 ++
 .../org/apache/helix/tools/ClusterSetup.java    |   9 +-
 .../integration/TestBatchEnableInstances.java   | 110 +++++++
 .../integration/TestZkCallbackHandlerLeak.java  |  16 +-
 .../manager/TestConsecutiveZkSessionExpiry.java |   2 +-
 .../TestDistributedControllerManager.java       |   2 +-
 .../manager/TestZkCallbackHandlerLeak.java      |  10 +-
 .../org/apache/helix/mock/MockHelixAdmin.java   |  14 +
 .../helix/task/TaskSynchronizedTestBase.java    |   1 +
 .../rest/server/resources/InstanceAccessor.java |   6 +-
 19 files changed, 447 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
index 7438ee9..652ab7a 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixAdmin.java
@@ -217,6 +217,15 @@ public interface HelixAdmin {
   void enableInstance(String clusterName, String instanceName, boolean 
enabled);
 
   /**
+   * Batch enable/disable instances in a cluster
+   * By default, all the instances are enabled
+   * @param clusterName
+   * @param instances
+   * @param enabled
+   */
+  void enableInstance(String clusterName, List<String> instances, boolean 
enabled);
+
+  /**
    * Disable or enable a resource
    * @param clusterName
    * @param resourceName
@@ -479,6 +488,26 @@ public interface HelixAdmin {
    */
   void enableBatchMessageMode(String clusterName, String resourceName, boolean 
enabled);
 
+
+
+  /**
+   * Get batch disabled instance map (disabled instance -> disabled time) in a 
cluster. It will
+   * include disabled instances and instances in disabled zones
+   * @param clusterName
+   * @return
+   */
+  Map<String, String> getBatchDisabledInstances(String clusterName);
+
+  /**
+   * Get list of instances by domain for a cluster
+   *
+   * Example : domain could be "helixZoneId=1,rackId=3". All the instances 
domain contains these
+   * two domains will be selected.
+   * @param clusterName
+   * @return
+   */
+  List<String> getInstancesByDomain(String clusterName, String domain);
+
   /**
    * Release resources
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 2f7037b..c182ada 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -268,6 +268,7 @@ public class GenericHelixController implements 
IdealStateChangeListener,
       registry.register(ClusterEventType.CurrentStateChange, dataRefresh, 
rebalancePipeline, externalViewPipeline);
       registry.register(ClusterEventType.InstanceConfigChange, dataRefresh, 
rebalancePipeline);
       registry.register(ClusterEventType.ResourceConfigChange, dataRefresh, 
rebalancePipeline);
+      registry.register(ClusterEventType.ClusterConfigChange, dataRefresh, 
rebalancePipeline);
       registry.register(ClusterEventType.LiveInstanceChange, dataRefresh, 
liveInstancePipeline, rebalancePipeline,
           externalViewPipeline);
       registry.register(ClusterEventType.MessageChange, dataRefresh, 
rebalancePipeline);

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 2dcad52..a44aa11 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -210,7 +210,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     long currentTime = System.currentTimeMillis();
     for (String ins : offlineOrDisabledInstances) {
       long inactiveTime = getInactiveTime(ins, liveNodes, 
instanceOfflineTimeMap.get(ins), delay,
-          instanceConfigMap.get(ins));
+          instanceConfigMap.get(ins), clusterConfig);
       InstanceConfig instanceConfig = instanceConfigMap.get(ins);
       if (inactiveTime > currentTime && instanceConfig != null && 
instanceConfig
           .isDelayRebalanceEnabled()) {
@@ -237,7 +237,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     // calculate the closest future rebalance time
     for (String ins : offlineOrDisabledInstances) {
       long inactiveTime = getInactiveTime(ins, liveNodes, 
instanceOfflineTimeMap.get(ins), delay,
-          instanceConfigMap.get(ins));
+          instanceConfigMap.get(ins), clusterConfig);
       if (inactiveTime != -1 && inactiveTime > currentTime && inactiveTime < 
nextRebalanceTime) {
         nextRebalanceTime = inactiveTime;
       }
@@ -265,7 +265,7 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
    * @return
    */
   private long getInactiveTime(String instance, Set<String> liveInstances, 
Long offlineTime,
-      long delay, InstanceConfig instanceConfig) {
+      long delay, InstanceConfig instanceConfig, ClusterConfig clusterConfig) {
     long inactiveTime = Long.MAX_VALUE;
 
     // check the time instance went offline.
@@ -276,8 +276,17 @@ public class DelayedAutoRebalancer extends 
AbstractRebalancer {
     }
 
     // check the time instance got disabled.
-    if (!instanceConfig.getInstanceEnabled()) {
+    if (!instanceConfig.getInstanceEnabled() || 
(clusterConfig.getDisabledInstances() != null
+        && clusterConfig.getDisabledInstances().containsKey(instance))) {
       long disabledTime = instanceConfig.getInstanceEnabledTime();
+      if (clusterConfig.getDisabledInstances() != null && 
clusterConfig.getDisabledInstances()
+          .containsKey(instance)) {
+        // Update batch disable time
+        long batchDisableTime = 
Long.parseLong(clusterConfig.getDisabledInstances().get(instance));
+        if (disabledTime == -1 || disabledTime > batchDisableTime) {
+          disabledTime = batchDisableTime;
+        }
+      }
       if (disabledTime > 0 && disabledTime + delay < inactiveTime) {
         inactiveTime = disabledTime + delay;
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
index 8350bd8..bf00e0e 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java
@@ -229,7 +229,8 @@ public class Topology {
         String zone = config.getZoneId();
         if (zone == null) {
           // we have the hierarchy style of domain id for instance.
-          if (config.getInstanceEnabled()) {
+          if (config.getInstanceEnabled() && 
(_clusterConfig.getDisabledInstances() == null
+              || !_clusterConfig.getDisabledInstances().containsKey(ins))) {
             // if enabled instance missing ZONE_ID information, fails the 
rebalance.
             throw new HelixException(String
                 .format("ZONE_ID for instance %s is not set, failed the 
topology-aware placement!",
@@ -274,7 +275,8 @@ public class Topology {
       }
       String domain = insConfig.getDomain();
       if (domain == null) {
-        if (insConfig.getInstanceEnabled()) {
+        if (insConfig.getInstanceEnabled() && 
(_clusterConfig.getDisabledInstances() == null
+            || !_clusterConfig.getDisabledInstances().containsKey(ins))) {
           // if enabled instance missing domain information, fails the 
rebalance.
           throw new HelixException(String
               .format("Domain for instance %s is not set, failed the 
topology-aware placement!",

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 97e91f6..8999ed7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -701,7 +701,8 @@ public class ClusterDataCache {
     Set<String> disabledInstancesSet = new HashSet<String>();
     for (String instance : _instanceConfigMap.keySet()) {
       InstanceConfig config = _instanceConfigMap.get(instance);
-      if (config.getInstanceEnabled() == false
+      if (config.getInstanceEnabled() == false || 
(_clusterConfig.getDisabledInstances() != null
+          && _clusterConfig.getDisabledInstances().containsKey(instance))
           || config.getInstanceEnabledForPartition(resource, partition) == 
false) {
         disabledInstancesSet.add(instance);
       }
@@ -718,7 +719,9 @@ public class ClusterDataCache {
     Set<String> disabledInstancesSet = new HashSet<>();
     for (String instance : _instanceConfigMap.keySet()) {
       InstanceConfig config = _instanceConfigMap.get(instance);
-      if (!config.getInstanceEnabled()) {
+      if (!config.getInstanceEnabled()
+          || (_clusterConfig.getDisabledInstances() != null && 
_clusterConfig.getDisabledInstances()
+          .containsKey(instance))) {
         disabledInstancesSet.add(instance);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index 9313157..9361249 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -29,6 +29,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
 import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
@@ -60,6 +61,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
 
     HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
     _cache.refresh(dataAccessor);
+    final ClusterConfig clusterConfig = cache.getClusterConfig();
     if (!_cache.isTaskCache()) {
       final ClusterStatusMonitor clusterStatusMonitor =
           event.getAttribute(AttributeName.clusterStatusMonitor.name());
@@ -83,7 +85,8 @@ public class ReadClusterDataStage extends AbstractBaseStage {
               if (liveInstanceMap.containsKey(instanceName)) {
                 liveInstanceSet.add(instanceName);
               }
-              if (!config.getInstanceEnabled()) {
+              if (!config.getInstanceEnabled() || 
(clusterConfig.getDisabledInstances() != null
+                  && 
clusterConfig.getDisabledInstances().containsKey(instanceName))) {
                 disabledInstanceSet.add(instanceName);
               }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
index a29ef78..554b09a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -80,6 +80,7 @@ public class ControllerManagerHelper {
        */
       _manager.addInstanceConfigChangeListener(controller);
       _manager.addResourceConfigChangeListener(controller);
+      _manager.addClusterfigChangeListener(controller);
       _manager.addLiveInstanceChangeListener(controller);
       _manager.addIdealStateChangeListener(controller);
       _manager.addControllerListener(controller);

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index f5897af..c3fa9e9 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -33,11 +33,14 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
+import org.apache.commons.math.stat.clustering.Cluster;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
@@ -74,7 +77,6 @@ import org.apache.helix.util.RebalanceUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ZKHelixAdmin implements HelixAdmin {
   public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec";
   private final ZkClient _zkClient;
@@ -123,19 +125,20 @@ public class ZKHelixAdmin implements HelixAdmin {
 
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
     if (!_zkClient.exists(instanceConfigPath)) {
-      throw new HelixException("Node " + instanceName + " does not exist in 
config for cluster "
-          + clusterName);
+      throw new HelixException(
+          "Node " + instanceName + " does not exist in config for cluster " + 
clusterName);
     }
 
     String instancePath = PropertyPathBuilder.instance(clusterName, 
instanceName);
     if (!_zkClient.exists(instancePath)) {
-      throw new HelixException("Node " + instanceName + " does not exist in 
instances for cluster "
-          + clusterName);
+      throw new HelixException(
+          "Node " + instanceName + " does not exist in instances for cluster " 
+ clusterName);
     }
 
     String liveInstancePath = PropertyPathBuilder.liveInstance(clusterName, 
instanceName);
     if (_zkClient.exists(liveInstancePath)) {
-      throw new HelixException("Node " + instanceName + " is still alive for 
cluster " + clusterName + ", can't drop.");
+      throw new HelixException(
+          "Node " + instanceName + " is still alive for cluster " + 
clusterName + ", can't drop.");
     }
 
     // delete config path
@@ -150,8 +153,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   public InstanceConfig getInstanceConfig(String clusterName, String 
instanceName) {
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(clusterName, instanceName);
     if (!_zkClient.exists(instanceConfigPath)) {
-      throw new HelixException("instance" + instanceName + " does not exist in 
cluster "
-          + clusterName);
+      throw new HelixException(
+          "instance" + instanceName + " does not exist in cluster " + 
clusterName);
     }
 
     HelixDataAccessor accessor =
@@ -161,7 +164,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     return accessor.getProperty(keyBuilder.instanceConfig(instanceName));
   }
 
-  @Override public boolean setInstanceConfig(String clusterName, String 
instanceName,
+  @Override
+  public boolean setInstanceConfig(String clusterName, String instanceName,
       InstanceConfig newInstanceConfig) {
     String instanceConfigPath = 
PropertyPathBuilder.getPath(PropertyType.CONFIGS, clusterName,
         HelixConfigScope.ConfigScopeProperty.PARTICIPANT.toString(), 
instanceName);
@@ -188,27 +192,22 @@ public class ZKHelixAdmin implements HelixAdmin {
   @Override
   public void enableInstance(final String clusterName, final String 
instanceName,
       final boolean enabled) {
-    String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
-
-    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<ZNRecord>(_zkClient);
-    if (!baseAccessor.exists(path, 0)) {
-      throw new HelixException("Cluster " + clusterName + ", instance: " + 
instanceName
-          + ", instance config does not exist");
-    }
-
-    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        if (currentData == null) {
-          throw new HelixException("Cluster: " + clusterName + ", instance: " 
+ instanceName
-              + ", participant config is null");
-        }
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+    enableSingleInstance(clusterName, instanceName, enabled, baseAccessor);
+    enableBatchInstances(clusterName, Collections.singletonList(instanceName), 
enabled,
+        baseAccessor);
+  }
 
-        InstanceConfig config = new InstanceConfig(currentData);
-        config.setInstanceEnabled(enabled);
-        return config.getRecord();
+  @Override
+  public void enableInstance(String clusterName, List<String> instances,
+      boolean enabled) {
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+    if (enabled) {
+      for (String instance : instances) {
+        enableSingleInstance(clusterName, instance, enabled, baseAccessor);
       }
-    }, AccessOption.PERSISTENT);
+    }
+    enableBatchInstances(clusterName, instances, enabled, baseAccessor);
   }
 
   @Override
@@ -221,7 +220,8 @@ public class ZKHelixAdmin implements HelixAdmin {
           + ", ideal-state does not exist");
     }
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
         if (currentData == null) {
           throw new HelixException(
               "Cluster: " + clusterName + ", resource: " + resourceName + ", 
ideal-state is null");
@@ -260,22 +260,22 @@ public class ZKHelixAdmin implements HelixAdmin {
     if (idealStateRecord == null) {
       // throw new HelixException("Cluster: " + clusterName + ", resource: " + 
resourceName
       // + ", ideal state does not exist");
-      logger.warn("Disable partitions: " + partitionNames + " but Cluster: " + 
clusterName
-          + ", resource: " + resourceName
-          + " does not exists. probably disable it during ERROR->DROPPED 
transtition");
-
+      logger.warn(
+          "Disable partitions: " + partitionNames + " but Cluster: " + 
clusterName + ", resource: "
+              + resourceName
+              + " does not exists. probably disable it during ERROR->DROPPED 
transtition");
     } else {
       // check partitions exist. warn if not
       IdealState idealState = new IdealState(idealStateRecord);
       for (String partitionName : partitionNames) {
-        if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO && 
idealState
-            .getPreferenceList(partitionName) == null)
-            || (idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED && 
idealState
-                .getPreferenceList(partitionName) == null)
-            || (idealState.getRebalanceMode() == RebalanceMode.TASK && 
idealState
-                .getPreferenceList(partitionName) == null)
-            || (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED && 
idealState
-                .getInstanceStateMap(partitionName) == null)) {
+        if ((idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO
+            && idealState.getPreferenceList(partitionName) == null) || (
+            idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED
+                && idealState.getPreferenceList(partitionName) == null) || (
+            idealState.getRebalanceMode() == RebalanceMode.TASK
+                && idealState.getPreferenceList(partitionName) == null) || (
+            idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED
+                && idealState.getInstanceStateMap(partitionName) == null)) {
           logger.warn("Cluster: " + clusterName + ", resource: " + 
resourceName + ", partition: "
               + partitionName + ", partition does not exist in ideal state");
         }
@@ -285,7 +285,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     // update participantConfig
     // could not use ZNRecordUpdater since it doesn't do listField 
merge/subtract
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
         if (currentData == null) {
           throw new HelixException("Cluster: " + clusterName + ", instance: " 
+ instanceName
               + ", participant config is null");
@@ -339,15 +340,17 @@ public class ZKHelixAdmin implements HelixAdmin {
     // check the instance is alive
     LiveInstance liveInstance = 
accessor.getProperty(keyBuilder.liveInstance(instanceName));
     if (liveInstance == null) {
-      throw new HelixException("Can't reset state for " + resourceName + "/" + 
partitionNames
-          + " on " + instanceName + ", because " + instanceName + " is not 
alive");
+      throw new HelixException(
+          "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+              + ", because " + instanceName + " is not alive");
     }
 
     // check resource group exists
     IdealState idealState = 
accessor.getProperty(keyBuilder.idealStates(resourceName));
     if (idealState == null) {
-      throw new HelixException("Can't reset state for " + resourceName + "/" + 
partitionNames
-          + " on " + instanceName + ", because " + resourceName + " is not 
added");
+      throw new HelixException(
+          "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+              + ", because " + resourceName + " is not added");
     }
 
     // check partition exists in resource group
@@ -355,14 +358,16 @@ public class ZKHelixAdmin implements HelixAdmin {
     if (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED) {
       Set<String> partitions = new 
HashSet<String>(idealState.getRecord().getMapFields().keySet());
       if (!partitions.containsAll(resetPartitionNames)) {
-        throw new HelixException("Can't reset state for " + resourceName + "/" 
+ partitionNames
-            + " on " + instanceName + ", because not all " + partitionNames + 
" exist");
+        throw new HelixException(
+            "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+                + ", because not all " + partitionNames + " exist");
       }
     } else {
       Set<String> partitions = new 
HashSet<String>(idealState.getRecord().getListFields().keySet());
       if (!partitions.containsAll(resetPartitionNames)) {
-        throw new HelixException("Can't reset state for " + resourceName + "/" 
+ partitionNames
-            + " on " + instanceName + ", because not all " + partitionNames + 
" exist");
+        throw new HelixException(
+            "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+                + ", because not all " + partitionNames + " exist");
       }
     }
 
@@ -372,8 +377,9 @@ public class ZKHelixAdmin implements HelixAdmin {
         accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, 
resourceName));
     for (String partitionName : resetPartitionNames) {
       if 
(!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
-        throw new HelixException("Can't reset state for " + resourceName + "/" 
+ partitionNames
-            + " on " + instanceName + ", because not all " + partitionNames + 
" are in ERROR state");
+        throw new HelixException(
+            "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+                + ", because not all " + partitionNames + " are in ERROR 
state");
       }
     }
 
@@ -381,22 +387,23 @@ public class ZKHelixAdmin implements HelixAdmin {
     String stateModelDef = idealState.getStateModelDefRef();
     StateModelDefinition stateModel = 
accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
     if (stateModel == null) {
-      throw new HelixException("Can't reset state for " + resourceName + "/" + 
partitionNames
-          + " on " + instanceName + ", because " + stateModelDef + " is NOT 
found");
+      throw new HelixException(
+          "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+              + ", because " + stateModelDef + " is NOT found");
     }
 
     // check there is no pending messages for the partitions exist
     List<Message> messages = 
accessor.getChildValues(keyBuilder.messages(instanceName));
     for (Message message : messages) {
-      if 
(!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
-          || !sessionId.equals(message.getTgtSessionId())
-          || !resourceName.equals(message.getResourceName())
+      if 
(!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || 
!sessionId
+          .equals(message.getTgtSessionId()) || 
!resourceName.equals(message.getResourceName())
           || !resetPartitionNames.contains(message.getPartitionName())) {
         continue;
       }
 
-      throw new HelixException("Can't reset state for " + resourceName + "/" + 
partitionNames
-          + " on " + instanceName + ", because a pending message exists: " + 
message);
+      throw new HelixException(
+          "Can't reset state for " + resourceName + "/" + partitionNames + " 
on " + instanceName
+              + ", because a pending message exists: " + message);
     }
 
     String adminName = null;
@@ -455,8 +462,8 @@ public class ZKHelixAdmin implements HelixAdmin {
         for (String partitionName : stateMap.keySet()) {
           Map<String, String> instanceStateMap = stateMap.get(partitionName);
 
-          if (instanceStateMap.containsKey(instanceName)
-              && 
instanceStateMap.get(instanceName).equals(HelixDefinedState.ERROR.toString())) {
+          if (instanceStateMap.containsKey(instanceName) && 
instanceStateMap.get(instanceName)
+              .equals(HelixDefinedState.ERROR.toString())) {
             resetPartitionNames.add(partitionName);
           }
         }
@@ -629,12 +636,13 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void addResource(String clusterName, String resourceName, IdealState 
idealstate) {
+  public void addResource(String clusterName, String resourceName,
+      IdealState idealstate) {
     String stateModelRef = idealstate.getStateModelDefRef();
     String stateModelDefPath = PropertyPathBuilder.stateModelDef(clusterName, 
stateModelRef);
     if (!_zkClient.exists(stateModelDefPath)) {
-      throw new HelixException("State model " + stateModelRef
-          + " not found in the cluster STATEMODELDEFS path");
+      throw new HelixException(
+          "State model " + stateModelRef + " not found in the cluster 
STATEMODELDEFS path");
     }
 
     String idealStatePath = PropertyPathBuilder.idealState(clusterName);
@@ -652,7 +660,6 @@ public class ZKHelixAdmin implements HelixAdmin {
       String stateModelRef, String rebalancerMode, int bucketSize) {
     addResource(clusterName, resourceName, partitions, stateModelRef, 
rebalancerMode, bucketSize,
         -1);
-
   }
 
   @Override
@@ -733,7 +740,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void setResourceIdealState(String clusterName, String resourceName, 
IdealState idealState) {
+  public void setResourceIdealState(String clusterName, String resourceName,
+      IdealState idealState) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -765,8 +773,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     String stateModelPath = stateModelDefPath + "/" + stateModelDef;
     if (_zkClient.exists(stateModelPath)) {
       if (recreateIfExists) {
-        logger.info("Operation.State Model directory exists:" + stateModelPath 
+
-            ", remove and recreate.");
+        logger.info(
+            "Operation.State Model directory exists:" + stateModelPath + ", 
remove and recreate.");
         _zkClient.deleteRecursive(stateModelPath);
       } else {
         logger.info("Skip the operation. State Model directory exists:" + 
stateModelPath);
@@ -796,7 +804,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public StateModelDefinition getStateModelDef(String clusterName, String 
stateModelName) {
+  public StateModelDefinition getStateModelDef(String clusterName,
+      String stateModelName) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
     Builder keyBuilder = accessor.keyBuilder();
@@ -884,8 +893,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void rebalance(String clusterName, String resourceName, int replica, 
String keyPrefix,
-      String group) {
+  public void rebalance(String clusterName, String resourceName, int replica,
+      String keyPrefix, String group) {
     List<String> instanceNames = new LinkedList<String>();
     if (keyPrefix == null || keyPrefix.length() == 0) {
       keyPrefix = resourceName;
@@ -904,7 +913,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void rebalance(String clusterName, String resourceName, int replica, 
List<String> instances) {
+  public void rebalance(String clusterName, String resourceName, int replica,
+      List<String> instances) {
     rebalance(clusterName, resourceName, replica, resourceName, instances, "");
   }
 
@@ -966,9 +976,9 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
     if (idealState.getRebalanceMode() != RebalanceMode.FULL_AUTO
         && idealState.getRebalanceMode() != RebalanceMode.USER_DEFINED) {
-      ZNRecord newIdealState =
-          DefaultIdealStateCalculator.calculateIdealState(instanceNames, 
partitions, replica,
-              keyPrefix, masterStateValue, slaveStateValue);
+      ZNRecord newIdealState = DefaultIdealStateCalculator
+          .calculateIdealState(instanceNames, partitions, replica, keyPrefix, 
masterStateValue,
+              slaveStateValue);
 
       // for now keep mapField in SEMI_AUTO mode and remove listField in 
CUSTOMIZED mode
       if (idealState.getRebalanceMode() == RebalanceMode.SEMI_AUTO) {
@@ -990,8 +1000,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void addIdealState(String clusterName, String resourceName, String 
idealStateFile)
-      throws IOException {
+  public void addIdealState(String clusterName, String resourceName,
+      String idealStateFile) throws IOException {
     ZNRecord idealStateRecord =
         (ZNRecord) (new 
ZNRecordSerializer().deserialize(readFile(idealStateFile)));
     if (idealStateRecord.getId() == null || 
!idealStateRecord.getId().equals(resourceName)) {
@@ -1042,10 +1052,11 @@ public class ZKHelixAdmin implements HelixAdmin {
     String path = keyBuilder.constraint(constraintType.toString()).getPath();
 
     baseAccessor.update(path, new DataUpdater<ZNRecord>() {
-      @Override public ZNRecord update(ZNRecord currentData) {
-        ClusterConstraints constraints = currentData == null ?
-            new ClusterConstraints(constraintType) :
-            new ClusterConstraints(currentData);
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        ClusterConstraints constraints = currentData == null
+            ? new ClusterConstraints(constraintType)
+            : new ClusterConstraints(currentData);
 
         constraints.addConstraintItem(constraintId, constraintItem);
         return constraints.getRecord();
@@ -1076,7 +1087,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public ClusterConstraints getConstraints(String clusterName, ConstraintType 
constraintType) {
+  public ClusterConstraints getConstraints(String clusterName,
+      ConstraintType constraintType) {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
 
@@ -1085,16 +1097,18 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   /**
-   * Takes the existing idealstate as input and computes newIdealState such 
that
-   * the partition movement is minimized. The partitions are redistributed 
among the instances
-   * provided.
+   * Takes the existing idealstate as input and computes newIdealState such 
that the partition
+   * movement is minimized. The partitions are redistributed among the 
instances provided.
+   *
    * @param clusterName
    * @param currentIdealState
    * @param instanceNames
+   *
    * @return
    */
   @Override
-  public void rebalance(String clusterName, IdealState currentIdealState, 
List<String> instanceNames) {
+  public void rebalance(String clusterName, IdealState currentIdealState,
+      List<String> instanceNames) {
     Set<String> activeInstances = new HashSet<String>();
     for (String partition : currentIdealState.getPartitionSet()) {
       
activeInstances.addAll(currentIdealState.getRecord().getListField(partition));
@@ -1109,14 +1123,14 @@ public class ZKHelixAdmin implements HelixAdmin {
         this.getStateModelDef(clusterName, 
currentIdealState.getStateModelDefRef());
 
     if (stateModDef == null) {
-      throw new HelixException("cannot find state model: "
-          + currentIdealState.getStateModelDefRef());
+      throw new HelixException(
+          "cannot find state model: " + 
currentIdealState.getStateModelDefRef());
     }
     String[] states = RebalanceUtil.parseStates(clusterName, stateModDef);
 
-    ZNRecord newIdealStateRecord =
-        DefaultIdealStateCalculator.convertToZNRecord(balancedRecord,
-            currentIdealState.getResourceName(), states[0], states[1]);
+    ZNRecord newIdealStateRecord = DefaultIdealStateCalculator
+        .convertToZNRecord(balancedRecord, 
currentIdealState.getResourceName(), states[0],
+            states[1]);
     Set<String> partitionSet = new HashSet<String>();
     partitionSet.addAll(newIdealStateRecord.getMapFields().keySet());
     partitionSet.addAll(newIdealStateRecord.getListFields().keySet());
@@ -1129,12 +1143,12 @@ public class ZKHelixAdmin implements HelixAdmin {
         if (partition.equals(originPartitionName)) {
           continue;
         }
-        newIdealStateRecord.getMapFields().put(originPartitionName,
-            newIdealStateRecord.getMapField(partition));
+        newIdealStateRecord.getMapFields()
+            .put(originPartitionName, 
newIdealStateRecord.getMapField(partition));
         newIdealStateRecord.getMapFields().remove(partition);
 
-        newIdealStateRecord.getListFields().put(originPartitionName,
-            newIdealStateRecord.getListField(partition));
+        newIdealStateRecord.getListFields()
+            .put(originPartitionName, 
newIdealStateRecord.getListField(partition));
         newIdealStateRecord.getListFields().remove(partition);
       }
     }
@@ -1151,8 +1165,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, 
InstanceType.PARTICIPANT)) {
-      throw new HelixException("cluster " + clusterName + " instance " + 
instanceName
-          + " is not setup yet");
+      throw new HelixException(
+          "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1170,8 +1184,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, 
InstanceType.PARTICIPANT)) {
-      throw new HelixException("cluster " + clusterName + " instance " + 
instanceName
-          + " is not setup yet");
+      throw new HelixException(
+          "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
     ZKHelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1189,8 +1203,8 @@ public class ZKHelixAdmin implements HelixAdmin {
     }
 
     if (!ZKUtil.isInstanceSetup(_zkClient, clusterName, instanceName, 
InstanceType.PARTICIPANT)) {
-      throw new HelixException("cluster " + clusterName + " instance " + 
instanceName
-          + " is not setup yet");
+      throw new HelixException(
+          "cluster " + clusterName + " instance " + instanceName + " is not 
setup yet");
     }
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new 
ZkBaseDataAccessor<ZNRecord>(_zkClient));
@@ -1214,7 +1228,8 @@ public class ZKHelixAdmin implements HelixAdmin {
   }
 
   @Override
-  public void enableBatchMessageMode(String clusterName, String resourceName, 
boolean enabled) {
+  public void enableBatchMessageMode(String clusterName, String resourceName,
+      boolean enabled) {
     // TODO: Change IdealState to ResourceConfig when configs are migrated to 
ResourceConfig
     IdealState idealState = getResourceIdealState(clusterName, resourceName);
     if (idealState == null) {
@@ -1226,8 +1241,94 @@ public class ZKHelixAdmin implements HelixAdmin {
     setResourceIdealState(clusterName, resourceName, idealState);
   }
 
+  /**
+   * Enables or disables a single instance by updating the enabled flag in its
+   * instance config.
+   *
+   * @param clusterName name of the cluster the instance belongs to
+   * @param instanceName name of the instance to enable or disable
+   * @param enabled value handed to {@code InstanceConfig#setInstanceEnabled}
+   * @param baseAccessor accessor used for both the existence check and the update
+   * @throws HelixException if no instance config exists at the computed path
+   */
+  private void enableSingleInstance(final String clusterName, final String 
instanceName,
+      final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) {
+    String path = PropertyPathBuilder.instanceConfig(clusterName, 
instanceName);
+
+    // Fail before attempting the update when the instance has no config node.
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ", instance: " + 
instanceName
+          + ", instance config does not exist");
+    }
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>()
+
+    {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Guard against the updater being handed a null record.
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ", instance: " 
+ instanceName
+              + ", participant config is null");
+        }
+
+        // Flip the flag on a wrapper of the current record and write that record back.
+        InstanceConfig config = new InstanceConfig(currentData);
+        config.setInstanceEnabled(enabled);
+        return config.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Enables or disables a list of instances with a single update of the
+   * disabled-instances map kept in the cluster config.
+   *
+   * Enabling removes the given instances from the map. Disabling adds each
+   * instance that is not yet present, with the current time in milliseconds
+   * (as a string) as its value; instances already present keep their value.
+   *
+   * @param clusterName name of the cluster whose config is updated
+   * @param instances names of the instances to enable or disable
+   * @param enabled true to remove the instances from the disabled map, false to add them
+   * @param baseAccessor accessor used for both the existence check and the update
+   * @throws HelixException if no cluster config exists at the computed path
+   */
+  private void enableBatchInstances(final String clusterName, final 
List<String> instances,
+      final boolean enabled, BaseDataAccessor<ZNRecord> baseAccessor) {
+
+    String path = PropertyPathBuilder.clusterConfig(clusterName);
+
+    // Fail before attempting the update when the cluster has no config node.
+    if (!baseAccessor.exists(path, 0)) {
+      throw new HelixException("Cluster " + clusterName + ": cluster config 
does not exist");
+    }
+
+    baseAccessor.update(path, new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Guard against the updater being handed a null record.
+        if (currentData == null) {
+          throw new HelixException("Cluster: " + clusterName + ": cluster 
config is null");
+        }
+
+        // Work on a sorted copy of the stored map; start empty when none is stored.
+        ClusterConfig clusterConfig = new ClusterConfig(currentData);
+        Map<String, String> disabledInstances = new TreeMap<>();
+        if (clusterConfig.getDisabledInstances() != null) {
+          disabledInstances.putAll(clusterConfig.getDisabledInstances());
+        }
+
+        if (enabled) {
+          // Enabling drops the instances from the disabled map.
+          disabledInstances.keySet().removeAll(instances);
+        } else {
+          // Disabling stamps only instances not already listed, so an existing
+          // entry keeps the value it was first stored with.
+          for (String disabledInstance : instances) {
+            if (!disabledInstances.containsKey(disabledInstance)) {
+              disabledInstances.put(disabledInstance, 
String.valueOf(System.currentTimeMillis()));
+            }
+          }
+        }
+        clusterConfig.setDisabledInstances(disabledInstances);
+
+        return clusterConfig.getRecord();
+      }
+    }, AccessOption.PERSISTENT);
+  }
+
+  /**
+   * Returns the disabled-instances map read from the cluster config of the given
+   * cluster, exactly as {@code ClusterConfig#getDisabledInstances()} returns it.
+   *
+   * NOTE(review): the result is passed through unchecked. Other code in this change
+   * null-checks {@code getDisabledInstances()}, so callers should presumably expect
+   * null here as well - confirm.
+   *
+   * @param clusterName name of the cluster whose config is read
+   * @return the disabled-instances map stored in the cluster config
+   */
+  @Override
+  public Map<String, String> getBatchDisabledInstances(String clusterName) {
+    ConfigAccessor accessor = new ConfigAccessor(_zkClient);
+    return accessor.getClusterConfig(clusterName).getDisabledInstances();
+  }
+
   @Override
-  public void close() {
+  public List<String> getInstancesByDomain(String clusterName, String domain) {
+    // Collects the names of all instances in the cluster whose instance config
+    // reports membership in the given domain (see InstanceConfig#isInstanceInDomain).
+    List<String> instances = new ArrayList<>();
+    String path = PropertyPathBuilder.instanceConfig(clusterName);
+    BaseDataAccessor<ZNRecord> baseAccessor = new 
ZkBaseDataAccessor<>(_zkClient);
+    // Read every child record under the cluster's instance-config path.
+    List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0);
+    for (ZNRecord record : znRecords) {
+      // Null child records are skipped rather than treated as an error.
+      if (record != null) {
+        InstanceConfig instanceConfig = new InstanceConfig(record);
+        if (instanceConfig.isInstanceInDomain(domain)) {
+          instances.add(instanceConfig.getInstanceName());
+        }
+      }
+    }
+    return instances;
+  }
+
+  @Override public void close() {
     if (_zkClient != null) {
       _zkClient.close();
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 6f74728..2a97145 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -60,7 +60,8 @@ public class ClusterConfig extends HelixProperty {
     MAX_PARTITIONS_PER_INSTANCE,
     MAX_OFFLINE_INSTANCES_ALLOWED,
     TARGET_EXTERNALVIEW_ENABLED,
-    ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE // Controller won't execute 
load balance state transition if the number of partitons that need recovery 
exceeds this limitation
+    ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE, // Controller won't execute 
load balance state transition if the number of partitions that need recovery 
exceeds this limitation
+    DISABLED_INSTANCES
   }
   private final static int DEFAULT_MAX_CONCURRENT_TASK_PER_INSTANCE = 40;
   private final static int DEFAULT_ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE 
= 0; // By default, no load balance if any error partition
@@ -474,6 +475,21 @@ public class ClusterConfig extends HelixProperty {
     
_record.setIntField(ClusterConfigProperty.ERROR_PARTITION_THRESHOLD_FOR_LOAD_BALANCE.name(),
         errorPartitionThreshold);
   }
+  /**
+   * Set the disabled instance map of {@code <instance, disabledTimeStamp>}, stored
+   * as the DISABLED_INSTANCES map field of this config's record.
+   * @param disabledInstances map from instance name to the timestamp it was disabled at
+   */
+  public void setDisabledInstances(Map<String, String> disabledInstances) {
+    _record.setMapField(ClusterConfigProperty.DISABLED_INSTANCES.name(), 
disabledInstances);
+  }
+
+  /**
+   * Get current disabled instance map of {@code <instance, disabledTimeStamp>}, read
+   * from the DISABLED_INSTANCES map field of this config's record.
+   * NOTE(review): callers in this change null-check the result, so presumably this is
+   * null when the field has never been set - confirm against ZNRecord#getMapField.
+   * @return the map stored under DISABLED_INSTANCES
+   */
+  public Map<String, String> getDisabledInstances() {
+    return 
_record.getMapField(ClusterConfigProperty.DISABLED_INSTANCES.name());
+  }
 
   /**
    * Get IdealState rules defined in the cluster config.

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java 
b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index e88f37d..4343006 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -20,6 +20,7 @@ package org.apache.helix.model;
  */
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.util.HelixUtil;
@@ -426,6 +428,22 @@ public class InstanceConfig extends HelixProperty {
     }
   }
 
+  /**
+   * Checks whether this instance belongs to the given domain.
+   *
+   * Both the argument and this instance's DOMAIN simple field are split on commas,
+   * and the instance is in the domain when every token of the argument also appears
+   * among the instance's tokens. Tokens are compared exactly as split; no trimming
+   * or case folding is applied.
+   *
+   * @param domain comma-separated domain tokens to look for
+   * @return true if all tokens of {@code domain} are present in the instance's DOMAIN
+   *         field; false if the instance has no DOMAIN field or a token is missing
+   * @throws HelixException if {@code domain} is null
+   */
+  public boolean isInstanceInDomain(String domain) {
+    if (domain == null) {
+      throw new HelixException("Invalid input for domain.");
+    }
+
+    // An instance without a DOMAIN field is in no domain.
+    if (_record.getSimpleField(InstanceConfigProperty.DOMAIN.name()) == null) {
+      return false;
+    }
+
+    // Subset test: remove the instance's tokens from the requested ones and
+    // check that nothing is left over.
+    Set<String> domainSet = new HashSet<>(Arrays.asList(domain.split(",")));
+    Set<String> instanceDomains = new HashSet<>(
+        
Arrays.asList(_record.getSimpleField(InstanceConfigProperty.DOMAIN.name()).split(",")));
+    domainSet.removeAll(instanceDomains);
+    return domainSet.size() == 0;
+  }
+
   /**
    * Whether the delay rebalance is enabled for this instance.
    * By default, it is enable if the field is not set.

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java 
b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
index 30483f6..5736169 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java
@@ -46,6 +46,7 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ClusterConstraints;
 import org.apache.helix.model.ClusterConstraints.ConstraintType;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -217,8 +218,10 @@ public class ClusterSetup {
       throw new HelixException(error);
     }
 
+    ClusterConfig clusterConfig = 
accessor.getProperty(keyBuilder.clusterConfig());
     // ensure node is disabled, otherwise fail
-    if (config.getInstanceEnabled()) {
+    if (config.getInstanceEnabled() && (clusterConfig.getDisabledInstances() 
== null
+        || !clusterConfig.getDisabledInstances().containsKey(instanceId))) {
       String error = "Node " + instanceId + " is enabled, cannot drop";
       _logger.warn(error);
       throw new HelixException(error);
@@ -245,8 +248,10 @@ public class ClusterSetup {
       throw new HelixException(error);
     }
 
+    ClusterConfig clusterConfig = 
accessor.getProperty(keyBuilder.clusterConfig());
     // ensure old instance is disabled, otherwise fail
-    if (oldConfig.getInstanceEnabled()) {
+    if (oldConfig.getInstanceEnabled() && 
(clusterConfig.getDisabledInstances() == null
+        || 
!clusterConfig.getDisabledInstances().containsKey(oldInstanceName))) {
       String error =
           "Old instance " + oldInstanceName + " is enabled, it need to be 
disabled and turned off";
       _logger.warn(error);

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
new file mode 100644
index 0000000..eab5c32
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBatchEnableInstances.java
@@ -0,0 +1,110 @@
+package org.apache.helix.integration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for enabling and disabling instances through the single-instance
+ * enableInstance call and the list-based (batch) enableInstance call, including the
+ * cases where one call is used to undo the other.
+ *
+ * Every test changes the enabled state, sleeps for two seconds, then inspects the
+ * external view of WorkflowGenerator.DEFAULT_TGT_DB.
+ */
+public class TestBatchEnableInstances extends TaskTestBase {
+  // NOTE(review): assigned in beforeClass() but not read by any test in this class.
+  private ConfigAccessor _accessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    // Set the inherited sizing fields before running the base-class setup.
+    _numDbs = 1;
+    _numReplicas = 3;
+    _numNodes = 5;
+    _numParitions = 4;
+    super.beforeClass();
+    _accessor = new ConfigAccessor(_gZkClient);
+  }
+
+  /**
+   * Disables participant 0 with the single-instance call and expects it to be absent
+   * from every partition's state map; re-enables it at the end.
+   */
+  @Test
+  public void testOldEnableDisable() throws InterruptedException {
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), 
false);
+    Thread.sleep(2000);
+
+    ExternalView externalView = _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
+      
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
+    }
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), 
true);
+  }
+
+  /**
+   * Disables participants 0 and 1 with the list-based call and expects both to be
+   * absent from every partition's state map; re-enables both at the end.
+   */
+  @Test
+  public void testBatchEnableDisable() throws InterruptedException {
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
+        false);
+    Thread.sleep(2000);
+
+    ExternalView externalView = _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
+      
Assert.assertTrue(!stateMap.keySet().contains(_participants[0].getInstanceName()));
+      
Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName()));
+    }
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
+        true);
+  }
+
+  /**
+   * Disables participant 0 with the single-instance call, then enables participants
+   * 0 and 1 with the list-based call, and expects participant 0 to appear in at
+   * least one partition's state map.
+   */
+  @Test
+  public void testOldDisableBatchEnable() throws InterruptedException {
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), 
false);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
+        true);
+    Thread.sleep(2000);
+
+    ExternalView externalView = _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    // Count the partitions whose state map mentions participant 0.
+    int numOfFirstHost = 0;
+    for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
+      if (stateMap.keySet().contains(_participants[0].getInstanceName())) {
+        numOfFirstHost++;
+      }
+    }
+    Assert.assertTrue(numOfFirstHost > 0);
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), 
true);
+  }
+
+  /**
+   * Disables participants 0 and 1 with the list-based call, then enables only
+   * participant 0 with the single-instance call. Expects participant 0 to appear in
+   * at least one partition's state map and participant 1 in none; re-enables both
+   * with the list-based call at the end.
+   */
+  @Test
+  public void testBatchDisableOldEnable() throws InterruptedException {
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
+        false);
+    _gSetupTool.getClusterManagementTool()
+        .enableInstance(CLUSTER_NAME, _participants[0].getInstanceName(), 
true);
+    Thread.sleep(2000);
+
+    ExternalView externalView = _gSetupTool.getClusterManagementTool()
+        .getResourceExternalView(CLUSTER_NAME, 
WorkflowGenerator.DEFAULT_TGT_DB);
+    Assert.assertEquals(externalView.getRecord().getMapFields().size(), 
_numParitions);
+    // Count the partitions whose state map mentions participant 0.
+    int numOfFirstHost = 0;
+    for (Map<String, String> stateMap : 
externalView.getRecord().getMapFields().values()) {
+      if (stateMap.keySet().contains(_participants[0].getInstanceName())) {
+        numOfFirstHost++;
+      }
+      
Assert.assertTrue(!stateMap.keySet().contains(_participants[1].getInstanceName()));
+    }
+    Assert.assertTrue(numOfFirstHost > 0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME,
+        Arrays.asList(_participants[0].getInstanceName(), 
_participants[1].getInstanceName()),
+        true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
index 2de3a10..04c3ed4 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestZkCallbackHandlerLeak.java
@@ -90,10 +90,10 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -115,7 +115,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
     // printHandlers(participantManagerToExpire);
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManagerToExpire.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 10,
+    Assert.assertEquals(controllerHandlerNb, 11,
         "HelixController should have 10 (5+2n) callback handlers for 2 (n) 
participant");
     Assert.assertEquals(particHandlerNb, 1,
         "HelixParticipant should have 1 (msg->HelixTaskExecutor) callback 
handlers");
@@ -145,10 +145,10 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers 
after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers 
after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -241,7 +241,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, 10,
+    Assert.assertEquals(controllerHandlerNb, 11,
         "HelixController should have 10 (6+2n) callback handlers for 2 
participant, but was "
             + controllerHandlerNb + ", " + printHandlers(controller));
     Assert.assertEquals(particHandlerNb, 1,
@@ -273,10 +273,10 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers 
after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers 
after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
index 176fe33..a30de78 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestConsecutiveZkSessionExpiry.java
@@ -244,7 +244,7 @@ public class TestConsecutiveZkSessionExpiry extends 
ZkUnitTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            1,
+            2,
             "Distributed controller should have 2 handler (message) after lose 
leadership, but was "
                 + handlers.size());
 

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
index ea00888..142cd1f 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -147,7 +147,7 @@ public class TestDistributedControllerManager extends 
ZkIntegrationTestBase {
     Assert
         .assertEquals(
             handlers.size(),
-            1,
+            2,
             "Distributed controller should have 1 handler (message) after lose 
leadership, but was "
                 + handlers.size());
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
index 00f95b0..ec2dd1b 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/manager/TestZkCallbackHandlerLeak.java
@@ -96,7 +96,7 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase 
{
         return watchPaths.size() == (7 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers.");
 
     // check participant zk-watchers
     final MockParticipantManager participantManagerToExpire = participants[0];
@@ -152,7 +152,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
         return watchPaths.size() == (7 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers 
after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers 
after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {
@@ -237,7 +237,7 @@ public class TestZkCallbackHandlerLeak extends 
ZkUnitTestBase {
 
     int controllerHandlerNb = controller.getHandlers().size();
     int particHandlerNb = participantManager.getHandlers().size();
-    Assert.assertEquals(controllerHandlerNb, (6 + 2 * n),
+    Assert.assertEquals(controllerHandlerNb, (7 + 2 * n),
         "HelixController should have 9 (5+2n) callback handlers for 2 
participant, but was "
             + controllerHandlerNb + ", " + 
TestHelper.printHandlers(controller));
     Assert.assertEquals(particHandlerNb, 1,
@@ -270,10 +270,10 @@ public class TestZkCallbackHandlerLeak extends ZkUnitTestBase {
 
         // controller should have 5 + 2n + m + (m+2)n zk-watchers
         // where n is number of nodes and m is number of resources
-        return watchPaths.size() == (7 + 5 * n);
+        return watchPaths.size() == (8 + 5 * n);
       }
     }, 500);
-    Assert.assertTrue(result, "Controller should have 7 + 5*n zk-watchers after session expiry.");
+    Assert.assertTrue(result, "Controller should have 8 + 5*n zk-watchers after session expiry.");
 
     // check participant zk-watchers
     result = TestHelper.verify(new TestHelper.Verifier() {

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
index beeb3cf..8679007 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
@@ -219,6 +219,11 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
+  @Override public void enableInstance(String clusterName, List<String> instances,
+      boolean enabled) {
+
+  }
+
   @Override public void enableResource(String clusterName, String resourceName, boolean enabled) {
 
   }
@@ -366,6 +371,15 @@ public class MockHelixAdmin implements HelixAdmin {
 
   }
 
+  @Override
+  public Map<String, String> getBatchDisabledInstances(String clusterName) {
+    return null;
+  }
+
+  @Override public List<String> getInstancesByDomain(String clusterName, String domain) {
+    return null;
+  }
+
   @Override public void close() {
 
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
index 4109fa3..e8c7b0d 100644
--- a/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/task/TaskSynchronizedTestBase.java
@@ -110,6 +110,7 @@ public class TaskSynchronizedTestBase extends ZkIntegrationTestBase {
   }
 
   protected void setupParticipants() {
+    _participants = new MockParticipantManager[_numNodes];
     for (int i = 0; i < _numNodes; i++) {
       String storageNodeName = PARTICIPANT_PREFIX + "_" + (_startPort + i);
       _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);

http://git-wip-us.apache.org/repos/asf/helix/blob/31cec911/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
index 4fe8060..5b25c3e 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/InstanceAccessor.java
@@ -36,6 +36,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.HealthStat;
@@ -91,13 +92,14 @@ public class InstanceAccessor extends AbstractResource {
     }
 
     List<String> liveInstances = accessor.getChildNames(accessor.keyBuilder().liveInstances());
-
+    ClusterConfig clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
 
     for (String instanceName : instances) {
       InstanceConfig instanceConfig =
           accessor.getProperty(accessor.keyBuilder().instanceConfig(instanceName));
       if (instanceConfig != null) {
-        if (!instanceConfig.getInstanceEnabled()) {
+        if (!instanceConfig.getInstanceEnabled() || (clusterConfig.getDisabledInstances() != null
+            && clusterConfig.getDisabledInstances().containsKey(instanceName))) {
           disabledNode.add(JsonNodeFactory.instance.textNode(instanceName));
         }
 

Reply via email to