This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7c3c8e8705 Allow replica group assignment support in tier configs
(#10255)
7c3c8e8705 is described below
commit 7c3c8e87052edca26648a646850b2b17ec0aa690
Author: Saurabh Dubey <[email protected]>
AuthorDate: Thu Mar 9 10:55:35 2023 +0530
Allow replica group assignment support in tier configs (#10255)
* Allow replica group assignment support in tier configs
* Add reassignment logic
* Add tier to assignInstances
* Make changes to APIs
* Consolidate tier assignment configs inside InstanceAssignmentConfigsMap
* Add removal logic for tier partitions
* Lint fix
* Review comments
* Review comments
* Add tests
* Fix java8 quickstart
* Fix test
---------
Co-authored-by: Saurabh Dubey <[email protected]>
---
.../assignment/InstanceAssignmentConfigUtils.java | 20 +--
.../common/assignment/InstancePartitionsUtils.java | 16 ++
.../pinot/common/metadata/ZKMetadataProvider.java | 16 ++
.../common/utils/config/TableConfigUtils.java | 13 +-
.../common/utils/config/TableConfigSerDeTest.java | 10 +-
.../PinotInstanceAssignmentRestletResource.java | 188 +++++++++++++++------
.../api/resources/PinotTableRestletResource.java | 2 +-
.../helix/core/PinotHelixResourceManager.java | 32 +++-
.../instance/InstanceAssignmentDriver.java | 32 +++-
.../helix/core/rebalance/RebalanceResult.java | 8 +
.../helix/core/rebalance/TableRebalancer.java | 111 ++++++++----
...anceAssignmentRestletResourceStatelessTest.java | 141 ++++++++++------
.../instance/InstanceAssignmentTest.java | 64 +++----
.../TableRebalancerClusterStatelessTest.java | 127 +++++++++++++-
.../segment/local/utils/TableConfigUtils.java | 3 +-
.../segment/local/utils/TableConfigUtilsTest.java | 8 +-
.../apache/pinot/spi/config/table/TableConfig.java | 8 +-
.../utils/builder/ControllerRequestURLBuilder.java | 2 +-
.../spi/utils/builder/TableConfigBuilder.java | 4 +-
19 files changed, 594 insertions(+), 211 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index 6a0ae1188e..b571918c0c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -44,11 +44,10 @@ public class InstanceAssignmentConfigUtils {
* backward-compatibility) COMPLETED server tag is overridden to be
different from the CONSUMING server tag.
*/
public static boolean shouldRelocateCompletedSegments(TableConfig
tableConfig) {
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
return (instanceAssignmentConfigMap != null
- && instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED)
!= null) || TagNameUtils
- .isRelocateCompletedSegments(tableConfig.getTenantConfig());
+ &&
instanceAssignmentConfigMap.get(InstancePartitionsType.COMPLETED.toString()) !=
null)
+ ||
TagNameUtils.isRelocateCompletedSegments(tableConfig.getTenantConfig());
}
/**
@@ -60,21 +59,20 @@ public class InstanceAssignmentConfigUtils {
return true;
}
TableType tableType = tableConfig.getTableType();
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
switch (instancePartitionsType) {
// Allow OFFLINE instance assignment if the offline table has it
configured or (for backward-compatibility) is
// using replica-group segment assignment
case OFFLINE:
return tableType == TableType.OFFLINE && ((instanceAssignmentConfigMap
!= null
- && instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE)
!= null)
+ &&
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString()) !=
null)
|| AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY
.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy()));
// Allow CONSUMING/COMPLETED instance assignment if the real-time table
has it configured
case CONSUMING:
case COMPLETED:
return tableType == TableType.REALTIME && (instanceAssignmentConfigMap
!= null
- && instanceAssignmentConfigMap.get(instancePartitionsType) !=
null);
+ &&
instanceAssignmentConfigMap.get(instancePartitionsType.toString()) != null);
default:
throw new IllegalStateException();
}
@@ -89,10 +87,10 @@ public class InstanceAssignmentConfigUtils {
"Instance assignment is not allowed for the given table config");
// Use the instance assignment config from the table config if it exists
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
- InstanceAssignmentConfig instanceAssignmentConfig =
instanceAssignmentConfigMap.get(instancePartitionsType);
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ instanceAssignmentConfigMap.get(instancePartitionsType.toString());
if (instanceAssignmentConfig != null) {
return instanceAssignmentConfig;
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
index a15554f3d3..759d387af4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java
@@ -46,6 +46,7 @@ public class InstancePartitionsUtils {
}
public static final char TYPE_SUFFIX_SEPARATOR = '_';
+ public static final String TIER_SUFFIX = "__TIER__";
/**
* Returns the name of the instance partitions for the given table name
(with or without type suffix) and instance
@@ -93,6 +94,11 @@ public class InstancePartitionsUtils {
return znRecord != null ? InstancePartitions.fromZNRecord(znRecord) : null;
}
+ public static String getInstancePartitionsNameForTier(String tableName,
String tierName) {
+ return TableNameBuilder.extractRawTableName(tableName) + TIER_SUFFIX +
tierName;
+ }
+
+
/**
* Gets the instance partitions with the given name, and returns a re-named
copy of the same.
* This method is useful when we use a table with instancePartitionsMap
since in that case
@@ -177,4 +183,14 @@ public class InstancePartitionsUtils {
throw new ZkException("Failed to remove instance partitions: " +
instancePartitionsName);
}
}
+
+ public static void removeTierInstancePartitions(HelixPropertyStore<ZNRecord>
propertyStore,
+ String tableNameWithType) {
+ List<InstancePartitions> instancePartitions =
ZKMetadataProvider.getAllInstancePartitions(propertyStore);
+ instancePartitions.stream().filter(instancePartition ->
instancePartition.getInstancePartitionsName()
+
.startsWith(TableNameBuilder.extractRawTableName(tableNameWithType) +
TIER_SUFFIX))
+ .forEach(instancePartition -> {
+ removeInstancePartitions(propertyStore,
instancePartition.getInstancePartitionsName());
+ });
+ }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index 623a856454..b14ba30391 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -26,9 +26,11 @@ import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.helix.AccessOption;
+import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
+import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SchemaUtils;
@@ -261,6 +263,20 @@ public class ZKMetadataProvider {
}
}
+ @Nullable
+ public static List<InstancePartitions>
getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+ List<ZNRecord> znRecordss =
+ propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX,
null, AccessOption.PERSISTENT);
+
+ try {
+ return
Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting instance partitions", e);
+ return null;
+ }
+ }
+
@Nullable
public static List<UserConfig>
getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecordss =
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index 8abb0ea964..9af13f44bc 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -113,11 +113,11 @@ public class TableConfigUtils {
queryConfig = JsonUtils.stringToObject(queryConfigString,
QueryConfig.class);
}
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap = null;
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = null;
String instanceAssignmentConfigMapString =
simpleFields.get(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY);
if (instanceAssignmentConfigMapString != null) {
instanceAssignmentConfigMap =
JsonUtils.stringToObject(instanceAssignmentConfigMapString,
- new TypeReference<Map<InstancePartitionsType,
InstanceAssignmentConfig>>() {
+ new TypeReference<Map<String, InstanceAssignmentConfig>>() {
});
}
@@ -181,9 +181,9 @@ public class TableConfigUtils {
}
return new TableConfig(tableName, tableType, validationConfig,
tenantConfig, indexingConfig, customConfig,
- quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap,
- fieldConfigList, upsertConfig, dedupConfig, dimensionTableConfig,
ingestionConfig, tierConfigList, isDimTable,
- tunerConfigList, instancePartitionsMap, segmentAssignmentConfigMap);
+ quotaConfig, taskConfig, routingConfig, queryConfig,
instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
+ dedupConfig, dimensionTableConfig, ingestionConfig, tierConfigList,
isDimTable, tunerConfigList,
+ instancePartitionsMap, segmentAssignmentConfigMap);
}
public static ZNRecord toZNRecord(TableConfig tableConfig)
@@ -216,8 +216,7 @@ public class TableConfigUtils {
if (queryConfig != null) {
simpleFields.put(TableConfig.QUERY_CONFIG_KEY,
queryConfig.toJsonString());
}
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
if (instanceAssignmentConfigMap != null) {
simpleFields
.put(TableConfig.INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY,
JsonUtils.objectToString(instanceAssignmentConfigMap));
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index aad62db8f7..30dbfe80b4 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -212,7 +212,7 @@ public class TableConfigSerDeTest {
new InstanceConstraintConfig(Arrays.asList("constraint1",
"constraint2")),
new InstanceReplicaGroupPartitionConfig(true, 0, 3, 5, 0, 0,
false));
TableConfig tableConfig =
tableConfigBuilder.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
instanceAssignmentConfig)).build();
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
instanceAssignmentConfig)).build();
checkInstanceAssignmentConfig(tableConfig);
@@ -488,12 +488,12 @@ public class TableConfigSerDeTest {
}
private void checkInstanceAssignmentConfig(TableConfig tableConfig) {
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap =
- tableConfig.getInstanceAssignmentConfigMap();
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap =
tableConfig.getInstanceAssignmentConfigMap();
assertNotNull(instanceAssignmentConfigMap);
assertEquals(instanceAssignmentConfigMap.size(), 1);
-
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE));
- InstanceAssignmentConfig instanceAssignmentConfig =
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE);
+
assertTrue(instanceAssignmentConfigMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ InstanceAssignmentConfig instanceAssignmentConfig =
+
instanceAssignmentConfigMap.get(InstancePartitionsType.OFFLINE.toString());
InstanceTagPoolConfig tagPoolConfig =
instanceAssignmentConfig.getTagPoolConfig();
assertEquals(tagPoolConfig.getTag(), "tenant_OFFLINE");
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index dfdaefafaa..aee7df4e8a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +46,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -57,6 +59,7 @@ import
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -80,39 +83,57 @@ public class PinotInstanceAssignmentRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/instancePartitions")
@ApiOperation(value = "Get the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+ public Map<String, InstancePartitions> getInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type")
@Nullable
- InstancePartitionsType instancePartitionsType) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name")
@QueryParam("type") @Nullable String type) {
+ Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType != TableType.REALTIME) {
- if (instancePartitionsType == InstancePartitionsType.OFFLINE ||
instancePartitionsType == null) {
- InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.OFFLINE.toString().equals(type) || type ==
null) {
+ InstancePartitions offlineInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
if (offlineInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
offlineInstancePartitions);
+ instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(),
offlineInstancePartitions);
}
}
}
if (tableType != TableType.OFFLINE) {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING ||
instancePartitionsType == null) {
- InstancePartitions consumingInstancePartitions =
InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.CONSUMING.toString().equals(type) || type ==
null) {
+ InstancePartitions consumingInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
if (consumingInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
consumingInstancePartitions);
+
instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(),
consumingInstancePartitions);
}
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED ||
instancePartitionsType == null) {
- InstancePartitions completedInstancePartitions =
InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.COMPLETED.toString().equals(type) || type ==
null) {
+ InstancePartitions completedInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
if (completedInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
completedInstancePartitions);
+
instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(),
completedInstancePartitions);
+ }
+ }
+ }
+
+ List<TableConfig> tableConfigs =
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null &&
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (type == null || type.equals(tierConfig.getName())) {
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+ if (instancePartitions != null) {
+ instancePartitionsMap.put(tierConfig.getName(),
instancePartitions);
+ }
+ }
}
}
}
@@ -130,22 +151,20 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/assignInstances")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Assign server instances to a table")
- public Map<InstancePartitionsType, InstancePartitions> assignInstances(
+ public Map<String, InstancePartitions> assignInstances(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type")
@Nullable
- InstancePartitionsType instancePartitionsType,
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name")
@QueryParam("type") @Nullable String type,
@ApiParam(value = "Whether to do dry-run") @DefaultValue("false")
@QueryParam("dryRun") boolean dryRun) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
+ Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
List<InstanceConfig> instanceConfigs =
_resourceManager.getAllHelixInstanceConfigs();
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType != TableType.REALTIME && (instancePartitionsType ==
InstancePartitionsType.OFFLINE
- || instancePartitionsType == null)) {
+ if (tableType != TableType.REALTIME &&
(InstancePartitionsType.OFFLINE.toString().equals(type) || type == null)) {
TableConfig offlineTableConfig =
_resourceManager.getOfflineTableConfig(tableName);
if (offlineTableConfig != null) {
try {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(offlineTableConfig,
InstancePartitionsType.OFFLINE)) {
+ if
(InstanceAssignmentConfigUtils.allowInstanceAssignment(offlineTableConfig,
+ InstancePartitionsType.OFFLINE)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap,
offlineTableConfig, instanceConfigs,
InstancePartitionsType.OFFLINE);
}
@@ -158,20 +177,20 @@ public class PinotInstanceAssignmentRestletResource {
}
}
}
- if (tableType != TableType.OFFLINE && instancePartitionsType !=
InstancePartitionsType.OFFLINE) {
+ if (tableType != TableType.OFFLINE &&
!InstancePartitionsType.OFFLINE.toString().equals(type)) {
TableConfig realtimeTableConfig =
_resourceManager.getRealtimeTableConfig(tableName);
if (realtimeTableConfig != null) {
try {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING ||
instancePartitionsType == null) {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(realtimeTableConfig,
InstancePartitionsType.CONSUMING)) {
+ if (InstancePartitionsType.CONSUMING.toString().equals(type) || type
== null) {
+ if
(InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+ InstancePartitionsType.CONSUMING)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap,
realtimeTableConfig, instanceConfigs,
InstancePartitionsType.CONSUMING);
}
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED ||
instancePartitionsType == null) {
- if (InstanceAssignmentConfigUtils
- .allowInstanceAssignment(realtimeTableConfig,
InstancePartitionsType.COMPLETED)) {
+ if (InstancePartitionsType.COMPLETED.toString().equals(type) || type
== null) {
+ if
(InstanceAssignmentConfigUtils.allowInstanceAssignment(realtimeTableConfig,
+ InstancePartitionsType.COMPLETED)) {
assignInstancesForInstancePartitionsType(instancePartitionsMap,
realtimeTableConfig, instanceConfigs,
InstancePartitionsType.COMPLETED);
}
@@ -186,6 +205,16 @@ public class PinotInstanceAssignmentRestletResource {
}
}
+ TableConfig realtimeTableConfig =
_resourceManager.getRealtimeTableConfig(tableName);
+ if (realtimeTableConfig != null) {
+ assignInstancesForTier(instancePartitionsMap, realtimeTableConfig,
instanceConfigs, type);
+ }
+
+ TableConfig offlineTableConfig =
_resourceManager.getOfflineTableConfig(tableName);
+ if (offlineTableConfig != null) {
+ assignInstancesForTier(instancePartitionsMap, offlineTableConfig,
instanceConfigs, type);
+ }
+
if (instancePartitionsMap.isEmpty()) {
throw new ControllerApplicationException(LOGGER, "Failed to find the
instance assignment config",
Response.Status.NOT_FOUND);
@@ -207,22 +236,43 @@ public class PinotInstanceAssignmentRestletResource {
* @param instanceConfigs list of instance configs
* @param instancePartitionsType type of instancePartitions
*/
- private void assignInstancesForInstancePartitionsType(
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
TableConfig tableConfig,
- List<InstanceConfig> instanceConfigs, InstancePartitionsType
instancePartitionsType) {
+ private void assignInstancesForInstancePartitionsType(Map<String,
InstancePartitions> instancePartitionsMap,
+ TableConfig tableConfig, List<InstanceConfig> instanceConfigs,
InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)) {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- instancePartitionsMap.put(instancePartitionsType,
InstancePartitionsUtils.fetchInstancePartitionsWithRename(
- _resourceManager.getPropertyStore(),
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
- instancePartitionsType.getInstancePartitionsName(rawTableName)));
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+ instancePartitionsType.getInstancePartitionsName(rawTableName)));
return;
}
InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString()));
- instancePartitionsMap.put(instancePartitionsType, new
InstanceAssignmentDriver(tableConfig)
- .assignInstances(instancePartitionsType, instanceConfigs,
existingInstancePartitions));
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+ new
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType,
instanceConfigs,
+ existingInstancePartitions));
+ }
+
+ private void assignInstancesForTier(Map<String, InstancePartitions>
instancePartitionsMap, TableConfig tableConfig,
+ List<InstanceConfig> instanceConfigs, String tierName) {
+ if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getInstanceAssignmentConfigMap() != null) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if ((tierConfig.getName().equals(tierName) || tierName == null)
+ &&
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()) != null)
{
+ InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(
+ _resourceManager.getHelixZkManager().getHelixPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+
+ instancePartitionsMap.put(tierConfig.getName(),
+ new
InstanceAssignmentDriver(tableConfig).assignInstances(tierConfig.getName(),
instanceConfigs,
+ existingInstancePartitions,
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())));
+ }
+ }
+ }
}
private void persistInstancePartitionsHelper(InstancePartitions
instancePartitions) {
@@ -240,7 +290,7 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/instancePartitions")
@Authenticate(AccessType.UPDATE)
@ApiOperation(value = "Create/update the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> setInstancePartitions(
+ public Map<String, InstancePartitions> setInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName, String instancePartitionsStr) {
InstancePartitions instancePartitions;
try {
@@ -256,17 +306,32 @@ public class PinotInstanceAssignmentRestletResource {
if (tableType != TableType.REALTIME) {
if
(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
{
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.OFFLINE,
instancePartitions);
+ return
Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
instancePartitions);
}
}
if (tableType != TableType.OFFLINE) {
if
(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
{
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.CONSUMING,
instancePartitions);
+ return
Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(),
instancePartitions);
}
if
(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName).equals(instancePartitionsName))
{
persistInstancePartitionsHelper(instancePartitions);
- return Collections.singletonMap(InstancePartitionsType.COMPLETED,
instancePartitions);
+ return
Collections.singletonMap(InstancePartitionsType.COMPLETED.toString(),
instancePartitions);
+ }
+ }
+
+ List<TableConfig> tableConfigs =
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null &&
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if
(InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tierConfig.getName())
+ .equals(instancePartitionsName)) {
+ persistInstancePartitionsHelper(instancePartitions);
+ return Collections.singletonMap(tierConfig.getName(),
instancePartitions);
+ }
+ }
}
}
@@ -281,22 +346,39 @@ public class PinotInstanceAssignmentRestletResource {
@ApiOperation(value = "Remove the instance partitions")
public SuccessResponse removeInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type")
@Nullable
- InstancePartitionsType instancePartitionsType) {
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name")
@QueryParam("type") @Nullable
+ String instancePartitionsType) {
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
- if (tableType != TableType.REALTIME && (instancePartitionsType ==
InstancePartitionsType.OFFLINE
+ if (tableType != TableType.REALTIME &&
(InstancePartitionsType.OFFLINE.toString().equals(instancePartitionsType)
|| instancePartitionsType == null)) {
removeInstancePartitionsHelper(InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
}
if (tableType != TableType.OFFLINE) {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING ||
instancePartitionsType == null) {
+ if
(InstancePartitionsType.CONSUMING.toString().equals(instancePartitionsType)
+ || instancePartitionsType == null) {
removeInstancePartitionsHelper(InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED ||
instancePartitionsType == null) {
+ if
(InstancePartitionsType.COMPLETED.toString().equals(instancePartitionsType)
+ || instancePartitionsType == null) {
removeInstancePartitionsHelper(InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
}
}
+
+ List<TableConfig> tableConfigs =
Arrays.asList(_resourceManager.getRealtimeTableConfig(tableName),
+ _resourceManager.getOfflineTableConfig(tableName));
+
+ for (TableConfig tableConfig : tableConfigs) {
+ if (tableConfig != null &&
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if (instancePartitionsType == null ||
instancePartitionsType.equals(tierConfig.getName())) {
+ removeInstancePartitionsHelper(
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
+ tierConfig.getName()));
+ }
+ }
+ }
+ }
return new SuccessResponse("Instance partitions removed");
}
@@ -315,16 +397,16 @@ public class PinotInstanceAssignmentRestletResource {
@Path("/tables/{tableName}/replaceInstance")
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Replace an instance in the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> replaceInstance(
+ public Map<String, InstancePartitions> replaceInstance(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type")
@Nullable
- InstancePartitionsType instancePartitionsType,
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|tier name")
@QueryParam("type") @Nullable
+ String type,
@ApiParam(value = "Old instance to be replaced", required = true)
@QueryParam("oldInstanceId")
String oldInstanceId,
@ApiParam(value = "New instance to replace with", required = true)
@QueryParam("newInstanceId")
String newInstanceId) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
- getInstancePartitions(tableName, instancePartitionsType);
+ Map<String, InstancePartitions> instancePartitionsMap =
+ getInstancePartitions(tableName, type);
Iterator<InstancePartitions> iterator =
instancePartitionsMap.values().iterator();
while (iterator.hasNext()) {
InstancePartitions instancePartitions = iterator.next();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index cfa1cffe2d..e43b244893 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -683,7 +683,7 @@ public class PinotTableRestletResource {
});
return new RebalanceResult(RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates",
dryRunResult.getInstanceAssignment(),
- dryRunResult.getSegmentAssignment());
+ dryRunResult.getTierInstanceAssignment(),
dryRunResult.getSegmentAssignment());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 91dc57956f..48418f2bda 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -147,6 +147,7 @@ import org.apache.pinot.spi.config.table.TableStats;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
@@ -1734,10 +1735,10 @@ public class PinotHelixResourceManager {
}
}
+ InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
+ List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
if (!instancePartitionsTypesToAssign.isEmpty()) {
LOGGER.info("Assigning {} instances to table: {}",
instancePartitionsTypesToAssign, tableNameWithType);
- InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
- List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
for (InstancePartitionsType instancePartitionsType :
instancePartitionsTypesToAssign) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
@@ -1757,6 +1758,26 @@ public class PinotHelixResourceManager {
}
}
}
+
+ // Process and persist tier config instancePartitions
+ if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getInstanceAssignmentConfigMap() != null) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if
(tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName()))
{
+ if (override ||
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType,
tierConfig.getName()))
+ == null) {
+ LOGGER.info("Calculating instance partitions for tier: {}, table :
{}", tierConfig.getName(),
+ tableNameWithType);
+ InstancePartitions instancePartitions =
+ instanceAssignmentDriver.assignInstances(tierConfig.getName(),
instanceConfigs, null,
+
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+ LOGGER.info("Persisting instance partitions: {}",
instancePartitions);
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
+ }
+ }
+ }
+ }
}
public void updateUserConfig(UserConfig userConfig)
@@ -1909,6 +1930,10 @@ public class PinotHelixResourceManager {
InstancePartitionsUtils.removeInstancePartitions(_propertyStore,
offlineTableName);
LOGGER.info("Deleting table {}: Removed instance partitions",
offlineTableName);
+ // Remove tier instance partitions
+ InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore,
offlineTableName);
+ LOGGER.info("Deleting table {}: Removed tier instance partitions",
offlineTableName);
+
// Remove segment lineage
SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore,
offlineTableName);
LOGGER.info("Deleting table {}: Removed segment lineage",
offlineTableName);
@@ -1968,6 +1993,9 @@ public class PinotHelixResourceManager {
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
LOGGER.info("Deleting table {}: Removed instance partitions",
realtimeTableName);
+ InstancePartitionsUtils.removeTierInstancePartitions(_propertyStore,
rawTableName);
+ LOGGER.info("Deleting table {}: Removed tier instance partitions",
realtimeTableName);
+
// Remove segment lineage
SegmentLineageAccessHelper.deleteSegmentLineage(_propertyStore,
realtimeTableName);
LOGGER.info("Deleting table {}: Removed segment lineage",
realtimeTableName);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index ba73f7bd6d..7a5c901029 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
@@ -55,15 +56,31 @@ public class InstanceAssignmentDriver {
public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
- LOGGER.info("Starting {} instance assignment for table: {}",
instancePartitionsType, tableNameWithType);
-
InstanceAssignmentConfig assignmentConfig =
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
+ return getInstancePartitions(
+
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+ assignmentConfig, instanceConfigs, existingInstancePartitions);
+ }
+
+ public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig) {
+ return getInstancePartitions(
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
tierName),
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+ }
+
+ private InstancePartitions getInstancePartitions(String
instancePartitionsName,
+ InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions) {
+ String tableNameWithType = _tableConfig.getTableName();
+ LOGGER.info("Starting {} instance assignment for table {}",
instancePartitionsName, tableNameWithType);
+
InstanceTagPoolSelector tagPoolSelector =
- new InstanceTagPoolSelector(assignmentConfig.getTagPoolConfig(),
tableNameWithType);
+ new
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(),
tableNameWithType);
Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap =
tagPoolSelector.selectInstances(instanceConfigs);
- InstanceConstraintConfig constraintConfig =
assignmentConfig.getConstraintConfig();
+ InstanceConstraintConfig constraintConfig =
instanceAssignmentConfig.getConstraintConfig();
List<InstanceConstraintApplier> constraintAppliers = new ArrayList<>();
if (constraintConfig == null) {
LOGGER.info("No instance constraint is configured, using default
hash-based-rotate instance constraint");
@@ -75,10 +92,9 @@ public class InstanceAssignmentDriver {
}
InstancePartitionSelector instancePartitionSelector =
-
InstancePartitionSelectorFactory.getInstance(assignmentConfig.getPartitionSelector(),
- assignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions);
- InstancePartitions instancePartitions = new InstancePartitions(
-
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)));
+
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
+ instanceAssignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions);
+ InstancePartitions instancePartitions = new
InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap,
instancePartitions);
return instancePartitions;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index a2d6d630d0..4234abc5d5 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -31,6 +31,7 @@ import
org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
public class RebalanceResult {
private final Status _status;
private final Map<InstancePartitionsType, InstancePartitions>
_instanceAssignment;
+ private final Map<String, InstancePartitions> _tierInstanceAssignment;
private final Map<String, Map<String, String>> _segmentAssignment;
private final String _description;
@@ -38,10 +39,12 @@ public class RebalanceResult {
public RebalanceResult(@JsonProperty(value = "status", required = true)
Status status,
@JsonProperty(value = "description", required = true) String description,
@JsonProperty("instanceAssignment") @Nullable
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
+ @JsonProperty("tierInstanceAssignment") @Nullable Map<String,
InstancePartitions> tierInstanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String,
String>> segmentAssignment) {
_status = status;
_description = description;
_instanceAssignment = instanceAssignment;
+ _tierInstanceAssignment = tierInstanceAssignment;
_segmentAssignment = segmentAssignment;
}
@@ -60,6 +63,11 @@ public class RebalanceResult {
return _instanceAssignment;
}
+ @JsonProperty
+ public Map<String, InstancePartitions> getTierInstanceAssignment() {
+ return _tierInstanceAssignment;
+ }
+
@JsonProperty
public Map<String, Map<String, String>> getSegmentAssignment() {
return _segmentAssignment;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index b8257ed450..218193e210 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -163,13 +163,13 @@ public class TableRebalancer {
IngestionConfigUtils.getStreamConfigMap(tableConfig)).hasHighLevelConsumerType())
{
LOGGER.warn("Cannot rebalance table: {} with high-level consumer,
aborting the rebalance", tableNameWithType);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot
rebalance table with high-level consumer",
- null, null);
+ null, null, null);
}
} catch (Exception e) {
LOGGER.warn("Caught exception while validating table config for table:
{}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught
exception while validating table config: " + e,
- null, null);
+ null, null, null);
}
// Fetch ideal state
@@ -181,16 +181,17 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while fetching IdealState for table: {},
aborting the rebalance", tableNameWithType,
e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught
exception while fetching IdealState: " + e,
- null, null);
+ null, null, null);
}
if (currentIdealState == null) {
LOGGER.warn("Cannot find the IdealState for table: {}, aborting the
rebalance", tableNameWithType);
- return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find
the IdealState for table", null, null);
+ return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot find
the IdealState for table", null, null,
+ null);
}
if (!currentIdealState.isEnabled() && !downtime) {
LOGGER.warn("Cannot rebalance disabled table: {} without downtime,
aborting the rebalance", tableNameWithType);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Cannot
rebalance disabled table without downtime",
- null, null);
+ null, null, null);
}
LOGGER.info("Fetching/computing instance partitions, reassigning instances
if configured for table: {}",
@@ -205,13 +206,13 @@ public class TableRebalancer {
"Caught exception while fetching/calculating instance partitions for
table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
- "Caught exception while fetching/calculating instance partitions: "
+ e, null, null);
+ "Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null);
}
// Calculate instance partitions for tiers if configured
List<Tier> sortedTiers = getSortedTiers(tableConfig);
Map<String, InstancePartitions> tierToInstancePartitionsMap =
- getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
LOGGER.info("Calculating the target assignment for table: {}",
tableNameWithType);
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -224,7 +225,8 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while calculating target assignment for
table: {}, aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
- "Caught exception while calculating target assignment: " + e,
instancePartitionsMap, null);
+ "Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
+ tierToInstancePartitionsMap, null);
}
if (currentAssignment.equals(targetAssignment)) {
@@ -233,20 +235,21 @@ public class TableRebalancer {
if (dryRun) {
return new RebalanceResult(RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already
balanced", instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
} else {
return new RebalanceResult(RebalanceResult.Status.DONE, "Instance
reassigned, table is already balanced",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
}
} else {
return new RebalanceResult(RebalanceResult.Status.NO_OP, "Table is
already balanced", instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
}
if (dryRun) {
LOGGER.info("Rebalancing table: {} in dry-run mode, returning the target
assignment", tableNameWithType);
- return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode",
instancePartitionsMap, targetAssignment);
+ return new RebalanceResult(RebalanceResult.Status.DONE, "Dry-run mode",
instancePartitionsMap,
+ tierToInstancePartitionsMap, targetAssignment);
}
if (downtime) {
@@ -267,12 +270,13 @@ public class TableRebalancer {
System.currentTimeMillis() - startTimeMs);
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target
segment assignment, ExternalView might not "
- + "reach the target segment assignment yet)",
instancePartitionsMap, targetAssignment);
+ + "reach the target segment assignment yet)",
instancePartitionsMap, tierToInstancePartitionsMap,
+ targetAssignment);
} catch (Exception e) {
LOGGER.warn("Caught exception while updating IdealState for table: {},
aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught
exception while updating IdealState: " + e,
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
}
}
@@ -295,7 +299,7 @@ public class TableRebalancer {
+ "replicas: {}, aborting the rebalance",
minReplicasToKeepUpForNoDowntime, tableNameWithType,
numReplicas);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Illegal min
available replicas config",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
}
minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
} else {
@@ -321,7 +325,7 @@ public class TableRebalancer {
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " +
e, instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
@@ -353,7 +357,8 @@ public class TableRebalancer {
try {
// Re-calculate the instance partitions in case the instance
configs changed during the rebalance
instancePartitionsMap = getInstancePartitionsMap(tableConfig,
reassignInstances, bootstrap, false);
- tierToInstancePartitionsMap =
getTierToInstancePartitionsMap(tableNameWithType, sortedTiers);
+ tierToInstancePartitionsMap =
+ getTierToInstancePartitionsMap(tableConfig, sortedTiers,
reassignInstances, bootstrap, dryRun);
targetAssignment =
segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap,
sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
@@ -362,7 +367,7 @@ public class TableRebalancer {
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- targetAssignment);
+ tierToInstancePartitionsMap, targetAssignment);
}
} else {
LOGGER.info(
@@ -384,7 +389,7 @@ public class TableRebalancer {
return new RebalanceResult(RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target
segment assignment)",
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
}
Map<String, Map<String, String>> nextAssignment =
@@ -412,7 +417,7 @@ public class TableRebalancer {
LOGGER.warn("Caught exception while updating IdealState for table: {},
aborting the rebalance",
tableNameWithType, e);
return new RebalanceResult(RebalanceResult.Status.FAILED, "Caught
exception while updating IdealState: " + e,
- instancePartitionsMap, targetAssignment);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment);
}
}
}
@@ -524,31 +529,73 @@ public class TableRebalancer {
}
@Nullable
- private Map<String, InstancePartitions>
getTierToInstancePartitionsMap(String tableNameWithType,
- @Nullable List<Tier> sortedTiers) {
+ private Map<String, InstancePartitions>
getTierToInstancePartitionsMap(TableConfig tableConfig,
+ @Nullable List<Tier> sortedTiers, boolean reassignInstances, boolean
bootstrap, boolean dryRun) {
if (sortedTiers == null) {
return null;
}
Map<String, InstancePartitions> tierToInstancePartitionsMap = new
HashMap<>();
for (Tier tier : sortedTiers) {
LOGGER.info("Fetching/computing instance partitions for tier: {} of
table: {}", tier.getName(),
- tableNameWithType);
- tierToInstancePartitionsMap.put(tier.getName(),
getInstancePartitionsForTier(tier, tableNameWithType));
+ tableConfig.getTableName());
+ tierToInstancePartitionsMap.put(tier.getName(),
+ getInstancePartitionsForTier(tableConfig, tier, reassignInstances,
bootstrap, dryRun));
}
return tierToInstancePartitionsMap;
}
/**
- * Creates a default instance assignment for the tier.
- * TODO: We only support default server-tag based assignment currently.
- * In next iteration, we will add InstanceAssignmentConfig to the
TierConfig and also support persisting of the
- * InstancePartitions to zk.
- * Then we'll be able to support replica group assignment while creating
InstancePartitions for tiers
+ * Computes the instance partitions for the given tier. If table's
instanceAssignmentConfigMap has an entry for the
+ * tier, it's used to calculate the instance partitions. Else default
instance partitions are returned
*/
- private InstancePartitions getInstancePartitionsForTier(Tier tier, String
tableNameWithType) {
+ private InstancePartitions getInstancePartitionsForTier(TableConfig
tableConfig, Tier tier, boolean reassignInstances,
+ boolean bootstrap, boolean dryRun) {
PinotServerTierStorage storage = (PinotServerTierStorage)
tier.getStorage();
- return
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager,
tableNameWithType,
- tier.getName(), storage.getServerTag());
+ InstancePartitions defaultInstancePartitions =
+
InstancePartitionsUtils.computeDefaultInstancePartitionsForTag(_helixManager,
tableConfig.getTableName(),
+ tier.getName(), storage.getServerTag());
+
+ if (tableConfig.getInstanceAssignmentConfigMap() == null ||
!tableConfig.getInstanceAssignmentConfigMap()
+ .containsKey(tier.getName())) {
+ LOGGER.info("Skipping fetching/computing instance partitions for tier {}
for table: {}", tier.getName(),
+ tableConfig.getTableName());
+ if (!dryRun) {
+ String instancePartitionsName =
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tier.getName());
+ LOGGER.info("Removing instance partitions: {} from ZK if it exists",
instancePartitionsName);
+
InstancePartitionsUtils.removeInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitionsName);
+ }
+ return defaultInstancePartitions;
+ }
+
+ String tableNameWithType = tableConfig.getTableName();
+ String instancePartitionsName =
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableConfig.getTableName(),
tier.getName());
+ if (reassignInstances) {
+ // Set existing instance partition to null if bootstrap mode is enabled,
so that the instance partition
+ // map can be fully recalculated.
+ InstancePartitions existingInstancePartitions = bootstrap ? null
+ :
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+ instancePartitionsName);
+ InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(tier.getName(),
+
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
+ existingInstancePartitions,
tableConfig.getInstanceAssignmentConfigMap().get(tier.getName()));
+ if (!dryRun) {
+ LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
+
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
+ }
+ return instancePartitions;
+ }
+
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_helixManager.getHelixPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(tableNameWithType,
tier.getName()));
+ if (instancePartitions != null) {
+ return instancePartitions;
+ }
+
+ return defaultInstancePartitions;
}
private IdealState waitForExternalViewToConverge(String tableNameWithType,
boolean bestEfforts,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
index 38334bb25a..ef95da5135 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceStatelessTest.java
@@ -21,15 +21,19 @@ package org.apache.pinot.controller.api;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -59,6 +63,8 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
private static final String RAW_TABLE_NAME = "testTable";
private static final String TIME_COLUMN_NAME = "daysSinceEpoch";
+ private static final String TIER_NAME = "tier1";
+
@BeforeClass
public void setUp()
throws Exception {
@@ -114,13 +120,13 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME),
false, 0, null), null,
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
offlineTableConfig.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
offlineInstanceAssignmentConfig));
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
offlineInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(offlineTableConfig);
// OFFLINE instance partitions should be generated
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
getInstancePartitionsMap();
+ Map<String, InstancePartitions> instancePartitionsMap =
getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 1);
- InstancePartitions offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ InstancePartitions offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
@@ -132,72 +138,108 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
new
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME),
false, 0, null), null,
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
realtimeTableConfig.setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.CONSUMING,
consumingInstanceAssignmentConfig));
+ Collections.singletonMap(InstancePartitionsType.CONSUMING.toString(),
consumingInstanceAssignmentConfig));
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
// CONSUMING instance partitions should be generated
instancePartitionsMap = getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 2);
- offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
- InstancePartitions consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ InstancePartitions consumingInstancePartitions =
+ instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0).size(), 1);
String consumingInstanceId = consumingInstancePartitions.getInstances(0,
0).get(0);
+ // Add tier config and tier instance assignment config to the offline
table config
+ offlineTableConfig.setTierConfigsList(Collections.singletonList(
+ new TierConfig(TIER_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"7d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE,
TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME), null,
+ null)));
+ InstanceAssignmentConfig tierInstanceAssignmentConfig = new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false));
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap = new
HashMap<>();
+ instanceAssignmentConfigMap.put(InstancePartitionsType.OFFLINE.toString(),
offlineInstanceAssignmentConfig);
+ instanceAssignmentConfigMap.put(TIER_NAME, tierInstanceAssignmentConfig);
+
offlineTableConfig.setInstanceAssignmentConfigMap(instanceAssignmentConfigMap);
+ _helixResourceManager.setExistingTableConfig(offlineTableConfig);
+
+ // tier instance partitions should be generated
+ Map<String, InstancePartitions> tierInstancePartitionsMap =
getInstancePartitionsMap();
+ assertEquals(tierInstancePartitionsMap.size(), 3);
+ InstancePartitions tierInstancePartitions =
tierInstancePartitionsMap.get(TIER_NAME);
+ assertNotNull(tierInstancePartitions);
+ assertEquals(tierInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tierInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tierInstancePartitions.getInstances(0, 0).size(), 1);
+
// Use OFFLINE instance assignment config as the COMPLETED instance
assignment config
- realtimeTableConfig.setInstanceAssignmentConfigMap(
- new TreeMap<InstancePartitionsType, InstanceAssignmentConfig>() {{
- put(InstancePartitionsType.CONSUMING,
consumingInstanceAssignmentConfig);
- put(InstancePartitionsType.COMPLETED,
offlineInstanceAssignmentConfig);
- }});
+ realtimeTableConfig.setInstanceAssignmentConfigMap(new TreeMap<String,
InstanceAssignmentConfig>() {{
+ put(InstancePartitionsType.CONSUMING.toString(),
consumingInstanceAssignmentConfig);
+ put(InstancePartitionsType.COMPLETED.toString(),
offlineInstanceAssignmentConfig);
+ }});
_helixResourceManager.setExistingTableConfig(realtimeTableConfig);
// COMPLETED instance partitions should be generated
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
- offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ assertEquals(instancePartitionsMap.size(), 4);
+ offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
- consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
- InstancePartitions completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+ InstancePartitions completedInstancePartitions =
+ instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(completedInstancePartitions.getNumPartitions(), 1);
assertEquals(completedInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
+ InstancePartitions tInstancePartitions =
instancePartitionsMap.get(TIER_NAME);
+ assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
// Test fetching instance partitions by table name with type suffix
instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME),
null)));
- assertEquals(instancePartitionsMap.size(), 1);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+ assertEquals(instancePartitionsMap.size(), 2);
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
null)));
assertEquals(instancePartitionsMap.size(), 2);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Test fetching instance partitions by table name and instance partitions
type
for (InstancePartitionsType instancePartitionsType :
InstancePartitionsType.values()) {
- instancePartitionsMap = deserializeInstancePartitionsMap(
-
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
instancePartitionsType)));
+ instancePartitionsMap = deserializeInstancePartitionsMap(sendGetRequest(
+ _controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
instancePartitionsType.toString())));
assertEquals(instancePartitionsMap.size(), 1);
-
assertEquals(instancePartitionsMap.get(instancePartitionsType).getInstancePartitionsName(),
+
assertEquals(instancePartitionsMap.get(instancePartitionsType.toString()).getInstancePartitionsName(),
instancePartitionsType.getInstancePartitionsName(RAW_TABLE_NAME));
}
+ // Test fetching instance partitions by table name and tier name
+ instancePartitionsMap = deserializeInstancePartitionsMap(
+
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
TIER_NAME)));
+ assertEquals(instancePartitionsMap.size(), 1);
+
assertEquals(instancePartitionsMap.get(TIER_NAME).getInstancePartitionsName(),
+
InstancePartitionsUtils.getInstancePartitionsNameForTier(RAW_TABLE_NAME,
TIER_NAME));
+
// Remove the instance partitions for both offline and real-time table
sendDeleteRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
null));
try {
@@ -210,21 +252,25 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Assign instances without instance partitions type (dry run)
instancePartitionsMap = deserializeInstancePartitionsMap(
sendPostRequest(_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME,
null, true), null));
- assertEquals(instancePartitionsMap.size(), 3);
- offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE);
+ assertEquals(instancePartitionsMap.size(), 4);
+ offlineInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString());
assertNotNull(offlineInstancePartitions);
assertEquals(offlineInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(offlineInstancePartitions.getNumPartitions(), 1);
assertEquals(offlineInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
- consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
+ consumingInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString());
assertNotNull(consumingInstancePartitions);
assertEquals(consumingInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(consumingInstancePartitions.getNumPartitions(), 1);
assertEquals(consumingInstancePartitions.getInstances(0, 0),
Collections.singletonList(consumingInstanceId));
- completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED);
+ completedInstancePartitions =
instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString());
assertEquals(completedInstancePartitions.getNumReplicaGroups(), 1);
assertEquals(completedInstancePartitions.getNumPartitions(), 1);
assertEquals(completedInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
+ tInstancePartitions = instancePartitionsMap.get(TIER_NAME);
+ assertEquals(tInstancePartitions.getNumReplicaGroups(), 1);
+ assertEquals(tInstancePartitions.getNumPartitions(), 1);
+ assertEquals(tInstancePartitions.getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
// Instance partitions should not be persisted
try {
@@ -239,34 +285,36 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
// Instance partitions should be persisted
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
+ assertEquals(instancePartitionsMap.size(), 4);
// Remove the instance partitions for real-time table
sendDeleteRequest(
_controllerRequestURLBuilder.forInstancePartitions(TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME),
null));
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 1);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
+ assertEquals(instancePartitionsMap.size(), 2);
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
// Assign instances for COMPLETED segments
instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
_controllerRequestURLBuilder.forInstanceAssign(RAW_TABLE_NAME,
InstancePartitionsType.COMPLETED, false), null));
assertEquals(instancePartitionsMap.size(), 1);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// There should be OFFLINE and COMPLETED instance partitions persisted
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 2);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE));
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+ assertEquals(instancePartitionsMap.size(), 3);
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.OFFLINE.toString()));
+ assertTrue(instancePartitionsMap.containsKey(TIER_NAME));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Replace OFFLINE instance with CONSUMING instance for COMPLETED instance
partitions
instancePartitionsMap = deserializeInstancePartitionsMap(sendPostRequest(
_controllerRequestURLBuilder.forInstanceReplace(RAW_TABLE_NAME,
InstancePartitionsType.COMPLETED,
offlineInstanceId, consumingInstanceId), null));
assertEquals(instancePartitionsMap.size(), 1);
-
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
0),
+
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0,
0),
Collections.singletonList(consumingInstanceId));
// Replace the instance again using real-time table name (old instance
does not exist)
@@ -284,26 +332,27 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
sendPutRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
null),
consumingInstancePartitions.toJsonString()));
assertEquals(instancePartitionsMap.size(), 1);
-
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
0),
+
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0,
0),
Collections.singletonList(consumingInstanceId));
// OFFLINE instance partitions should have OFFLINE instance, CONSUMING and
COMPLETED instance partitions should have
// CONSUMING instance
instancePartitionsMap = getInstancePartitionsMap();
- assertEquals(instancePartitionsMap.size(), 3);
-
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE).getInstances(0,
0),
+ assertEquals(instancePartitionsMap.size(), 4);
+
assertEquals(instancePartitionsMap.get(InstancePartitionsType.OFFLINE.toString()).getInstances(0,
0),
Collections.singletonList(offlineInstanceId));
-
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING).getInstances(0,
0),
+ assertEquals(instancePartitionsMap.get(TIER_NAME).getInstances(0, 0),
Collections.singletonList(offlineInstanceId));
+
assertEquals(instancePartitionsMap.get(InstancePartitionsType.CONSUMING.toString()).getInstances(0,
0),
Collections.singletonList(consumingInstanceId));
-
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED).getInstances(0,
0),
+
assertEquals(instancePartitionsMap.get(InstancePartitionsType.COMPLETED.toString()).getInstances(0,
0),
Collections.singletonList(consumingInstanceId));
// Delete the offline table
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
instancePartitionsMap = getInstancePartitionsMap();
assertEquals(instancePartitionsMap.size(), 2);
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING));
-
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.CONSUMING.toString()));
+
assertTrue(instancePartitionsMap.containsKey(InstancePartitionsType.COMPLETED.toString()));
// Delete the real-time table
_helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
@@ -315,18 +364,16 @@ public class
PinotInstanceAssignmentRestletResourceStatelessTest extends Control
}
}
- private Map<InstancePartitionsType, InstancePartitions>
getInstancePartitionsMap()
+ private Map<String, InstancePartitions> getInstancePartitionsMap()
throws Exception {
return deserializeInstancePartitionsMap(
sendGetRequest(_controllerRequestURLBuilder.forInstancePartitions(RAW_TABLE_NAME,
null)));
}
- private Map<InstancePartitionsType, InstancePartitions>
deserializeInstancePartitionsMap(
- String instancePartitionsMapString)
+ private Map<String, InstancePartitions>
deserializeInstancePartitionsMap(String instancePartitionsMapString)
throws Exception {
- return JsonUtils.stringToObject(instancePartitionsMapString,
- new TypeReference<Map<InstancePartitionsType, InstancePartitions>>() {
- });
+ return JsonUtils.stringToObject(instancePartitionsMapString, new
TypeReference<Map<String, InstancePartitions>>() {
+ });
}
@AfterClass
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4715271fc5..53bd2da317 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -352,7 +352,7 @@ public class InstanceAssignmentTest {
InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups, 0,
0, 0, false);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig))).build();
InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
@@ -405,7 +405,7 @@ public class InstanceAssignmentTest {
// Select all 3 pools in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 3 = 2
@@ -427,7 +427,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0,
Arrays.asList(0, 1));
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -449,7 +449,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = numPools;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0,
numReplicaGroups, 0, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 2 = 0
@@ -479,7 +479,7 @@ public class InstanceAssignmentTest {
numPools = 2;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0,
numReplicaGroups, 0, 0, 0, true);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Reset the instance configs to have only two pools.
instanceConfigs.clear();
@@ -528,7 +528,7 @@ public class InstanceAssignmentTest {
// Select pool 0 and 1 in pool selection
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0,
Arrays.asList(0, 1));
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -555,7 +555,7 @@ public class InstanceAssignmentTest {
// Assign instances from 2 pools to 3 replica-groups
numReplicaGroups = 3;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0,
numReplicaGroups, 0, 0, 0, true);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -634,7 +634,7 @@ public class InstanceAssignmentTest {
// Reduce number of replica groups from 3 to 2.
numReplicaGroups = 2;
replicaPartitionConfig = new InstanceReplicaGroupPartitionConfig(true, 0,
numReplicaGroups, 0, 0, 0, true);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig)));
// Get the latest existingInstancePartitions from last computation.
@@ -761,7 +761,7 @@ public class InstanceAssignmentTest {
InstanceTagPoolConfig tagPoolConfig = new
InstanceTagPoolConfig(OFFLINE_TAG, false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(false, 0, 0, 0, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// No instance with correct tag
@@ -791,7 +791,7 @@ public class InstanceAssignmentTest {
// Enable pool
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// No instance has correct pool configured
@@ -825,7 +825,7 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(0, 0), expectedInstances);
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 3, null);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for too many pools
@@ -837,7 +837,7 @@ public class InstanceAssignmentTest {
}
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0,
Arrays.asList(0, 2));
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for pool that does not exist
@@ -850,7 +850,7 @@ public class InstanceAssignmentTest {
tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, 0, null);
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(false, 6, 0, 0, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for too many instances
@@ -863,7 +863,7 @@ public class InstanceAssignmentTest {
// Enable replica-group
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(true, 0, 0, 0, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Number of replica-groups must be positive
@@ -875,7 +875,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(true, 0, 11, 0, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for too many replica-groups
@@ -888,7 +888,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(true, 0, 3, 3, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for too many instances
@@ -900,7 +900,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 3, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Ask for too many instances per partition
@@ -913,7 +913,7 @@ public class InstanceAssignmentTest {
}
replicaGroupPartitionConfig = new
InstanceReplicaGroupPartitionConfig(true, 0, 3, 2, 0, 0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
// Math.abs("myTable_OFFLINE".hashCode()) % 5 = 3
@@ -952,7 +952,7 @@ public class InstanceAssignmentTest {
try {
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig, "ILLEGAL_SELECTOR"))).build();
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
@@ -979,7 +979,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1011,7 +1011,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1051,7 +1051,7 @@ public class InstanceAssignmentTest {
replicaPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, 0, 0, false);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1089,7 +1089,7 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
numInstancesPerPartition, false);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
@@ -1161,7 +1161,7 @@ public class InstanceAssignmentTest {
new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
numInstancesPerPartition, true);
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString()))).build();
driver = new InstanceAssignmentDriver(tableConfig);
@@ -1242,7 +1242,7 @@ public class InstanceAssignmentTest {
SegmentPartitionConfig segmentPartitionConfig = new SegmentPartitionConfig(
Collections.singletonMap(partitionColumnName, new
ColumnPartitionConfig("Modulo", numPartitionsSegment, null)));
tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setInstanceAssignmentConfigMap(
- Collections.singletonMap(InstancePartitionsType.OFFLINE,
+ Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.setReplicaGroupStrategyConfig(new
ReplicaGroupStrategyConfig(partitionColumnName, numInstancesPerReplicaGroup))
@@ -1316,7 +1316,7 @@ public class InstanceAssignmentTest {
new InstanceConstraintConfig(Arrays.asList("constraint1",
"constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1372,7 +1372,7 @@ public class InstanceAssignmentTest {
instanceConstraintConfig = new
InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1439,7 +1439,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1505,7 +1505,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1576,7 +1576,7 @@ public class InstanceAssignmentTest {
instanceConstraintConfig = new
InstanceConstraintConfig(Arrays.asList("constraint1", "constraint2"));
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1627,7 +1627,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
@@ -1691,7 +1691,7 @@ public class InstanceAssignmentTest {
// Do not rotate pool sequence (for testing)
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME
+ TABLE_NAME_ZERO_HASH_COMPLEMENT)
-
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig,
instanceConstraintConfig, replicaPartitionConfig,
InstanceAssignmentConfig.PartitionSelector.FD_AWARE_INSTANCE_PARTITION_SELECTOR.toString())))
.build();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 4c006967d1..4b9e869dcf 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -196,7 +196,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
new InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant(null),
false, 0, null);
InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0,
0, false);
-
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
_helixResourceManager.updateTableConfig(tableConfig);
@@ -403,6 +403,131 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertTrue(instance.startsWith(expectedPrefix));
}
}
+ _helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
+ }
+
+ @Test
+ public void testRebalanceWithTiersAndInstanceAssignments()
+ throws Exception {
+ int numServers = 3;
+ for (int i = 0; i < numServers; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(
+ "replicaAssignment" + NO_TIER_NAME + "_" + SERVER_INSTANCE_ID_PREFIX
+ i, false);
+ }
+ _helixResourceManager.createServerTenant(
+ new Tenant(TenantRole.SERVER, "replicaAssignment" + NO_TIER_NAME,
numServers, numServers, 0));
+
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TIERED_TABLE_NAME).setNumReplicas(NUM_REPLICAS)
+ .setServerTenant("replicaAssignment" + NO_TIER_NAME).build();
+ // Create the table
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ long nowInDays = TimeUnit.MILLISECONDS.toDays(System.currentTimeMillis());
+
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TIERED_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithEndTimeInfo(TIERED_TABLE_NAME,
SEGMENT_NAME_PREFIX + i,
+ nowInDays), null);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add 6 nodes tierA
+ for (int i = 0; i < 6; i++) {
+ addFakeServerInstanceToAutoJoinHelixCluster(
+ "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX
+ i, false);
+ }
+ _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+ // rebalance is NOOP and no change in assignment caused by new instances
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // add tier config
+ tableConfig.setTierConfigsList(Lists.newArrayList(
+ new TierConfig(TIER_A_NAME, TierFactory.TIME_SEGMENT_SELECTOR_TYPE,
"0d", null,
+ TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" +
TIER_A_NAME + "_OFFLINE", null, null)));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // rebalance should change assignment
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+ // check that segments have moved to tier a
+ Map<String, Map<String, String>> tierSegmentAssignment =
rebalanceResult.getSegmentAssignment();
+ for (Map.Entry<String, Map<String, String>> entry :
tierSegmentAssignment.entrySet()) {
+ Map<String, String> instanceStateMap = entry.getValue();
+ for (String instance : instanceStateMap.keySet()) {
+ assertTrue(instance.startsWith("replicaAssignment" + TIER_A_NAME + "_"
+ SERVER_INSTANCE_ID_PREFIX));
+ }
+ }
+
+ // Test rebalance with tier instance assignment
+ InstanceTagPoolConfig tagPoolConfig =
+ new
InstanceTagPoolConfig(TagNameUtils.getOfflineTagForTenant("replicaAssignment" +
TIER_A_NAME), false, 0,
+ null);
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, NUM_REPLICAS, 0, 0,
0, false);
+
tableConfig.setInstanceAssignmentConfigMap(Collections.singletonMap(TIER_A_NAME,
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig)));
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
BaseConfiguration());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
+
+ InstancePartitions instancePartitions =
rebalanceResult.getTierInstanceAssignment().get(TIER_A_NAME);
+
+ // Math.abs("testTable_OFFLINE".hashCode()) % 6 = 2
+ // [i2, i3, i4, i5, i0, i1]
+ // r0 r1 r2 r0 r1 r2
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 2,
+ "replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 3,
+ "replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList("replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 4,
+ "replicaAssignment" + TIER_A_NAME + "_" +
SERVER_INSTANCE_ID_PREFIX + 1));
+
+ // The assignment is based on replica-group 0 and mirrored to all the
replica-groups, so servers of index 0, 1, 5
+ // should have the same segments assigned, and servers of index 2, 3, 4
should have the same segments assigned, each
+ // with 5 segments
+ Map<String, Map<String, String>> newSegmentAssignment =
rebalanceResult.getSegmentAssignment();
+ int numSegmentsOnServer0 = 0;
+ for (int i = 0; i < numSegments; i++) {
+ String segmentName = SEGMENT_NAME_PREFIX + i;
+ Map<String, String> instanceStateMap =
newSegmentAssignment.get(segmentName);
+ assertEquals(instanceStateMap.size(), NUM_REPLICAS);
+ if (instanceStateMap.containsKey("replicaAssignment" + TIER_A_NAME + "_"
+ SERVER_INSTANCE_ID_PREFIX + 0)) {
+ numSegmentsOnServer0++;
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 0),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 1),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 5),
+ ONLINE);
+ } else {
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 2),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 3),
+ ONLINE);
+ assertEquals(instanceStateMap.get("replicaAssignment" + TIER_A_NAME +
"_" + SERVER_INSTANCE_ID_PREFIX + 4),
+ ONLINE);
+ }
+ }
+ assertEquals(numSegmentsOnServer0, numSegments / 2);
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 046c0175e7..361999a881 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -595,7 +595,8 @@ public final class TableConfigUtils {
return;
}
for (InstancePartitionsType instancePartitionsType :
tableConfig.getInstancePartitionsMap().keySet()) {
-
Preconditions.checkState(!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType),
+ Preconditions.checkState(
+
!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
String.format("Both InstanceAssignmentConfigMap and
InstancePartitionsMap set for %s",
instancePartitionsType));
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 88a829d846..ee14ef2b4e 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1724,10 +1724,10 @@ public class TableConfigUtilsTest {
// Call validate with a table-config with instance partitions set but not
instance assignment config
TableConfigUtils.validateInstancePartitionsTypeMapConfig(tableConfigWithInstancePartitionsMap);
- TableConfig invalidTableConfig =
- new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE,
"test_OFFLINE"))
-
.setInstanceAssignmentConfigMap(ImmutableMap.of(InstancePartitionsType.OFFLINE,
instanceAssignmentConfig))
+ TableConfig invalidTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+
.setInstancePartitionsMap(ImmutableMap.of(InstancePartitionsType.OFFLINE,
"test_OFFLINE"))
+ .setInstanceAssignmentConfigMap(
+ ImmutableMap.of(InstancePartitionsType.OFFLINE.toString(),
instanceAssignmentConfig))
.build();
try {
// Call validate with instance partitions and config set for the same
type
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index bae7b7c798..91a54bc942 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -89,7 +89,7 @@ public class TableConfig extends BaseJsonConfig {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
- private Map<InstancePartitionsType, InstanceAssignmentConfig>
_instanceAssignmentConfigMap;
+ private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
@JsonPropertyDescription(value = "Point to an existing instance partitions")
private Map<InstancePartitionsType, String> _instancePartitionsMap;
@@ -128,7 +128,7 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(ROUTING_CONFIG_KEY) @Nullable RoutingConfig routingConfig,
@JsonProperty(QUERY_CONFIG_KEY) @Nullable QueryConfig queryConfig,
@JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY) @Nullable
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap,
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap,
@JsonProperty(FIELD_CONFIG_LIST_KEY) @Nullable List<FieldConfig>
fieldConfigList,
@JsonProperty(UPSERT_CONFIG_KEY) @Nullable UpsertConfig upsertConfig,
@JsonProperty(DEDUP_CONFIG_KEY) @Nullable DedupConfig dedupConfig,
@@ -267,12 +267,12 @@ public class TableConfig extends BaseJsonConfig {
@JsonProperty(INSTANCE_ASSIGNMENT_CONFIG_MAP_KEY)
@Nullable
- public Map<InstancePartitionsType, InstanceAssignmentConfig>
getInstanceAssignmentConfigMap() {
+ public Map<String, InstanceAssignmentConfig>
getInstanceAssignmentConfigMap() {
return _instanceAssignmentConfigMap;
}
public void setInstanceAssignmentConfigMap(
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap) {
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index a687a3eafd..2a00acc28f 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -401,7 +401,7 @@ public class ControllerRequestURLBuilder {
return url;
}
- public String forInstancePartitions(String tableName, @Nullable
InstancePartitionsType instancePartitionsType) {
+ public String forInstancePartitions(String tableName, @Nullable String
instancePartitionsType) {
String url = StringUtil.join("/", _baseUrl, "tables", tableName,
"instancePartitions");
if (instancePartitionsType != null) {
url += "?type=" + instancePartitionsType;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
index 4efa7db452..4916668eed 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/TableConfigBuilder.java
@@ -109,7 +109,7 @@ public class TableConfigBuilder {
private TableTaskConfig _taskConfig;
private RoutingConfig _routingConfig;
private QueryConfig _queryConfig;
- private Map<InstancePartitionsType, InstanceAssignmentConfig>
_instanceAssignmentConfigMap;
+ private Map<String, InstanceAssignmentConfig> _instanceAssignmentConfigMap;
private Map<InstancePartitionsType, String> _instancePartitionsMap;
private Map<String, SegmentAssignmentConfig> _segmentAssignmentConfigMap;
private List<FieldConfig> _fieldConfigList;
@@ -344,7 +344,7 @@ public class TableConfigBuilder {
}
public TableConfigBuilder setInstanceAssignmentConfigMap(
- Map<InstancePartitionsType, InstanceAssignmentConfig>
instanceAssignmentConfigMap) {
+ Map<String, InstanceAssignmentConfig> instanceAssignmentConfigMap) {
_instanceAssignmentConfigMap = instanceAssignmentConfigMap;
return this;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]