This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7566790  Create leadControllerResource in helix cluster (#4047)
7566790 is described below

commit 75667900a7ad4739069aa2220b778ab7d7b12a6b
Author: Jialiang Li <[email protected]>
AuthorDate: Thu Jun 6 22:18:16 2019 -0700

    Create leadControllerResource in helix cluster (#4047)
    
    * Create leadControllerResource in Helix cluster
    
    * Separate Helix cluster creation logic
    
    * Enable delay rebalance on lead controller resource; set delay time to 5 
mins, minActiveReplicas to 0, numReplicas to 1.
---
 .../apache/pinot/common/config/TagNameUtils.java   |  30 +--
 .../apache/pinot/common/utils/CommonConstants.java |  10 +
 .../apache/pinot/controller/ControllerStarter.java |   6 +-
 .../helix/core/PinotHelixResourceManager.java      |  76 +++---
 .../helix/core/util/HelixSetupUtils.java           | 286 ++++++++++++++-------
 .../controller/helix/PinotControllerModeTest.java  | 113 +++++++-
 .../helix/core/PinotHelixResourceManagerTest.java  | 180 ++++++++++---
 7 files changed, 498 insertions(+), 203 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
index f1ab25e..a22e4fa 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.common.config;
 
-import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.utils.ServerType;
 import org.apache.pinot.common.utils.TenantRole;
 
@@ -38,25 +37,20 @@ public class TagNameUtils {
     return tenantName + "_" + TenantRole.BROKER.toString();
   }
 
-  public static boolean hasValidServerTagSuffix(String tagName) {
-    if (tagName.endsWith(ServerType.REALTIME.toString()) || 
tagName.endsWith(ServerType.OFFLINE.toString())) {
-      return true;
-    }
-    return false;
+  public static boolean isServerTag(String tagName) {
+    return isOfflineServerTag(tagName) || isRealtimeServerTag(tagName);
   }
 
-  public static TenantRole getTenantRoleFromTag(String tagName)
-      throws InvalidConfigException {
-    if (tagName.endsWith(ServerType.REALTIME.toString())) {
-      return TenantRole.SERVER;
-    }
-    if (tagName.endsWith(ServerType.OFFLINE.toString())) {
-      return TenantRole.SERVER;
-    }
-    if (tagName.endsWith(TenantRole.BROKER.toString())) {
-      return TenantRole.BROKER;
-    }
-    throw new InvalidConfigException("Cannot identify tenant type from tag 
name : " + tagName);
+  public static boolean isOfflineServerTag(String tagName) {
+    return tagName.endsWith(ServerType.OFFLINE.toString());
+  }
+
+  public static boolean isRealtimeServerTag(String tagName) {
+    return tagName.endsWith(ServerType.REALTIME.toString());
+  }
+
+  public static boolean isBrokerTag(String tagName) {
+    return tagName.endsWith(TenantRole.BROKER.toString());
   }
 
   public static String getTagFromTenantAndServerType(String tenantName, 
ServerType type) {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 915e96e..8132cdb 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -37,8 +37,18 @@ public class CommonConstants {
 
     public static final String SERVER_INSTANCE_TYPE = "server";
     public static final String BROKER_INSTANCE_TYPE = "broker";
+    public static final String CONTROLLER_INSTANCE_TYPE = "controller";
 
     public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
+    public static final String LEAD_CONTROLLER_RESOURCE_NAME = 
"leadControllerResource";
+
+    // More information on why these numbers are set can be found in the 
following doc:
+    // 
https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
+    public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE = 
24;
+    public static final int LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT = 1;
+    public static final boolean ENABLE_DELAY_REBALANCE = true;
+    public static final int MIN_ACTIVE_REPLICAS = 0;
+    public static final long REBALANCE_DELAY_MS = 300_000L; // 5 minutes.
 
     public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
     public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index f57d21f..c313616 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,8 +219,7 @@ public class ControllerStarter {
   private void setUpHelixController() {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
-    _helixControllerManager = HelixSetupUtils
-        .setup(_helixClusterName, _helixZkURL, _instanceId, 
_isUpdateStateModel, _enableBatchMessageMode);
+    _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, 
_helixZkURL, _instanceId);
 
     // Emit helix controller metrics
     
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -244,6 +243,9 @@ public class ControllerStarter {
       throw new RuntimeException("Pinot only controller currently isn't 
supported in production yet.");
     }
 
+    // Set up Pinot cluster in Helix
+    HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, 
_isUpdateStateModel, _enableBatchMessageMode);
+
     // Start all components
     initPinotFSFactory();
     initSegmentFetcherFactory();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index aaa9ea8..877458d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,6 +53,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -60,6 +61,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.IndexingConfig;
 import org.apache.pinot.common.config.OfflineTagConfig;
@@ -92,7 +94,6 @@ import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnli
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.SchemaUtils;
-import org.apache.pinot.common.utils.TenantRole;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import 
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
 import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -157,15 +158,10 @@ public class PinotHelixResourceManager {
     _allowHLCTables = allowHLCTables;
   }
 
-  public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName,
-      @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
-    this(zkURL, helixClusterName, controllerInstanceId, dataDir, 
DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
-        true, true);
-  }
-
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
     this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        controllerConf.getControllerHost() + "_" + 
controllerConf.getControllerPort(), controllerConf.getDataDir(),
+        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + 
controllerConf.getControllerHost() + "_"
+            + controllerConf.getControllerPort(), controllerConf.getDataDir(),
         controllerConf.getExternalViewOnlineToOfflineTimeout(), 
controllerConf.tenantIsolationEnabled(),
         controllerConf.getEnableBatchMessageMode(), 
controllerConf.getHLCTablesAllowed());
   }
@@ -185,6 +181,10 @@ public class PinotHelixResourceManager {
     _cacheInstanceConfigsDataAccessor =
         new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) 
baseDataAccessor, instanceConfigs, null,
             Collections.singletonList(instanceConfigs));
+
+    // Add instance group tag for controller
+    addInstanceGroupTagIfNeeded();
+
     _keyBuilder = _helixDataAccessor.keyBuilder();
     _segmentDeletionManager = new SegmentDeletionManager(_dataDir, 
_helixAdmin, _helixClusterName, _propertyStore);
     ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore, 
_isSingleTenantCluster);
@@ -259,9 +259,13 @@ public class PinotHelixResourceManager {
    * Register and connect to Helix cluster as PARTICIPANT role.
    */
   private HelixManager registerAndConnectAsHelixParticipant() {
-    HelixManager helixManager = HelixManagerFactory
-        .getZKHelixManager(_helixClusterName, 
CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
-            InstanceType.PARTICIPANT, _helixZkURL);
+    HelixManager helixManager =
+        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, _helixZkURL);
+
+    // Registers Master-Slave state model to state machine engine, which is 
for calculating participant assignment in lead controller resource.
+    helixManager.getStateMachineEngine()
+        .registerStateModelFactory(MasterSlaveSMD.name, new 
MasterSlaveStateModelFactory());
+
     try {
       helixManager.connect();
       return helixManager;
@@ -274,6 +278,20 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Add instance group tag for controller so that pinot controller can be 
assigned to lead controller resource.
+   */
+  private void addInstanceGroupTagIfNeeded() {
+    InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
+    if 
(!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
+      LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", 
_instanceId,
+          CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+      accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId), 
instanceConfig);
+    }
+  }
+
+  /**
    * Instance related APIs
    */
 
@@ -835,20 +853,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = 
_helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
-          continue;
-        }
-        TenantRole tenantRole;
-        try {
-          tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
-          if (tenantRole == TenantRole.BROKER) {
-            tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
-          }
-        } catch (InvalidConfigException e) {
-          LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, 
tag);
-          continue;
+        if (TagNameUtils.isBrokerTag(tag)) {
+          tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
         }
       }
     }
@@ -861,20 +867,8 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInCluster) {
       InstanceConfig config = 
_helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
       for (String tag : config.getTags()) {
-        if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
-            .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
-            .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
-          continue;
-        }
-        TenantRole tenantRole;
-        try {
-          tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
-          if (tenantRole == TenantRole.SERVER) {
-            tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
-          }
-        } catch (InvalidConfigException e) {
-          LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName, 
tag);
-          continue;
+        if (TagNameUtils.isServerTag(tag)) {
+          tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
         }
       }
     }
@@ -1176,7 +1170,7 @@ public class PinotHelixResourceManager {
     if (tagOverrideConfig != null) {
       String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
       if (realtimeConsumingTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+        if (!TagNameUtils.isServerTag(realtimeConsumingTag)) {
           throw new InvalidTableConfigException(
               "Invalid realtime consuming tag: " + realtimeConsumingTag + " 
for table " + tableNameWithType
                   + ". Must have suffix _REALTIME or _OFFLINE");
@@ -1190,7 +1184,7 @@ public class PinotHelixResourceManager {
 
       String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
       if (realtimeCompletedTag != null) {
-        if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+        if (!TagNameUtils.isServerTag(realtimeCompletedTag)) {
           throw new InvalidTableConfigException(
               "Invalid realtime completed tag: " + realtimeCompletedTag + " 
for table " + tableNameWithType
                   + ". Must have suffix _REALTIME or _OFFLINE");
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index b8707f7..f65a8ce 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.controller.helix.core.util;
 
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,10 +29,10 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.controller.HelixControllerMain;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZKHelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.messaging.handling.HelixTaskExecutor;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -53,6 +56,11 @@ import 
org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS;
+import static 
org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS;
+
 
 /**
  * HelixSetupUtils handles how to create or get a helixCluster in controller.
@@ -60,119 +68,205 @@ import org.slf4j.LoggerFactory;
  *
  */
 public class HelixSetupUtils {
-
   private static final Logger LOGGER = 
LoggerFactory.getLogger(HelixSetupUtils.class);
 
   public static synchronized HelixManager setup(String helixClusterName, 
String zkPath,
-      String pinotControllerInstanceId, boolean isUpdateStateModel, boolean 
enableBatchMessageMode) {
+      String helixControllerInstanceId) {
+    setupHelixCluster(helixClusterName, zkPath);
 
-    try {
-      createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel, 
enableBatchMessageMode);
-    } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
-      return null;
-    }
+    return startHelixControllerInStandadloneMode(helixClusterName, zkPath, 
helixControllerInstanceId);
+  }
 
-    try {
-      return startHelixControllerInStandadloneMode(helixClusterName, zkPath, 
pinotControllerInstanceId);
-    } catch (final Exception e) {
-      LOGGER.error("Caught exception", e);
-      return null;
+  /**
+   * Set up a brand new Helix cluster if it doesn't exist.
+   */
+  public static void setupHelixCluster(String helixClusterName, String zkPath) 
{
+    final HelixAdmin admin = new ZKHelixAdmin(zkPath);
+    if (admin.getClusters().contains(helixClusterName)) {
+      LOGGER.info("Helix cluster: {} already exists", helixClusterName);
+      return;
     }
+    LOGGER.info("Creating a new Helix cluster: {}", helixClusterName);
+    admin.addCluster(helixClusterName, false);
+    LOGGER.info("New Cluster: {} created.", helixClusterName);
+  }
+
+  private static HelixManager startHelixControllerInStandadloneMode(String 
helixClusterName, String zkUrl,
+      String pinotControllerInstanceId) {
+    LOGGER.info("Starting Helix Standalone Controller ... ");
+    return HelixControllerMain
+        .startHelixController(zkUrl, helixClusterName, 
pinotControllerInstanceId, HelixControllerMain.STANDALONE);
   }
 
-  public static void createHelixClusterIfNeeded(String helixClusterName, 
String zkPath, boolean isUpdateStateModel,
+  /**
+   * Customizes existing Helix cluster to run Pinot components.
+   */
+  public static void setupPinotCluster(String helixClusterName, String zkPath, 
boolean isUpdateStateModel,
       boolean enableBatchMessageMode) {
     final HelixAdmin admin = new ZKHelixAdmin(zkPath);
-    final String segmentStateModelName =
-        
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    Preconditions.checkState(admin.getClusters().contains(helixClusterName),
+        String.format("Helix cluster: %s hasn't been set up", 
helixClusterName));
 
-    if (admin.getClusters().contains(helixClusterName)) {
-      LOGGER.info("cluster already exists 
********************************************* ");
-      if (isUpdateStateModel) {
-        final StateModelDefinition curStateModelDef = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
-        List<String> states = curStateModelDef.getStatesPriorityList();
-        if 
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
 {
-          LOGGER.info("State model {} already updated to contain CONSUMING 
state", segmentStateModelName);
-          return;
-        } else {
-          LOGGER.info("Updating {} to add states for low level consumers", 
segmentStateModelName);
-          StateModelDefinition newStateModelDef =
-              
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
-          ZkClient zkClient = new ZkClient(zkPath);
-          
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
 TimeUnit.SECONDS);
-          zkClient.setZkSerializer(new ZNRecordSerializer());
-          HelixDataAccessor accessor =
-              new ZKHelixDataAccessor(helixClusterName, new 
ZkBaseDataAccessor<ZNRecord>(zkClient));
-          PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-          
accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), 
newStateModelDef);
-          LOGGER.info("Completed updating statemodel {}", 
segmentStateModelName);
-          zkClient.close();
-        }
-      }
-      return;
-    }
+    // Ensure auto join.
+    ensureAutoJoin(helixClusterName, admin);
 
-    LOGGER.info("Creating a new cluster, as the helix cluster : " + 
helixClusterName
-        + " was not found ********************************************* ");
-    admin.addCluster(helixClusterName, false);
+    // Add segment state model definition if needed
+    addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, 
isUpdateStateModel);
+
+    // Add broker resource if needed
+    createBrokerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
 
-    LOGGER.info("Enable auto join.");
+    // Add lead controller resource if needed
+    createLeadControllerResourceIfNeeded(helixClusterName, admin, 
enableBatchMessageMode);
+
+    // Init property store if needed
+    initPropertyStoreIfNeeded(helixClusterName, zkPath);
+  }
+
+  private static void ensureAutoJoin(String helixClusterName, HelixAdmin 
admin) {
     final HelixConfigScope scope =
         new 
HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
+    String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." + 
HelixTaskExecutor.MAX_THREADS;
+    List<String> keys = new ArrayList<>();
+    keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN);
+    keys.add(stateTransitionMaxThreads);
+    Map<String, String> configs = admin.getConfig(scope, keys);
+    if 
(!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)))
 {
+      configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
Boolean.TRUE.toString());
+    }
+    if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) {
+      configs.put(stateTransitionMaxThreads, String.valueOf(1));
+    }
+    admin.setConfig(scope, configs);
+  }
 
-    final Map<String, String> props = new HashMap<String, String>();
-    props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, 
String.valueOf(true));
-    //we need only one segment to be loaded at a time
-    props.put(MessageType.STATE_TRANSITION + "." + 
HelixTaskExecutor.MAX_THREADS, String.valueOf(1));
-
-    admin.setConfig(scope, props);
-
-    LOGGER.info(
-        "Adding state model {} (with CONSUMED state) generated using {} 
**********************************************",
-        segmentStateModelName, 
PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-
-    // If this is a fresh cluster we are creating, then the cluster will see 
the CONSUMING state in the
-    // state model. But then the servers will never be asked to go to that 
STATE (whether they have the code
-    // to handle it or not) unil we complete the feature using low-level 
consumers and turn the feature on.
-    admin.addStateModelDef(helixClusterName, segmentStateModelName,
-        
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding state model definition named : "
-        + 
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL
-        + " generated using : " + 
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()
-        + " ********************************************** ");
-
-    admin.addStateModelDef(helixClusterName,
-        
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
-        
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
-    LOGGER.info("Adding empty ideal state for Broker!");
-    HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
-        helixClusterName, admin);
-    IdealState idealState = PinotTableIdealStateBuilder
-        .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, 
enableBatchMessageMode);
-    admin.setResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
-    initPropertyStorePath(helixClusterName, zkPath);
-    LOGGER.info("New Cluster setup completed... 
********************************************** ");
+  private static void addSegmentStateModelDefinitionIfNeeded(String 
helixClusterName, HelixAdmin admin, String zkPath,
+      boolean isUpdateStateModel) {
+    final String segmentStateModelName =
+        
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+    StateModelDefinition stateModelDefinition = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
+    if (stateModelDefinition == null) {
+      LOGGER.info("Adding state model {} (with CONSUMED state) generated using 
{}", segmentStateModelName,
+          PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
+      admin.addStateModelDef(helixClusterName, segmentStateModelName,
+          
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    } else if (isUpdateStateModel) {
+      final StateModelDefinition curStateModelDef = 
admin.getStateModelDef(helixClusterName, segmentStateModelName);
+      List<String> states = curStateModelDef.getStatesPriorityList();
+      if 
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
 {
+        LOGGER.info("State model {} already updated to contain CONSUMING 
state", segmentStateModelName);
+      } else {
+        LOGGER.info("Updating {} to add states for low level consumers", 
segmentStateModelName);
+        StateModelDefinition newStateModelDef =
+            
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
+        ZkClient zkClient = new ZkClient(zkPath);
+        
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
 TimeUnit.SECONDS);
+        zkClient.setZkSerializer(new ZNRecordSerializer());
+        HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, 
new ZkBaseDataAccessor<>(zkClient));
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+        accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), 
newStateModelDef);
+        LOGGER.info("Completed updating state model {}", 
segmentStateModelName);
+        zkClient.close();
+      }
+    }
   }
 
-  private static void initPropertyStorePath(String helixClusterName, String 
zkPath) {
-    String propertyStorePath = 
PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, helixClusterName);
-    ZkHelixPropertyStore<ZNRecord> propertyStore =
-        new ZkHelixPropertyStore<ZNRecord>(zkPath, new ZNRecordSerializer(), 
propertyStorePath);
-    propertyStore.create("/CONFIGS", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    propertyStore.create("/SCHEMAS", new ZNRecord(""), 
AccessOption.PERSISTENT);
-    propertyStore.create("/SEGMENTS", new ZNRecord(""), 
AccessOption.PERSISTENT);
+  private static void createBrokerResourceIfNeeded(String helixClusterName, 
HelixAdmin admin,
+      boolean enableBatchMessageMode) {
+    // Add broker resource online offline state model definition if needed
+    StateModelDefinition brokerResourceStateModelDefinition = 
admin.getStateModelDef(helixClusterName,
+        
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
+    if (brokerResourceStateModelDefinition == null) {
+      LOGGER.info("Adding state model definition named : {} generated using : 
{}",
+          
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+          
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
+      admin.addStateModelDef(helixClusterName,
+          
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+          
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+    }
+
+    // Create broker resource if needed.
+    IdealState brokerResourceIdealState =
+        admin.getResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    if (brokerResourceIdealState == null) {
+      LOGGER.info("Adding empty ideal state for Broker!");
+      HelixHelper
+          .updateResourceConfigsFor(new HashMap<>(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
+              admin);
+      IdealState idealState = PinotTableIdealStateBuilder
+          .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, 
enableBatchMessageMode);
+      admin.setResourceIdealState(helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+    }
   }
 
-  private static HelixManager startHelixControllerInStandadloneMode(String 
helixClusterName, String zkUrl,
-      String pinotControllerInstanceId) {
-    LOGGER.info("Starting Helix Standalone Controller ... ");
-    return HelixControllerMain
-        .startHelixController(zkUrl, helixClusterName, 
pinotControllerInstanceId, HelixControllerMain.STANDALONE);
+  private static void createLeadControllerResourceIfNeeded(String 
helixClusterName, HelixAdmin admin,
+      boolean enableBatchMessageMode) {
+    StateModelDefinition masterSlaveStateModelDefinition =
+        admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
+    if (masterSlaveStateModelDefinition == null) {
+      LOGGER.info("Adding state model definition named : {} generated using : 
{}", MasterSlaveSMD.name,
+          MasterSlaveSMD.class.toString());
+      admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, 
MasterSlaveSMD.build());
+    }
+
+    IdealState leadControllerResourceIdealState =
+        admin.getResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
+    if (leadControllerResourceIdealState == null) {
+      LOGGER.info("Cluster {} doesn't contain {}. Creating one.", 
helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+      HelixHelper.updateResourceConfigsFor(new HashMap<>(), 
LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
+      // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
+      admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+          
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, 
MasterSlaveSMD.name,
+          IdealState.RebalanceMode.FULL_AUTO.toString(), 
CrushEdRebalanceStrategy.class.getName());
+
+      // Set instance group tag for lead controller resource.
+      IdealState leadControllerIdealState =
+          admin.getResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME);
+      
leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+      leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+      // The config below guarantees that if the number of active replicas is no less 
than the minimum active replicas, no partition movement will happen.
+      // Set min active replicas to 0 and rebalance delay to 5 minutes so that 
if any master goes offline, Helix controller waits at most 5 minutes and then 
re-calculates the participant assignment.
+      // This delay is helpful when periodic tasks are running and we don't 
want them to be re-run too frequently.
+      // Plus, if virtual id is applied to controller hosts, swapping hosts 
would be easy as new hosts can use the same virtual id and it takes least 
effort to change the configs.
+      leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+      leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+      
leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
+      admin.setResourceIdealState(helixClusterName, 
LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
+
+      // Explicitly disable this resource when creating this new resource.
+      // When all the controllers are running the code with the logic to 
handle this resource, it can be enabled for backward compatibility.
+      // In the next major release, we can enable this resource by default, so 
that all the controller logic can be separated.
+      admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, 
false);
+
+      LOGGER.info("Re-balance lead controller resource with replicas: {}",
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+      // Set it to 1 so that there's only 1 instance (i.e. master) shown in 
every partition.
+      admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+          CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+    }
+  }
+
+  private static void initPropertyStoreIfNeeded(String helixClusterName, 
String zkPath) {
+    String propertyStorePath = 
PropertyPathBuilder.propertyStore(helixClusterName);
+    ZkHelixPropertyStore<ZNRecord> propertyStore =
+        new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), 
propertyStorePath);
+    if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
+      propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/SCHEMAS", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
+    if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
+      propertyStore.create("/SEGMENTS", new ZNRecord(""), 
AccessOption.PERSISTENT);
+    }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index f3da797..d91e612 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -18,9 +18,14 @@
  */
 package org.apache.pinot.controller.helix;
 
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -62,18 +67,37 @@ public class PinotControllerModeTest extends ControllerTest 
{
     config.setControllerMode(ControllerConf.ControllerMode.DUAL);
     
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
 
+    // Helix cluster will be set up when starting the first controller.
     startController(config);
     TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), 
TIMEOUT_IN_MS,
         "Failed to start " + config.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
     Assert.assertEquals(_controllerStarter.getControllerMode(), 
ControllerConf.ControllerMode.DUAL);
 
+    // Enable the lead controller resource.
+    _helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+    // Starting a second dual-mode controller. Helix cluster has already been 
set up.
+    ControllerConf controllerConfig = getDefaultControllerConfiguration();
+    controllerConfig.setHelixClusterName(getHelixClusterName());
+    controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+    controllerConfig.setControllerPort(
+        Integer.toString(Integer.parseInt(this.config.getControllerPort()) + 
controllerPortOffset++));
+
+    ControllerStarter secondDualModeController = new 
TestOnlyControllerStarter(controllerConfig);
+    secondDualModeController.start();
+    TestUtils
+        .waitForCondition(aVoid -> 
secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
+            TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " 
controller in " + TIMEOUT_IN_MS + "ms.");
+    Assert.assertEquals(secondDualModeController.getControllerMode(), 
ControllerConf.ControllerMode.DUAL);
+
+    secondDualModeController.stop();
     stopController();
     _controllerStarter = null;
   }
 
   // TODO: enable it after removing ControllerLeadershipManager which requires 
both CONTROLLER and PARTICIPANT
   //       HelixManager
-  @Test (enabled = false)
+  @Test(enabled = false)
   public void testPinotOnlyController()
       throws Exception {
     config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
@@ -95,27 +119,94 @@ public class PinotControllerModeTest extends 
ControllerTest {
     ControllerStarter helixControllerStarter = new ControllerStarter(config2);
     helixControllerStarter.start();
     HelixManager helixControllerManager = 
helixControllerStarter.getHelixControllerManager();
+    HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool();
     TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), 
TIMEOUT_IN_MS,
         "Failed to start " + config2.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
 
+    // Enable the lead controller resource.
+    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
     // Starting a pinot only controller.
-    startController(config, false);
-    TestUtils.waitForCondition(aVoid -> 
_helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
-        "Failed to start " + config.getControllerMode() + " controller in " + 
TIMEOUT_IN_MS + "ms.");
-    Assert.assertEquals(_controllerStarter.getControllerMode(), 
ControllerConf.ControllerMode.PINOT_ONLY);
+    ControllerConf config3 = getDefaultControllerConfiguration();
+    config3.setHelixClusterName(getHelixClusterName());
+    config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    
config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
+
+    ControllerStarter firstPinotOnlyController = new 
TestOnlyControllerStarter(config3);
+    firstPinotOnlyController.start();
+    PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
+        firstPinotOnlyController.getHelixResourceManager();
+
+    TestUtils.waitForCondition(aVoid -> 
firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(),
+        TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " 
controller in " + TIMEOUT_IN_MS + "ms.");
+    Assert.assertEquals(firstPinotOnlyController.getControllerMode(), 
ControllerConf.ControllerMode.PINOT_ONLY);
 
     // Start a second Pinot only controller.
-    
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
-    ControllerStarter secondControllerStarter = new 
TestOnlyControllerStarter(config);
+    ControllerConf config4 = getDefaultControllerConfiguration();
+    config4.setHelixClusterName(getHelixClusterName());
+    config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+    
config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
 + controllerPortOffset++));
 
+    ControllerStarter secondControllerStarter = new 
TestOnlyControllerStarter(config4);
     secondControllerStarter.start();
     // Two controller instances assigned to cluster.
-    TestUtils.waitForCondition(aVoid -> 
_helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
-        "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + 
"ms.");
-
+    TestUtils
+        .waitForCondition(aVoid -> 
firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2, 
TIMEOUT_IN_MS,
+            "Failed to start the 2nd pinot only controller in " + 
TIMEOUT_IN_MS + "ms.");
+
+    // Disable lead controller resource, all the participants are in offline 
state (from slave state).
+    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = 
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+          if (!"OFFLINE".equals(entry.getValue())) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " + 
TIMEOUT_IN_MS + "ms.");
+
+    // Re-enable lead controller resource, all the participants are in healthy 
state (either master or slave).
+    helixAdmin.enableResource(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+    // Shutdown one controller, it will be removed from external view of lead 
controller resource.
     secondControllerStarter.stop();
 
-    stopController();
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = 
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        // Only 1 participant left in each partition, which will become the 
master.
+        for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+          if (!"MASTER".equals(entry.getValue())) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " + 
TIMEOUT_IN_MS + "ms.");
+
+    // Shutdown the only one controller left, the partition map should be 
empty.
+    firstPinotOnlyController.stop();
+    TestUtils.waitForCondition(aVoid -> {
+      ExternalView leadControllerResourceExternalView = helixAdmin
+          .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+        Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+        // There's no participant in all the partitions.
+        if (!stateMap.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " + 
TIMEOUT_IN_MS + "ms.");
+
     _controllerStarter = null;
     helixControllerStarter.stop();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 573ab90..7d9f8d9 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -19,13 +19,23 @@
 package org.apache.pinot.controller.helix.core;
 
 import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
+import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -45,12 +55,14 @@ import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
 import static 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
 
 
@@ -64,34 +76,34 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
   private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
   private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
+  private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
+  private static final long TIMEOUT_IN_MS = 10_000L;
 
   private final String _helixClusterName = getHelixClusterName();
 
   @BeforeClass
-  public void setUp()
-      throws Exception {
+  public void setUp() throws Exception {
     startZk();
     ControllerConf config = getDefaultControllerConfiguration();
     config.setTenantIsolationEnabled(false);
     startController(config);
 
-    ControllerRequestBuilderUtil
-        .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
-            false);
-    ControllerRequestBuilderUtil
-        .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, 
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
-            BASE_SERVER_ADMIN_PORT);
+    
ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
+        ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false);
+    
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
 ZkStarter.DEFAULT_ZK_STR,
+        NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);
 
     // Create server tenant on all Servers
-    Tenant serverTenant =
-        new 
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES)
-            .build();
+    Tenant serverTenant = new 
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER)
+        .setOfflineInstances(NUM_INSTANCES)
+        .build();
     _helixResourceManager.createServerTenant(serverTenant);
+
+    _helixAdmin.enableResource(getHelixClusterName(), 
LEAD_CONTROLLER_RESOURCE_NAME, true);
   }
 
   @Test
-  public void testGetInstanceEndpoints()
-      throws InvalidConfigException {
+  public void testGetInstanceEndpoints() throws InvalidConfigException {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     BiMap<String, String> endpoints = 
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
     for (int i = 0; i < NUM_INSTANCES; i++) {
@@ -100,8 +112,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testGetInstanceConfigs()
-      throws Exception {
+  public void testGetInstanceConfigs() throws Exception {
     Set<String> servers = 
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
     for (String server : servers) {
       InstanceConfig cachedInstanceConfig = 
_helixResourceManager.getHelixInstanceConfig(server);
@@ -118,8 +129,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     zkClient.close();
   }
 
-  private void modifyExistingInstanceConfig(ZkClient zkClient)
-      throws InterruptedException {
+  private void modifyExistingInstanceConfig(ZkClient zkClient) throws 
InterruptedException {
     String instanceName = "Server_localhost_" + new 
Random().nextInt(NUM_INSTANCES);
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
     Assert.assertTrue(zkClient.exists(instanceConfigPath));
@@ -150,8 +160,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     zkClient.writeData(instanceConfigPath, znRecord);
   }
 
-  private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
-      throws Exception {
+  private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws 
Exception {
     int biggerRandomNumber = NUM_INSTANCES + new 
Random().nextInt(NUM_INSTANCES);
     String instanceName = "Server_localhost_" + 
String.valueOf(biggerRandomNumber);
     String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
@@ -184,17 +193,18 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testRebuildBrokerResourceFromHelixTags()
-      throws Exception {
+  public void testRebuildBrokerResourceFromHelixTags() throws Exception {
     // Create broker tenant on 3 Brokers
     Tenant brokerTenant =
         new 
Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build();
     _helixResourceManager.createBrokerTenant(brokerTenant);
 
     // Create the table
-    TableConfig tableConfig =
-        new 
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
-            
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
+    TableConfig tableConfig = new 
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME)
+        .setNumReplicas(3)
+        .setBrokerTenant(BROKER_TENANT_NAME)
+        .setServerTenant(SERVER_TENANT_NAME)
+        .build();
     _helixResourceManager.addTable(tableConfig);
 
     // Check that the BrokerResource ideal state has 3 Brokers assigned to the 
table
@@ -204,8 +214,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
 
     // Untag all Brokers assigned to broker tenant
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
 
@@ -228,8 +238,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
   }
 
   @Test
-  public void testRetrieveMetadata()
-      throws Exception {
+  public void testRetrieveMetadata() throws Exception {
     String segmentName = "testSegment";
 
     // Test retrieving OFFLINE segment ZK metadata
@@ -263,7 +272,8 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     }
   }
 
-  @Test void testRetrieveTenantNames() {
+  @Test
+  void testRetrieveTenantNames() {
     // Create broker tenant on 1 Broker
     Tenant brokerTenant =
         new 
Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build();
@@ -366,8 +376,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     }
 
     // Create broker tenant on 3 Brokers
-    Tenant brokerTenant =
-        new 
Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+    Tenant brokerTenant = new 
Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
     _helixResourceManager.createBrokerTenant(brokerTenant);
 
     // empty server instances list
@@ -452,18 +461,119 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     }
 
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(brokerTag));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(brokerTag));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }
 
+  @Test
+  public void testLeadControllerResource()
+      throws Exception {
+    IdealState leadControllerResourceIdealState = 
_helixResourceManager.getHelixAdmin()
+        .getResourceIdealState(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+    Assert.assertTrue(leadControllerResourceIdealState.isValid());
+    Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
+    Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
+        CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+    Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
+        
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+    Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
+        Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT));
+    Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(), 
IdealState.RebalanceMode.FULL_AUTO);
+    Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet(
+        
leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
+
+    TestUtils
+        .waitForCondition(aVoid -> {
+              ExternalView leadControllerResourceExternalView = 
_helixResourceManager.getHelixAdmin()
+                  .getResourceExternalView(getHelixClusterName(), 
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+              for (String partition : 
leadControllerResourceExternalView.getPartitionSet()) {
+                Map<String, String> stateMap = 
leadControllerResourceExternalView.getStateMap(partition);
+                Map.Entry<String, String> entry = 
stateMap.entrySet().iterator().next();
+                boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + 
"_" + _controllerPort).equals(entry.getKey());
+                result &= "MASTER".equals(entry.getValue());
+                if (!result) {
+                  return false;
+                }
+              }
+              return true;
+            },
+            TIMEOUT_IN_MS, "Failed to assign controller hosts to lead 
controller resource in " + TIMEOUT_IN_MS + " ms.");
+  }
+
+  @Test
+  public void testLeadControllerAssignment() {
+    // Given a number of instances (from 1 to 10), make sure all the instances 
got assigned to lead controller resource.
+    for (int nInstances = 1; nInstances <= 
MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES; nInstances++) {
+      List<String> instanceNames = new ArrayList<>(nInstances);
+      List<Integer> ports = new ArrayList<>(nInstances);
+      for (int i = 0; i < nInstances; i++) {
+        instanceNames.add(PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + 
i);
+        ports.add(i);
+      }
+
+      List<String> partitions = new 
ArrayList<>(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+      for (int i = 0; i < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; 
i++) {
+        partitions.add(LEAD_CONTROLLER_RESOURCE_NAME + "_" + i);
+      }
+
+      LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+      states.put("OFFLINE", 0);
+      states.put("SLAVE", LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT - 1);
+      states.put("MASTER", 1);
+
+      CrushEdRebalanceStrategy crushEdRebalanceStrategy = new 
CrushEdRebalanceStrategy();
+      crushEdRebalanceStrategy.init(LEAD_CONTROLLER_RESOURCE_NAME, partitions, 
states, Integer.MAX_VALUE);
+
+      ClusterDataCache clusterDataCache = new ClusterDataCache();
+      PropertyKey.Builder keyBuilder = new 
PropertyKey.Builder(getHelixClusterName());
+      HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+      ClusterConfig clusterConfig = 
accessor.getProperty(keyBuilder.clusterConfig());
+      clusterDataCache.setClusterConfig(clusterConfig);
+
+      Map<String, InstanceConfig> instanceConfigMap = new 
HashMap<>(nInstances);
+      for (int i = 0; i < nInstances; i++) {
+        String instanceName = instanceNames.get(i);
+        int port = ports.get(i);
+        instanceConfigMap.put(instanceName, new InstanceConfig(instanceName
+            + ", {HELIX_ENABLED=true, HELIX_ENABLED_TIMESTAMP=1559546216610, 
HELIX_HOST=Controller_localhost, HELIX_PORT="
+            + port + "}{}{TAG_LIST=[controller]}"));
+      }
+      clusterDataCache.setInstanceConfigMap(instanceConfigMap);
+      ZNRecord znRecord =
+          crushEdRebalanceStrategy.computePartitionAssignment(instanceNames, 
instanceNames, new HashMap<>(0),
+              clusterDataCache);
+
+      Assert.assertNotNull(znRecord);
+      Map<String, List<String>> listFields = znRecord.getListFields();
+      Assert.assertEquals(listFields.size(), 
NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+
+      Map<String, Integer> instanceToMasterAssignmentCountMap = new 
HashMap<>();
+      int maxCount = 0;
+      for (List<String> assignments : listFields.values()) {
+        Assert.assertEquals(assignments.size(), 
LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+        if 
(!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) {
+          instanceToMasterAssignmentCountMap.put(assignments.get(0), 1);
+        } else {
+          instanceToMasterAssignmentCountMap.put(assignments.get(0),
+              instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
+        }
+        maxCount = 
Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount);
+      }
+      Assert.assertEquals(instanceToMasterAssignmentCountMap.size(), 
nInstances,
+          "Not all the instances got assigned to the resource!");
+      for (Integer count : instanceToMasterAssignmentCountMap.values()) {
+        Assert.assertTrue((maxCount - count == 0 || maxCount - count == 1), 
"Instance assignment isn't distributed");
+      }
+    }
+  }
+
   @AfterMethod
   public void cleanUpBrokerTags() {
     // Untag all Brokers for other tests
     for (String brokerInstance : 
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
-      _helixAdmin
-          .removeInstanceTag(_helixClusterName, brokerInstance, 
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+      _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+          TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
       _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to