This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7566790 Create leadControllerResource in helix cluster (#4047)
7566790 is described below
commit 75667900a7ad4739069aa2220b778ab7d7b12a6b
Author: Jialiang Li <[email protected]>
AuthorDate: Thu Jun 6 22:18:16 2019 -0700
Create leadControllerResource in helix cluster (#4047)
* Create leadControllerResource in Helix cluster
* Separate Helix cluster creation logic
* Enable delayed rebalance on the lead controller resource: set the delay to 5
minutes, minActiveReplicas to 0, and numReplicas to 1.
---
.../apache/pinot/common/config/TagNameUtils.java | 30 +--
.../apache/pinot/common/utils/CommonConstants.java | 10 +
.../apache/pinot/controller/ControllerStarter.java | 6 +-
.../helix/core/PinotHelixResourceManager.java | 76 +++---
.../helix/core/util/HelixSetupUtils.java | 286 ++++++++++++++-------
.../controller/helix/PinotControllerModeTest.java | 113 +++++++-
.../helix/core/PinotHelixResourceManagerTest.java | 180 ++++++++++---
7 files changed, 498 insertions(+), 203 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
index f1ab25e..a22e4fa 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/config/TagNameUtils.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.common.config;
-import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.utils.ServerType;
import org.apache.pinot.common.utils.TenantRole;
@@ -38,25 +37,20 @@ public class TagNameUtils {
return tenantName + "_" + TenantRole.BROKER.toString();
}
- public static boolean hasValidServerTagSuffix(String tagName) {
- if (tagName.endsWith(ServerType.REALTIME.toString()) ||
tagName.endsWith(ServerType.OFFLINE.toString())) {
- return true;
- }
- return false;
+ public static boolean isServerTag(String tagName) {
+ return isOfflineServerTag(tagName) || isRealtimeServerTag(tagName);
}
- public static TenantRole getTenantRoleFromTag(String tagName)
- throws InvalidConfigException {
- if (tagName.endsWith(ServerType.REALTIME.toString())) {
- return TenantRole.SERVER;
- }
- if (tagName.endsWith(ServerType.OFFLINE.toString())) {
- return TenantRole.SERVER;
- }
- if (tagName.endsWith(TenantRole.BROKER.toString())) {
- return TenantRole.BROKER;
- }
- throw new InvalidConfigException("Cannot identify tenant type from tag
name : " + tagName);
+ public static boolean isOfflineServerTag(String tagName) {
+ return tagName.endsWith(ServerType.OFFLINE.toString());
+ }
+
+ public static boolean isRealtimeServerTag(String tagName) {
+ return tagName.endsWith(ServerType.REALTIME.toString());
+ }
+
+ public static boolean isBrokerTag(String tagName) {
+ return tagName.endsWith(TenantRole.BROKER.toString());
}
public static String getTagFromTenantAndServerType(String tenantName,
ServerType type) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 915e96e..8132cdb 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -37,8 +37,18 @@ public class CommonConstants {
public static final String SERVER_INSTANCE_TYPE = "server";
public static final String BROKER_INSTANCE_TYPE = "broker";
+ public static final String CONTROLLER_INSTANCE_TYPE = "controller";
public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
+ public static final String LEAD_CONTROLLER_RESOURCE_NAME =
"leadControllerResource";
+
+ // More information on why these numbers are set can be found in the
following doc:
+ //
https://cwiki.apache.org/confluence/display/PINOT/Controller+Separation+between+Helix+and+Pinot
+ public static final int NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE =
24;
+ public static final int LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT = 1;
+ public static final boolean ENABLE_DELAY_REBALANCE = true;
+ public static final int MIN_ACTIVE_REPLICAS = 0;
+ public static final long REBALANCE_DELAY_MS = 300_000L; // 5 minutes.
public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index f57d21f..c313616 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -219,8 +219,7 @@ public class ControllerStarter {
private void setUpHelixController() {
// Register and connect instance as Helix controller.
LOGGER.info("Starting Helix controller");
- _helixControllerManager = HelixSetupUtils
- .setup(_helixClusterName, _helixZkURL, _instanceId,
_isUpdateStateModel, _enableBatchMessageMode);
+ _helixControllerManager = HelixSetupUtils.setup(_helixClusterName,
_helixZkURL, _instanceId);
// Emit helix controller metrics
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -244,6 +243,9 @@ public class ControllerStarter {
throw new RuntimeException("Pinot only controller currently isn't
supported in production yet.");
}
+ // Set up Pinot cluster in Helix
+ HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL,
_isUpdateStateModel, _enableBatchMessageMode);
+
// Start all components
initPinotFSFactory();
initSegmentFetcherFactory();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index aaa9ea8..877458d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -53,6 +53,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
@@ -60,6 +61,7 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.OfflineTagConfig;
@@ -92,7 +94,6 @@ import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnli
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.SchemaUtils;
-import org.apache.pinot.common.utils.TenantRole;
import org.apache.pinot.common.utils.helix.HelixHelper;
import
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvider;
import org.apache.pinot.common.utils.retry.RetryPolicies;
@@ -157,15 +158,10 @@ public class PinotHelixResourceManager {
_allowHLCTables = allowHLCTables;
}
- public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String
helixClusterName,
- @Nonnull String controllerInstanceId, @Nonnull String dataDir) {
- this(zkURL, helixClusterName, controllerInstanceId, dataDir,
DEFAULT_EXTERNAL_VIEW_UPDATE_TIMEOUT_MILLIS, false,
- true, true);
- }
-
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- controllerConf.getControllerHost() + "_" +
controllerConf.getControllerPort(), controllerConf.getDataDir(),
+ CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE +
controllerConf.getControllerHost() + "_"
+ + controllerConf.getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(),
controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed());
}
@@ -185,6 +181,10 @@ public class PinotHelixResourceManager {
_cacheInstanceConfigsDataAccessor =
new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>)
baseDataAccessor, instanceConfigs, null,
Collections.singletonList(instanceConfigs));
+
+ // Add instance group tag for controller
+ addInstanceGroupTagIfNeeded();
+
_keyBuilder = _helixDataAccessor.keyBuilder();
_segmentDeletionManager = new SegmentDeletionManager(_dataDir,
_helixAdmin, _helixClusterName, _propertyStore);
ZKMetadataProvider.setClusterTenantIsolationEnabled(_propertyStore,
_isSingleTenantCluster);
@@ -259,9 +259,13 @@ public class PinotHelixResourceManager {
* Register and connect to Helix cluster as PARTICIPANT role.
*/
private HelixManager registerAndConnectAsHelixParticipant() {
- HelixManager helixManager = HelixManagerFactory
- .getZKHelixManager(_helixClusterName,
CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + _instanceId,
- InstanceType.PARTICIPANT, _helixZkURL);
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, _helixZkURL);
+
+ // Registers Master-Slave state model to state machine engine, which is
for calculating participant assignment in lead controller resource.
+ helixManager.getStateMachineEngine()
+ .registerStateModelFactory(MasterSlaveSMD.name, new
MasterSlaveStateModelFactory());
+
try {
helixManager.connect();
return helixManager;
@@ -274,6 +278,20 @@ public class PinotHelixResourceManager {
}
/**
+ * Add instance group tag for controller so that pinot controller can be
assigned to lead controller resource.
+ */
+ private void addInstanceGroupTagIfNeeded() {
+ InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
+ if
(!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
+ LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.",
_instanceId,
+ CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+ accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId),
instanceConfig);
+ }
+ }
+
+ /**
* Instance related APIs
*/
@@ -835,20 +853,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config =
_helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
- continue;
- }
- TenantRole tenantRole;
- try {
- tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
- if (tenantRole == TenantRole.BROKER) {
- tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
- }
- } catch (InvalidConfigException e) {
- LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName,
tag);
- continue;
+ if (TagNameUtils.isBrokerTag(tag)) {
+ tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
}
}
}
@@ -861,20 +867,8 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInCluster) {
InstanceConfig config =
_helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName));
for (String tag : config.getTags()) {
- if (tag.equals(CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE) || tag
- .equals(CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE) || tag
- .equals(CommonConstants.Minion.UNTAGGED_INSTANCE)) {
- continue;
- }
- TenantRole tenantRole;
- try {
- tenantRole = TagNameUtils.getTenantRoleFromTag(tag);
- if (tenantRole == TenantRole.SERVER) {
- tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
- }
- } catch (InvalidConfigException e) {
- LOGGER.warn("Instance {} contains an invalid tag: {}", instanceName,
tag);
- continue;
+ if (TagNameUtils.isServerTag(tag)) {
+ tenantSet.add(TagNameUtils.getTenantNameFromTag(tag));
}
}
}
@@ -1176,7 +1170,7 @@ public class PinotHelixResourceManager {
if (tagOverrideConfig != null) {
String realtimeConsumingTag = tagOverrideConfig.getRealtimeConsuming();
if (realtimeConsumingTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeConsumingTag)) {
+ if (!TagNameUtils.isServerTag(realtimeConsumingTag)) {
throw new InvalidTableConfigException(
"Invalid realtime consuming tag: " + realtimeConsumingTag + "
for table " + tableNameWithType
+ ". Must have suffix _REALTIME or _OFFLINE");
@@ -1190,7 +1184,7 @@ public class PinotHelixResourceManager {
String realtimeCompletedTag = tagOverrideConfig.getRealtimeCompleted();
if (realtimeCompletedTag != null) {
- if (!TagNameUtils.hasValidServerTagSuffix(realtimeCompletedTag)) {
+ if (!TagNameUtils.isServerTag(realtimeCompletedTag)) {
throw new InvalidTableConfigException(
"Invalid realtime completed tag: " + realtimeCompletedTag + "
for table " + tableNameWithType
+ ". Must have suffix _REALTIME or _OFFLINE");
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index b8707f7..f65a8ce 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.util;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,10 +29,10 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyPathConfig;
-import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.HelixControllerMain;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZKHelixManager;
@@ -41,6 +43,7 @@ import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
@@ -53,6 +56,11 @@ import
org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS;
+import static
org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS;
+
/**
* HelixSetupUtils handles how to create or get a helixCluster in controller.
@@ -60,119 +68,205 @@ import org.slf4j.LoggerFactory;
*
*/
public class HelixSetupUtils {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(HelixSetupUtils.class);
public static synchronized HelixManager setup(String helixClusterName,
String zkPath,
- String pinotControllerInstanceId, boolean isUpdateStateModel, boolean
enableBatchMessageMode) {
+ String helixControllerInstanceId) {
+ setupHelixCluster(helixClusterName, zkPath);
- try {
- createHelixClusterIfNeeded(helixClusterName, zkPath, isUpdateStateModel,
enableBatchMessageMode);
- } catch (final Exception e) {
- LOGGER.error("Caught exception", e);
- return null;
- }
+ return startHelixControllerInStandadloneMode(helixClusterName, zkPath,
helixControllerInstanceId);
+ }
- try {
- return startHelixControllerInStandadloneMode(helixClusterName, zkPath,
pinotControllerInstanceId);
- } catch (final Exception e) {
- LOGGER.error("Caught exception", e);
- return null;
+ /**
+ * Set up a brand new Helix cluster if it doesn't exist.
+ */
+ public static void setupHelixCluster(String helixClusterName, String zkPath)
{
+ final HelixAdmin admin = new ZKHelixAdmin(zkPath);
+ if (admin.getClusters().contains(helixClusterName)) {
+ LOGGER.info("Helix cluster: {} already exists", helixClusterName);
+ return;
}
+ LOGGER.info("Creating a new Helix cluster: {}", helixClusterName);
+ admin.addCluster(helixClusterName, false);
+ LOGGER.info("New Cluster: {} created.", helixClusterName);
+ }
+
+ private static HelixManager startHelixControllerInStandadloneMode(String
helixClusterName, String zkUrl,
+ String pinotControllerInstanceId) {
+ LOGGER.info("Starting Helix Standalone Controller ... ");
+ return HelixControllerMain
+ .startHelixController(zkUrl, helixClusterName,
pinotControllerInstanceId, HelixControllerMain.STANDALONE);
}
- public static void createHelixClusterIfNeeded(String helixClusterName,
String zkPath, boolean isUpdateStateModel,
+ /**
+ * Customizes existing Helix cluster to run Pinot components.
+ */
+ public static void setupPinotCluster(String helixClusterName, String zkPath,
boolean isUpdateStateModel,
boolean enableBatchMessageMode) {
final HelixAdmin admin = new ZKHelixAdmin(zkPath);
- final String segmentStateModelName =
-
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ Preconditions.checkState(admin.getClusters().contains(helixClusterName),
+ String.format("Helix cluster: %s hasn't been set up",
helixClusterName));
- if (admin.getClusters().contains(helixClusterName)) {
- LOGGER.info("cluster already exists
********************************************* ");
- if (isUpdateStateModel) {
- final StateModelDefinition curStateModelDef =
admin.getStateModelDef(helixClusterName, segmentStateModelName);
- List<String> states = curStateModelDef.getStatesPriorityList();
- if
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
{
- LOGGER.info("State model {} already updated to contain CONSUMING
state", segmentStateModelName);
- return;
- } else {
- LOGGER.info("Updating {} to add states for low level consumers",
segmentStateModelName);
- StateModelDefinition newStateModelDef =
-
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
- ZkClient zkClient = new ZkClient(zkPath);
-
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
TimeUnit.SECONDS);
- zkClient.setZkSerializer(new ZNRecordSerializer());
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(helixClusterName, new
ZkBaseDataAccessor<ZNRecord>(zkClient));
- PropertyKey.Builder keyBuilder = accessor.keyBuilder();
-
accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName),
newStateModelDef);
- LOGGER.info("Completed updating statemodel {}",
segmentStateModelName);
- zkClient.close();
- }
- }
- return;
- }
+ // Ensure auto join.
+ ensureAutoJoin(helixClusterName, admin);
- LOGGER.info("Creating a new cluster, as the helix cluster : " +
helixClusterName
- + " was not found ********************************************* ");
- admin.addCluster(helixClusterName, false);
+ // Add segment state model definition if needed
+ addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath,
isUpdateStateModel);
+
+ // Add broker resource if needed
+ createBrokerResourceIfNeeded(helixClusterName, admin,
enableBatchMessageMode);
- LOGGER.info("Enable auto join.");
+ // Add lead controller resource if needed
+ createLeadControllerResourceIfNeeded(helixClusterName, admin,
enableBatchMessageMode);
+
+ // Init property store if needed
+ initPropertyStoreIfNeeded(helixClusterName, zkPath);
+ }
+
+ private static void ensureAutoJoin(String helixClusterName, HelixAdmin
admin) {
final HelixConfigScope scope =
new
HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build();
+ String stateTransitionMaxThreads = MessageType.STATE_TRANSITION + "." +
HelixTaskExecutor.MAX_THREADS;
+ List<String> keys = new ArrayList<>();
+ keys.add(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN);
+ keys.add(stateTransitionMaxThreads);
+ Map<String, String> configs = admin.getConfig(scope, keys);
+ if
(!Boolean.TRUE.toString().equals(configs.get(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN)))
{
+ configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN,
Boolean.TRUE.toString());
+ }
+ if (!Integer.toString(1).equals(configs.get(stateTransitionMaxThreads))) {
+ configs.put(stateTransitionMaxThreads, String.valueOf(1));
+ }
+ admin.setConfig(scope, configs);
+ }
- final Map<String, String> props = new HashMap<String, String>();
- props.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN,
String.valueOf(true));
- //we need only one segment to be loaded at a time
- props.put(MessageType.STATE_TRANSITION + "." +
HelixTaskExecutor.MAX_THREADS, String.valueOf(1));
-
- admin.setConfig(scope, props);
-
- LOGGER.info(
- "Adding state model {} (with CONSUMED state) generated using {}
**********************************************",
- segmentStateModelName,
PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
-
- // If this is a fresh cluster we are creating, then the cluster will see
the CONSUMING state in the
- // state model. But then the servers will never be asked to go to that
STATE (whether they have the code
- // to handle it or not) unil we complete the feature using low-level
consumers and turn the feature on.
- admin.addStateModelDef(helixClusterName, segmentStateModelName,
-
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
- LOGGER.info("Adding state model definition named : "
- +
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL
- + " generated using : " +
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()
- + " ********************************************** ");
-
- admin.addStateModelDef(helixClusterName,
-
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
-
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
-
- LOGGER.info("Adding empty ideal state for Broker!");
- HelixHelper.updateResourceConfigsFor(new HashMap<String, String>(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
- helixClusterName, admin);
- IdealState idealState = PinotTableIdealStateBuilder
- .buildEmptyIdealStateForBrokerResource(admin, helixClusterName,
enableBatchMessageMode);
- admin.setResourceIdealState(helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
- initPropertyStorePath(helixClusterName, zkPath);
- LOGGER.info("New Cluster setup completed...
********************************************** ");
+ private static void addSegmentStateModelDefinitionIfNeeded(String
helixClusterName, HelixAdmin admin, String zkPath,
+ boolean isUpdateStateModel) {
+ final String segmentStateModelName =
+
PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL;
+ StateModelDefinition stateModelDefinition =
admin.getStateModelDef(helixClusterName, segmentStateModelName);
+ if (stateModelDefinition == null) {
+ LOGGER.info("Adding state model {} (with CONSUMED state) generated using
{}", segmentStateModelName,
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString());
+ admin.addStateModelDef(helixClusterName, segmentStateModelName,
+
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+ } else if (isUpdateStateModel) {
+ final StateModelDefinition curStateModelDef =
admin.getStateModelDef(helixClusterName, segmentStateModelName);
+ List<String> states = curStateModelDef.getStatesPriorityList();
+ if
(states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE))
{
+ LOGGER.info("State model {} already updated to contain CONSUMING
state", segmentStateModelName);
+ } else {
+ LOGGER.info("Updating {} to add states for low level consumers",
segmentStateModelName);
+ StateModelDefinition newStateModelDef =
+
PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition();
+ ZkClient zkClient = new ZkClient(zkPath);
+
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC,
TimeUnit.SECONDS);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName,
new ZkBaseDataAccessor<>(zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName),
newStateModelDef);
+ LOGGER.info("Completed updating state model {}",
segmentStateModelName);
+ zkClient.close();
+ }
+ }
}
- private static void initPropertyStorePath(String helixClusterName, String
zkPath) {
- String propertyStorePath =
PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, helixClusterName);
- ZkHelixPropertyStore<ZNRecord> propertyStore =
- new ZkHelixPropertyStore<ZNRecord>(zkPath, new ZNRecordSerializer(),
propertyStorePath);
- propertyStore.create("/CONFIGS", new ZNRecord(""),
AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""),
AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""),
AccessOption.PERSISTENT);
- propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""),
AccessOption.PERSISTENT);
- propertyStore.create("/SCHEMAS", new ZNRecord(""),
AccessOption.PERSISTENT);
- propertyStore.create("/SEGMENTS", new ZNRecord(""),
AccessOption.PERSISTENT);
+ private static void createBrokerResourceIfNeeded(String helixClusterName,
HelixAdmin admin,
+ boolean enableBatchMessageMode) {
+ // Add broker resource online offline state model definition if needed
+ StateModelDefinition brokerResourceStateModelDefinition =
admin.getStateModelDef(helixClusterName,
+
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL);
+ if (brokerResourceStateModelDefinition == null) {
+ LOGGER.info("Adding state model definition named : {} generated using :
{}",
+
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString());
+ admin.addStateModelDef(helixClusterName,
+
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL,
+
PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition());
+ }
+
+ // Create broker resource if needed.
+ IdealState brokerResourceIdealState =
+ admin.getResourceIdealState(helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ if (brokerResourceIdealState == null) {
+ LOGGER.info("Adding empty ideal state for Broker!");
+ HelixHelper
+ .updateResourceConfigsFor(new HashMap<>(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName,
+ admin);
+ IdealState idealState = PinotTableIdealStateBuilder
+ .buildEmptyIdealStateForBrokerResource(admin, helixClusterName,
enableBatchMessageMode);
+ admin.setResourceIdealState(helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState);
+ }
}
- private static HelixManager startHelixControllerInStandadloneMode(String
helixClusterName, String zkUrl,
- String pinotControllerInstanceId) {
- LOGGER.info("Starting Helix Standalone Controller ... ");
- return HelixControllerMain
- .startHelixController(zkUrl, helixClusterName,
pinotControllerInstanceId, HelixControllerMain.STANDALONE);
+ private static void createLeadControllerResourceIfNeeded(String
helixClusterName, HelixAdmin admin,
+ boolean enableBatchMessageMode) {
+ StateModelDefinition masterSlaveStateModelDefinition =
+ admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name);
+ if (masterSlaveStateModelDefinition == null) {
+ LOGGER.info("Adding state model definition named : {} generated using :
{}", MasterSlaveSMD.name,
+ MasterSlaveSMD.class.toString());
+ admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name,
MasterSlaveSMD.build());
+ }
+
+ IdealState leadControllerResourceIdealState =
+ admin.getResourceIdealState(helixClusterName,
LEAD_CONTROLLER_RESOURCE_NAME);
+ if (leadControllerResourceIdealState == null) {
+ LOGGER.info("Cluster {} doesn't contain {}. Creating one.",
helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME);
+ HelixHelper.updateResourceConfigsFor(new HashMap<>(),
LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin);
+ // FULL-AUTO Master-Slave state model with CrushED reBalance strategy.
+ admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE,
MasterSlaveSMD.name,
+ IdealState.RebalanceMode.FULL_AUTO.toString(),
CrushEdRebalanceStrategy.class.getName());
+
+ // Set instance group tag for lead controller resource.
+ IdealState leadControllerIdealState =
+ admin.getResourceIdealState(helixClusterName,
LEAD_CONTROLLER_RESOURCE_NAME);
+
leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode);
+ // The below config guarantees if active number of replicas is no less
than minimum active replica, there will not be partition movements happened.
+ // Set min active replicas to 0 and rebalance delay to 5 minutes so that
if any master goes offline, Helix controller waits at most 5 minutes and then
re-calculate the participant assignment.
+ // This delay is helpful when periodic tasks are running and we don't
want them to be re-run too frequently.
+ // Plus, if virtual id is applied to controller hosts, swapping hosts
would be easy as new hosts can use the same virtual id and it takes least
effort to change the configs.
+ leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS);
+ leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS);
+
leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE);
+ admin.setResourceIdealState(helixClusterName,
LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState);
+
+ // Explicitly disable this resource when creating this new resource.
+ // When all the controllers are running the code with the logic to
handle this resource, it can be enabled for backward compatibility.
+ // In the next major release, we can enable this resource by default, so
that all the controller logic can be separated.
+ admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
false);
+
+ LOGGER.info("Re-balance lead controller resource with replicas: {}",
+ CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ // Set it to 1 so that there's only 1 instance (i.e. master) shown in
every partitions.
+ admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME,
+ CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ }
+ }
+
+ private static void initPropertyStoreIfNeeded(String helixClusterName,
String zkPath) {
+ String propertyStorePath =
PropertyPathBuilder.propertyStore(helixClusterName);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
+ new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(),
propertyStorePath);
+ if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) {
+ propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/SCHEMAS", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
+ if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) {
+ propertyStore.create("/SEGMENTS", new ZNRecord(""),
AccessOption.PERSISTENT);
+ }
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
index f3da797..d91e612 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java
@@ -18,9 +18,14 @@
*/
package org.apache.pinot.controller.helix;
+import java.util.Map;
+import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
+import org.apache.helix.model.ExternalView;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerStarter;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -62,18 +67,37 @@ public class PinotControllerModeTest extends ControllerTest
{
config.setControllerMode(ControllerConf.ControllerMode.DUAL);
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
+ controllerPortOffset++));
+ // Helix cluster will be set up when starting the first controller.
startController(config);
TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(),
TIMEOUT_IN_MS,
"Failed to start " + config.getControllerMode() + " controller in " +
TIMEOUT_IN_MS + "ms.");
Assert.assertEquals(_controllerStarter.getControllerMode(),
ControllerConf.ControllerMode.DUAL);
+ // Enable the lead controller resource.
+ _helixAdmin.enableResource(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+ // Starting a second dual-mode controller. Helix cluster has already been
set up.
+ ControllerConf controllerConfig = getDefaultControllerConfiguration();
+ controllerConfig.setHelixClusterName(getHelixClusterName());
+ controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL);
+ controllerConfig.setControllerPort(
+ Integer.toString(Integer.parseInt(this.config.getControllerPort()) +
controllerPortOffset++));
+
+ ControllerStarter secondDualModeController = new
TestOnlyControllerStarter(controllerConfig);
+ secondDualModeController.start();
+ TestUtils
+ .waitForCondition(aVoid ->
secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(),
+ TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + "
controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(secondDualModeController.getControllerMode(),
ControllerConf.ControllerMode.DUAL);
+
+ secondDualModeController.stop();
stopController();
_controllerStarter = null;
}
// TODO: enable it after removing ControllerLeadershipManager which requires
both CONTROLLER and PARTICIPANT
// HelixManager
- @Test (enabled = false)
+ @Test(enabled = false)
public void testPinotOnlyController()
throws Exception {
config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
@@ -95,27 +119,94 @@ public class PinotControllerModeTest extends
ControllerTest {
ControllerStarter helixControllerStarter = new ControllerStarter(config2);
helixControllerStarter.start();
HelixManager helixControllerManager =
helixControllerStarter.getHelixControllerManager();
+ HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool();
TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(),
TIMEOUT_IN_MS,
"Failed to start " + config2.getControllerMode() + " controller in " +
TIMEOUT_IN_MS + "ms.");
+ // Enable the lead controller resource.
+ helixAdmin.enableResource(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
// Starting a pinot only controller.
- startController(config, false);
- TestUtils.waitForCondition(aVoid ->
_helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS,
- "Failed to start " + config.getControllerMode() + " controller in " +
TIMEOUT_IN_MS + "ms.");
- Assert.assertEquals(_controllerStarter.getControllerMode(),
ControllerConf.ControllerMode.PINOT_ONLY);
+ ControllerConf config3 = getDefaultControllerConfiguration();
+ config3.setHelixClusterName(getHelixClusterName());
+ config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+
config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
+ controllerPortOffset++));
+
+ ControllerStarter firstPinotOnlyController = new
TestOnlyControllerStarter(config3);
+ firstPinotOnlyController.start();
+ PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager =
+ firstPinotOnlyController.getHelixResourceManager();
+
+ TestUtils.waitForCondition(aVoid ->
firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(),
+ TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + "
controller in " + TIMEOUT_IN_MS + "ms.");
+ Assert.assertEquals(firstPinotOnlyController.getControllerMode(),
ControllerConf.ControllerMode.PINOT_ONLY);
// Start a second Pinot only controller.
-
config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
+ controllerPortOffset++));
- ControllerStarter secondControllerStarter = new
TestOnlyControllerStarter(config);
+ ControllerConf config4 = getDefaultControllerConfiguration();
+ config4.setHelixClusterName(getHelixClusterName());
+ config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY);
+
config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort())
+ controllerPortOffset++));
+ ControllerStarter secondControllerStarter = new
TestOnlyControllerStarter(config4);
secondControllerStarter.start();
// Two controller instances assigned to cluster.
- TestUtils.waitForCondition(aVoid ->
_helixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS,
- "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS +
"ms.");
-
+ TestUtils
+ .waitForCondition(aVoid ->
firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2,
TIMEOUT_IN_MS,
+ "Failed to start the 2nd pinot only controller in " +
TIMEOUT_IN_MS + "ms.");
+
+ // Disable lead controller resource, all the participants are in offline
state (from slave state).
+ helixAdmin.enableResource(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false);
+
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView =
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition :
leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap =
leadControllerResourceExternalView.getStateMap(partition);
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ if (!"OFFLINE".equals(entry.getValue())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " +
TIMEOUT_IN_MS + "ms.");
+
+ // Re-enable lead controller resource, all the participants are in healthy
state (either master or slave).
+ helixAdmin.enableResource(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true);
+
+ // Shut down one controller; it will be removed from the external view of the lead
controller resource.
secondControllerStarter.stop();
- stopController();
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView =
firstPinotOnlyPinotHelixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition :
leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap =
leadControllerResourceExternalView.getStateMap(partition);
+ // Only 1 participant left in each partition, which will become the
master.
+ for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+ if (!"MASTER".equals(entry.getValue())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " +
TIMEOUT_IN_MS + "ms.");
+
+ // Shut down the only controller left; the partition map should be
empty.
+ firstPinotOnlyController.stop();
+ TestUtils.waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView = helixAdmin
+ .getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition :
leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap =
leadControllerResourceExternalView.getStateMap(partition);
+ // There should be no participant in any of the partitions.
+ if (!stateMap.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " +
TIMEOUT_IN_MS + "ms.");
+
_controllerStarter = null;
helixControllerStarter.stop();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index 573ab90..7d9f8d9 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -19,13 +19,23 @@
package org.apache.pinot.controller.helix.core;
import com.google.common.collect.BiMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -45,12 +55,14 @@ import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.common.utils.CommonConstants.Helix.*;
import static
org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*;
@@ -64,34 +76,34 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME);
private static final int CONNECTION_TIMEOUT_IN_MILLISECOND = 10_000;
private static final int MAX_TIMEOUT_IN_MILLISECOND = 5_000;
+ private static final int MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES = 10;
+ private static final long TIMEOUT_IN_MS = 10_000L;
private final String _helixClusterName = getHelixClusterName();
@BeforeClass
- public void setUp()
- throws Exception {
+ public void setUp() throws Exception {
startZk();
ControllerConf config = getDefaultControllerConfiguration();
config.setTenantIsolationEnabled(false);
startController(config);
- ControllerRequestBuilderUtil
- .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES,
- false);
- ControllerRequestBuilderUtil
- .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false,
- BASE_SERVER_ADMIN_PORT);
+
ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName,
+ ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false);
+
ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName,
ZkStarter.DEFAULT_ZK_STR,
+ NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT);
// Create server tenant on all Servers
- Tenant serverTenant =
- new
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES)
- .build();
+ Tenant serverTenant = new
Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER)
+ .setOfflineInstances(NUM_INSTANCES)
+ .build();
_helixResourceManager.createServerTenant(serverTenant);
+
+ _helixAdmin.enableResource(getHelixClusterName(),
LEAD_CONTROLLER_RESOURCE_NAME, true);
}
@Test
- public void testGetInstanceEndpoints()
- throws InvalidConfigException {
+ public void testGetInstanceEndpoints() throws InvalidConfigException {
Set<String> servers =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
BiMap<String, String> endpoints =
_helixResourceManager.getDataInstanceAdminEndpoints(servers);
for (int i = 0; i < NUM_INSTANCES; i++) {
@@ -100,8 +112,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
@Test
- public void testGetInstanceConfigs()
- throws Exception {
+ public void testGetInstanceConfigs() throws Exception {
Set<String> servers =
_helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME);
for (String server : servers) {
InstanceConfig cachedInstanceConfig =
_helixResourceManager.getHelixInstanceConfig(server);
@@ -118,8 +129,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
zkClient.close();
}
- private void modifyExistingInstanceConfig(ZkClient zkClient)
- throws InterruptedException {
+ private void modifyExistingInstanceConfig(ZkClient zkClient) throws
InterruptedException {
String instanceName = "Server_localhost_" + new
Random().nextInt(NUM_INSTANCES);
String instanceConfigPath =
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
Assert.assertTrue(zkClient.exists(instanceConfigPath));
@@ -150,8 +160,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
zkClient.writeData(instanceConfigPath, znRecord);
}
- private void addAndRemoveNewInstanceConfig(ZkClient zkClient)
- throws Exception {
+ private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws
Exception {
int biggerRandomNumber = NUM_INSTANCES + new
Random().nextInt(NUM_INSTANCES);
String instanceName = "Server_localhost_" +
String.valueOf(biggerRandomNumber);
String instanceConfigPath =
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
@@ -184,17 +193,18 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
@Test
- public void testRebuildBrokerResourceFromHelixTags()
- throws Exception {
+ public void testRebuildBrokerResourceFromHelixTags() throws Exception {
// Create broker tenant on 3 Brokers
Tenant brokerTenant =
new
Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build();
_helixResourceManager.createBrokerTenant(brokerTenant);
// Create the table
- TableConfig tableConfig =
- new
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3)
-
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build();
+ TableConfig tableConfig = new
TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME)
+ .setNumReplicas(3)
+ .setBrokerTenant(BROKER_TENANT_NAME)
+ .setServerTenant(SERVER_TENANT_NAME)
+ .build();
_helixResourceManager.addTable(tableConfig);
// Check that the BrokerResource ideal state has 3 Brokers assigned to the
table
@@ -204,8 +214,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
// Untag all Brokers assigned to broker tenant
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
@@ -228,8 +238,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
@Test
- public void testRetrieveMetadata()
- throws Exception {
+ public void testRetrieveMetadata() throws Exception {
String segmentName = "testSegment";
// Test retrieving OFFLINE segment ZK metadata
@@ -263,7 +272,8 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
}
- @Test void testRetrieveTenantNames() {
+ @Test
+ void testRetrieveTenantNames() {
// Create broker tenant on 1 Broker
Tenant brokerTenant =
new
Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(1).build();
@@ -366,8 +376,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
// Create broker tenant on 3 Brokers
- Tenant brokerTenant =
- new
Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
+ Tenant brokerTenant = new
Tenant.TenantBuilder(brokerTag).setRole(TenantRole.BROKER).setTotalInstances(3).build();
_helixResourceManager.createBrokerTenant(brokerTenant);
// empty server instances list
@@ -452,18 +461,119 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
}
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(brokerTag)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(brokerTag));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(brokerTag));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
+ @Test
+ public void testLeadControllerResource()
+ throws Exception {
+ IdealState leadControllerResourceIdealState =
_helixResourceManager.getHelixAdmin()
+ .getResourceIdealState(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ Assert.assertTrue(leadControllerResourceIdealState.isValid());
+ Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
+ Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
+ CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
+
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+ Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
+ Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT));
+ Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(),
IdealState.RebalanceMode.FULL_AUTO);
+ Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet(
+
leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty());
+
+ TestUtils
+ .waitForCondition(aVoid -> {
+ ExternalView leadControllerResourceExternalView =
_helixResourceManager.getHelixAdmin()
+ .getResourceExternalView(getHelixClusterName(),
CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+ for (String partition :
leadControllerResourceExternalView.getPartitionSet()) {
+ Map<String, String> stateMap =
leadControllerResourceExternalView.getStateMap(partition);
+ Map.Entry<String, String> entry =
stateMap.entrySet().iterator().next();
+ boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST +
"_" + _controllerPort).equals(entry.getKey());
+ result &= "MASTER".equals(entry.getValue());
+ if (!result) {
+ return false;
+ }
+ }
+ return true;
+ },
+ TIMEOUT_IN_MS, "Failed to assign controller hosts to lead
controller resource in " + TIMEOUT_IN_MS + " ms.");
+ }
+
+ @Test
+ public void testLeadControllerAssignment() {
+ // Given a number of instances (from 1 to 10), make sure all the instances
get assigned to the lead controller resource.
+ for (int nInstances = 1; nInstances <=
MAXIMUM_NUMBER_OF_CONTROLLER_INSTANCES; nInstances++) {
+ List<String> instanceNames = new ArrayList<>(nInstances);
+ List<Integer> ports = new ArrayList<>(nInstances);
+ for (int i = 0; i < nInstances; i++) {
+ instanceNames.add(PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" +
i);
+ ports.add(i);
+ }
+
+ List<String> partitions = new
ArrayList<>(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+ for (int i = 0; i < NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
i++) {
+ partitions.add(LEAD_CONTROLLER_RESOURCE_NAME + "_" + i);
+ }
+
+ LinkedHashMap<String, Integer> states = new LinkedHashMap<>(2);
+ states.put("OFFLINE", 0);
+ states.put("SLAVE", LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT - 1);
+ states.put("MASTER", 1);
+
+ CrushEdRebalanceStrategy crushEdRebalanceStrategy = new
CrushEdRebalanceStrategy();
+ crushEdRebalanceStrategy.init(LEAD_CONTROLLER_RESOURCE_NAME, partitions,
states, Integer.MAX_VALUE);
+
+ ClusterDataCache clusterDataCache = new ClusterDataCache();
+ PropertyKey.Builder keyBuilder = new
PropertyKey.Builder(getHelixClusterName());
+ HelixDataAccessor accessor = _helixManager.getHelixDataAccessor();
+ ClusterConfig clusterConfig =
accessor.getProperty(keyBuilder.clusterConfig());
+ clusterDataCache.setClusterConfig(clusterConfig);
+
+ Map<String, InstanceConfig> instanceConfigMap = new
HashMap<>(nInstances);
+ for (int i = 0; i < nInstances; i++) {
+ String instanceName = instanceNames.get(i);
+ int port = ports.get(i);
+ instanceConfigMap.put(instanceName, new InstanceConfig(instanceName
+ + ", {HELIX_ENABLED=true, HELIX_ENABLED_TIMESTAMP=1559546216610,
HELIX_HOST=Controller_localhost, HELIX_PORT="
+ + port + "}{}{TAG_LIST=[controller]}"));
+ }
+ clusterDataCache.setInstanceConfigMap(instanceConfigMap);
+ ZNRecord znRecord =
+ crushEdRebalanceStrategy.computePartitionAssignment(instanceNames,
instanceNames, new HashMap<>(0),
+ clusterDataCache);
+
+ Assert.assertNotNull(znRecord);
+ Map<String, List<String>> listFields = znRecord.getListFields();
+ Assert.assertEquals(listFields.size(),
NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
+
+ Map<String, Integer> instanceToMasterAssignmentCountMap = new
HashMap<>();
+ int maxCount = 0;
+ for (List<String> assignments : listFields.values()) {
+ Assert.assertEquals(assignments.size(),
LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT);
+ if
(!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) {
+ instanceToMasterAssignmentCountMap.put(assignments.get(0), 1);
+ } else {
+ instanceToMasterAssignmentCountMap.put(assignments.get(0),
+ instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1);
+ }
+ maxCount =
Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount);
+ }
+ Assert.assertEquals(instanceToMasterAssignmentCountMap.size(),
nInstances,
+ "Not all the instances got assigned to the resource!");
+ for (Integer count : instanceToMasterAssignmentCountMap.values()) {
+ Assert.assertTrue((maxCount - count == 0 || maxCount - count == 1),
"Instance assignment isn't distributed");
+ }
+ }
+ }
+
@AfterMethod
public void cleanUpBrokerTags() {
// Untag all Brokers for other tests
for (String brokerInstance :
_helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) {
- _helixAdmin
- .removeInstanceTag(_helixClusterName, brokerInstance,
TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
+ _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance,
+ TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME));
_helixAdmin.addInstanceTag(_helixClusterName, brokerInstance,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]