This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2e37f2c837 pre-configuration based assignment (#11578)
2e37f2c837 is described below
commit 2e37f2c837dd736ef94b0af4bc3f0d800f279f97
Author: Jia Guo <[email protected]>
AuthorDate: Mon Oct 30 09:06:39 2023 -0700
pre-configuration based assignment (#11578)
* add tenant partition map rest resource
add tenant partition map rest resource
fix logic
refactor and fix some logic
pre-configuration based assignment
* checkstyle
* checkstyle
* checkstyle
* Trigger Test
* Address comments
* refactored branches
* address comments
* address comments
* address comments
* Change the shuffling logic to be compatible with future changes
* add test cases
* address comments
* address comments
* address comments
* change table validation logic
* Trigger Test
* update shuffling logic
* add more logging
---
.../assignment/InstanceAssignmentConfigUtils.java | 10 +
.../PinotInstanceAssignmentRestletResource.java | 44 +-
.../api/resources/PinotTenantRestletResource.java | 82 +-
.../helix/core/PinotHelixResourceManager.java | 24 +-
.../instance/InstanceAssignmentDriver.java | 21 +-
.../instance/InstancePartitionSelectorFactory.java | 12 +-
.../MirrorServerSetInstancePartitionSelector.java | 366 ++++++++
.../helix/core/rebalance/TableRebalancer.java | 63 +-
.../instance/InstanceAssignmentTest.java | 964 ++++++++++++++++++++-
.../java/org/apache/pinot/core/auth/Actions.java | 2 +
.../segment/local/utils/TableConfigUtils.java | 24 +-
.../table/assignment/InstanceAssignmentConfig.java | 3 +-
12 files changed, 1559 insertions(+), 56 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
index b37429c527..30a3a19f20 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java
@@ -124,4 +124,14 @@ public class InstanceAssignmentConfigUtils {
return new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig);
}
+
+ public static boolean isMirrorServerSetAssignment(TableConfig tableConfig,
+ InstancePartitionsType instancePartitionsType) {
+ // Returns true only if the instance assignment config is not null and the partition
selector is
+ // MIRROR_SERVER_SET_PARTITION_SELECTOR.
+ return
tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString())
!= null
+ &&
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig,
instancePartitionsType)
+ .getPartitionSelector()
+ ==
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
index 3eeb6665a4..282431e04b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java
@@ -69,7 +69,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+import static org.apache.pinot.spi.utils.CommonConstants.*;
@Api(tags = Constants.TABLE_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
@@ -244,20 +244,38 @@ public class PinotInstanceAssignmentRestletResource {
private void assignInstancesForInstancePartitionsType(Map<String,
InstancePartitions> instancePartitionsMap,
TableConfig tableConfig, List<InstanceConfig> instanceConfigs,
InstancePartitionsType instancePartitionsType) {
String tableNameWithType = tableConfig.getTableName();
- if (TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)) {
- String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ if (!TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType)) {
+ InstancePartitions existingInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString()));
instancePartitionsMap.put(instancePartitionsType.toString(),
-
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
-
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
- instancePartitionsType.getInstancePartitionsName(rawTableName)));
- return;
- }
- InstancePartitions existingInstancePartitions =
-
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getHelixZkManager().getHelixPropertyStore(),
+ new
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType,
instanceConfigs,
+ existingInstancePartitions));
+ } else {
+ if
(InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType)) {
+ // fetch the existing instance partitions of the table, if any; they are
referenced when generating the new instance partitions
+ // so that the difference from the existing assignment is kept to a minimum
+ InstancePartitions existingInstancePartitions =
InstancePartitionsUtils.fetchInstancePartitions(
+ _resourceManager.getHelixZkManager().getHelixPropertyStore(),
InstancePartitionsUtils.getInstancePartitionsName(tableNameWithType,
instancePartitionsType.toString()));
- instancePartitionsMap.put(instancePartitionsType.toString(),
- new
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType,
instanceConfigs,
- existingInstancePartitions));
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ // fetch the pre-configured instance partitions; the renaming part is
irrelevant because this preConfigured is not
+ // preserved and is only used as a reference to
generate the new instance partitions
+ InstancePartitions preConfigured =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+
instancePartitionsType.getInstancePartitionsName(rawTableName));
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+ new
InstanceAssignmentDriver(tableConfig).assignInstances(instancePartitionsType,
instanceConfigs,
+ existingInstancePartitions, preConfigured));
+ } else {
+ String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
+ instancePartitionsMap.put(instancePartitionsType.toString(),
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_resourceManager.getPropertyStore(),
+
tableConfig.getInstancePartitionsMap().get(instancePartitionsType),
+
instancePartitionsType.getInstancePartitionsName(rawTableName)));
+ }
+ }
}
private void assignInstancesForTier(Map<String, InstancePartitions>
instancePartitionsMap, TableConfig tableConfig,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
index 3aa5da48e9..8166427a93 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java
@@ -30,6 +30,7 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,8 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -68,13 +71,14 @@ import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+import static org.apache.pinot.spi.utils.CommonConstants.*;
/**
@@ -286,6 +290,82 @@ public class PinotTenantRestletResource {
}
}
+ @GET
+ @Path("/tenants/{tenantName}/instancePartitions")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_INSTANCE_PARTITIONS)
+ @Authenticate(AccessType.READ)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the instance partitions of a tenant")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success",
response = InstancePartitions.class),
+ @ApiResponse(code = 404, message = "Instance partitions not found")})
+ public InstancePartitions getInstancePartitions(
+ @ApiParam(value = "Tenant name ", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)",
required = true,
+ allowableValues = "OFFLINE, CONSUMING, COMPLETED")
+ @QueryParam("instancePartitionType") String instancePartitionType) {
+ String tenantNameWithType =
InstancePartitionsType.valueOf(instancePartitionType)
+ .getInstancePartitionsName(tenantName);
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+ tenantNameWithType);
+
+ if (instancePartitions == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find the instance partitions for %s",
tenantNameWithType),
+ Response.Status.NOT_FOUND);
+ } else {
+ return instancePartitions;
+ }
+ }
+
+ @PUT
+ @Path("/tenants/{tenantName}/instancePartitions")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_INSTANCE_PARTITIONS)
+ @Authenticate(AccessType.UPDATE)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update an instance partition for a server type in a
tenant")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success",
response = InstancePartitions.class),
+ @ApiResponse(code = 400, message = "Failed to deserialize/validate the
instance partitions"),
+ @ApiResponse(code = 500, message = "Error updating the tenant")})
+ public InstancePartitions assignInstancesPartitionMap(
+ @ApiParam(value = "Tenant name ", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)",
required = true,
+ allowableValues = "OFFLINE, CONSUMING, COMPLETED")
+ @QueryParam("instancePartitionType") String instancePartitionType,
+ String instancePartitionsStr) {
+ InstancePartitions instancePartitions;
+ try {
+ instancePartitions = JsonUtils.stringToObject(instancePartitionsStr,
InstancePartitions.class);
+ } catch (IOException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to deserialize
the instance partitions",
+ Response.Status.BAD_REQUEST);
+ }
+
+ String inputTenantName =
InstancePartitionsType.valueOf(instancePartitionType)
+ .getInstancePartitionsName(tenantName);
+
+ if
(!instancePartitions.getInstancePartitionsName().equals(inputTenantName)) {
+ throw new ControllerApplicationException(LOGGER, "Instance partitions
name mismatch, expected: "
+ + inputTenantName
+ + ", got: " + instancePartitions.getInstancePartitionsName(),
Response.Status.BAD_REQUEST);
+ }
+
+ persistInstancePartitionsHelper(instancePartitions);
+ return instancePartitions;
+ }
+
+ private void persistInstancePartitionsHelper(InstancePartitions
instancePartitions) {
+ try {
+ LOGGER.info("Persisting instance partitions: {}", instancePartitions);
+
InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+ instancePartitions);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, "Caught Exception while
persisting the instance partitions",
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
private String getTablesServedFromServerTenant(String tenantName) {
Set<String> tables = new HashSet<>();
ObjectNode resourceGetRet = JsonUtils.newObjectNode();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 9df5835216..8e1ff52e13 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1753,20 +1753,30 @@ public class PinotHelixResourceManager {
for (InstancePartitionsType instancePartitionsType :
instancePartitionsTypesToAssign) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
+ boolean isPreConfigurationBasedAssignment =
+
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
InstancePartitions instancePartitions;
if (!hasPreConfiguredInstancePartitions) {
instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
instanceConfigs, null);
LOGGER.info("Persisting instance partitions: {}",
instancePartitions);
- InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
} else {
String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
- instancePartitions =
-
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
referenceInstancePartitionsName,
-
instancePartitionsType.getInstancePartitionsName(rawTableName));
- LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
- referenceInstancePartitionsName);
- InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
+ if (isPreConfigurationBasedAssignment) {
+ InstancePartitions preConfiguredInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
+ referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
+ instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
instanceConfigs, null,
+ preConfiguredInstancePartitions);
+ LOGGER.info("Persisting instance partitions: {} (based on {})",
instancePartitions,
+ preConfiguredInstancePartitions);
+ } else {
+ instancePartitions =
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_propertyStore,
+ referenceInstancePartitionsName,
instancePartitionsType.getInstancePartitionsName(rawTableName));
+ LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
+ referenceInstancePartitionsName);
+ }
}
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index 7a5c901029..6d869b86c1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -60,19 +60,31 @@ public class InstanceAssignmentDriver {
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
return getInstancePartitions(
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
- assignmentConfig, instanceConfigs, existingInstancePartitions);
+ assignmentConfig, instanceConfigs, existingInstancePartitions, null);
+ }
+
+ public InstancePartitions assignInstances(InstancePartitionsType
instancePartitionsType,
+ List<InstanceConfig> instanceConfigs, @Nullable InstancePartitions
existingInstancePartitions, @Nullable
+ InstancePartitions preConfiguredInstancePartitions) {
+ String tableNameWithType = _tableConfig.getTableName();
+ InstanceAssignmentConfig assignmentConfig =
+
InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(_tableConfig,
instancePartitionsType);
+ return getInstancePartitions(
+
instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)),
+ assignmentConfig, instanceConfigs, existingInstancePartitions,
preConfiguredInstancePartitions);
}
public InstancePartitions assignInstances(String tierName,
List<InstanceConfig> instanceConfigs,
@Nullable InstancePartitions existingInstancePartitions,
InstanceAssignmentConfig instanceAssignmentConfig) {
return getInstancePartitions(
InstancePartitionsUtils.getInstancePartitionsNameForTier(_tableConfig.getTableName(),
tierName),
- instanceAssignmentConfig, instanceConfigs, existingInstancePartitions);
+ instanceAssignmentConfig, instanceConfigs, existingInstancePartitions,
null);
}
private InstancePartitions getInstancePartitions(String
instancePartitionsName,
InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
- @Nullable InstancePartitions existingInstancePartitions) {
+ @Nullable InstancePartitions existingInstancePartitions,
+ @Nullable InstancePartitions preConfiguredInstancePartitions) {
String tableNameWithType = _tableConfig.getTableName();
LOGGER.info("Starting {} instance assignment for table {}",
instancePartitionsName, tableNameWithType);
@@ -93,7 +105,8 @@ public class InstanceAssignmentDriver {
InstancePartitionSelector instancePartitionSelector =
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
- instanceAssignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions);
+ instanceAssignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions,
+ preConfiguredInstancePartitions);
InstancePartitions instancePartitions = new
InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap,
instancePartitions);
return instancePartitions;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index f786ffe0b4..256aa89b02 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -31,7 +31,14 @@ public class InstancePartitionSelectorFactory {
public static InstancePartitionSelector
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig,
String tableNameWithType,
- InstancePartitions existingInstancePartitions
+ InstancePartitions existingInstancePartitions) {
+ return getInstance(partitionSelector, instanceReplicaGroupPartitionConfig,
tableNameWithType,
+ existingInstancePartitions, null);
+ }
+
+ public static InstancePartitionSelector
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
+ InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig,
String tableNameWithType,
+ InstancePartitions existingInstancePartitions, InstancePartitions
preConfiguredInstancePartitions
) {
switch (partitionSelector) {
case FD_AWARE_INSTANCE_PARTITION_SELECTOR:
@@ -40,6 +47,9 @@ public class InstancePartitionSelectorFactory {
case INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR:
return new
InstanceReplicaGroupPartitionSelector(instanceReplicaGroupPartitionConfig,
tableNameWithType,
existingInstancePartitions);
+ case MIRROR_SERVER_SET_PARTITION_SELECTOR:
+ return new
MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig,
tableNameWithType,
+ existingInstancePartitions, preConfiguredInstancePartitions);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " +
partitionSelector + ", should be from"
+
Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
new file mode 100644
index 0000000000..6b4086615a
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java
@@ -0,0 +1,366 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * For the detailed design, see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancesPerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // _numTargetInstancesPerReplicaGroup should be positive
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+
+ // _numTargetReplicaGroups should be positive
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+
+ // validate target partition count is 1
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+
+ // Validate the existing instance partitions is null or has only one
partition
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumPartitions());
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support replica group level partitioning for
existing assignment");
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}",
+ _preConfiguredInstancePartitions.getNumPartitions());
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}, target number of "
+ + "instances per replica-group: {}",
_numPreConfiguredInstancesPerReplicaGroup,
+ _numTargetInstancesPerReplicaGroup);
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group %s",
+ _numTargetInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+ Integer totalInstanceCount =
poolToInstanceConfigsMap.values().stream().map(List::size)
+ .reduce(Integer::sum).orElse(0);
+ LOGGER.info("Total number of instances in all pools: {}, target number of
instances: {}", totalInstanceCount,
+ _numTargetTotalInstances);
+ Preconditions.checkState(totalInstanceCount
+ >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+  /**
+   * Transposes the pre-configured instance partitions into "mirrored server" lists.
+   *
+   * <p>Entry {@code k} appended to {@code _preConfiguredMirroredServerLists} holds the k-th instance of every
+   * pre-configured replica group, in replica-group order. Only {@code getInstances(0, ...)} is read (the first
+   * argument is presumably the partition id).
+   */
+  private void createMirrorServerListFromPreconfiguredInstancePartition() {
+    // Snapshot each pre-configured replica group once so the transposition below works on plain lists.
+    List<List<String>> replicaGroups = new ArrayList<>(_numPreConfiguredReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numPreConfiguredReplicaGroups; replicaGroupId++) {
+      replicaGroups.add(_preConfiguredInstancePartitions.getInstances(0, replicaGroupId));
+    }
+
+    // One mirrored set per instance offset: pick the instance at that offset from every replica group.
+    for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+      List<String> mirrorSet = new ArrayList<>();
+      for (List<String> replicaGroup : replicaGroups) {
+        mirrorSet.add(replicaGroup.get(offset));
+      }
+      _preConfiguredMirroredServerLists.add(mirrorSet);
+    }
+  }
+
+  /**
+   * Fills {@code _preConfiguredInstanceNameToOffsetMap} so that every instance named in the pre-configured
+   * instance partitions maps to its position (offset) inside its replica group.
+   */
+  private void createMirrorServerListLookupTablesFromPreconfiguredInstancePartition() {
+    for (int replicaGroupId = 0; replicaGroupId < _numPreConfiguredReplicaGroups; replicaGroupId++) {
+      List<String> replicaGroup = _preConfiguredInstancePartitions.getInstances(0, replicaGroupId);
+      for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+        // Instances sharing an offset across replica groups form one mirrored server set.
+        _preConfiguredInstanceNameToOffsetMap.put(replicaGroup.get(offset), offset);
+      }
+    }
+  }
+
+  /**
+   * Selects instances for the table by mirroring server sets taken from the pre-configured instance partitions.
+   *
+   * @param poolToInstanceConfigsMap candidate instance configs keyed by pool; checked by
+   *                                 {@code validatePoolDiversePreconditions} before any selection happens
+   * @param instancePartitions       output instance partitions that receive the selected replica groups
+   * @throws IllegalStateException if the replica-group partition config is not replica-group based
+   */
+  @Override
+  public void selectInstances(Map<Integer, List<InstanceConfig>> poolToInstanceConfigsMap,
+      InstancePartitions instancePartitions) {
+    // Fail fast: only replica-group based selection is supported.
+    Preconditions.checkState(_replicaGroupPartitionConfig.isReplicaGroupBased(),
+        "Does not support Non-replica-group based selection");
+
+    validatePoolDiversePreconditions(poolToInstanceConfigsMap);
+
+    if (_existingInstancePartitions != null) {
+      // Existing instance partitions are present: adjust them against the pre-configured instance partitions.
+      // This path covers instance replacement, uplift and downlift by searching the pre-configured instance
+      // partitions for mirrored server sets that are similar to the sets already in use.
+      scale(instancePartitions);
+      return;
+    }
+
+    // No existing instance partitions: build new ones by selecting _numTargetInstancesPerReplicaGroup mirrored
+    // server sets from the pre-configured instance partitions.
+    initialAssignment(instancePartitions);
+  }
+
+  /**
+   * Builds instance partitions from scratch out of the pre-configured mirrored server sets.
+   *
+   * <p>The offsets {@code 0 .. _numPreConfiguredInstancesPerReplicaGroup - 1} are shuffled with a {@link Random}
+   * seeded from the table name hash, the first {@code _numTargetInstancesPerReplicaGroup} of them are kept and
+   * sorted ascending, and the mirrored server set at each kept offset contributes one instance to every target
+   * replica group.
+   *
+   * @param instancePartitions output instance partitions; replica groups are written for the first argument 0
+   */
+  private void initialAssignment(InstancePartitions instancePartitions) {
+    LOGGER.info("No existing instance partitions found. Will build new on top of"
+        + " the pre-configured instance partitions");
+    createMirrorServerListFromPreconfiguredInstancePartition();
+
+    // Candidate offsets into the pre-configured replica groups.
+    List<Integer> candidateOffsets = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+    for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+      candidateOffsets.add(offset);
+    }
+
+    // Seeding with the table name hash makes the pick repeatable for a given table name.
+    Random tableSeededRandom = new Random(Math.abs(_tableNameWithType.hashCode()));
+    Collections.shuffle(candidateOffsets, tableSeededRandom);
+
+    // Keep the first N shuffled offsets, then restore the original pre-configured order among them.
+    List<Integer> chosenOffsets = candidateOffsets.subList(0, _numTargetInstancesPerReplicaGroup);
+    chosenOffsets.sort(Comparator.naturalOrder());
+
+    // Assemble each target replica group from the chosen mirrored server sets.
+    List<List<String>> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+      List<String> replicaGroup = new ArrayList<>(_numTargetInstancesPerReplicaGroup);
+      for (Integer offset : chosenOffsets) {
+        replicaGroup.add(_preConfiguredMirroredServerLists.get(offset).get(replicaGroupId));
+      }
+      resultReplicaGroups.add(replicaGroup);
+    }
+
+    for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+      instancePartitions.setInstances(0, replicaGroupId, resultReplicaGroups.get(replicaGroupId));
+    }
+  }
+
+  /**
+   * Adjusts existing instance partitions towards the pre-configured instance partitions.
+   *
+   * <p>Steps:
+   * <ol>
+   *   <li>For each existing instance offset, pick the not-yet-claimed pre-configured offset that contains the
+   *       largest number of the servers currently at that existing offset.</li>
+   *   <li>If the existing replica groups are larger than the target, keep only the matches with the highest
+   *       counts and renumber them from 0.</li>
+   *   <li>If fewer matches than the target remain, fill the empty slots with unused pre-configured offsets chosen
+   *       by a shuffle seeded from the table name hash, assigned in ascending offset order.</li>
+   *   <li>Write one replica group per target replica group from the selected mirrored server sets.</li>
+   * </ol>
+   *
+   * @param instancePartitions output instance partitions; replica groups are written for the first argument 0
+   */
+  private void scale(InstancePartitions instancePartitions) {
+    LOGGER.info("Existing instance partitions found. Will adjust the existing instance partitions"
+        + " based on the pre-configured instance partitions");
+    createMirrorServerListFromPreconfiguredInstancePartition();
+    createMirrorServerListLookupTablesFromPreconfiguredInstancePartition();
+    createListAndLookupTablesFromExistingInstancePartitions();
+
+    Set<Integer> claimedOffsets = new HashSet<>();
+    // slot in the result -> (pre-configured offset, number of existing servers found at that offset)
+    Map<Integer, Map.Entry<Integer, Long>> slotToMatch = new HashMap<>();
+
+    // Step 1: match every existing mirrored server set with the most similar unclaimed pre-configured offset.
+    for (int slot = 0; slot < _numExistingInstancesPerReplicaGroup; slot++) {
+      final int currentSlot = slot;
+      Map<Integer, Long> hitsPerOffset = _existingMirroredServerLists.get(slot).stream()
+          .map(_preConfiguredInstanceNameToOffsetMap::get)
+          // Servers unknown to the pre-configured partitions map to null and are ignored.
+          .filter(offset -> offset != null && !claimedOffsets.contains(offset))
+          .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+      hitsPerOffset.entrySet().stream()
+          .max(Map.Entry.comparingByValue())
+          .ifPresent(best -> {
+            slotToMatch.put(currentSlot, best);
+            claimedOffsets.add(best.getKey());
+          });
+    }
+
+    // Step 2 (downlift): keep the strongest matches only and renumber them from slot 0.
+    if (_numExistingInstancesPerReplicaGroup > _numTargetInstancesPerReplicaGroup) {
+      List<Map.Entry<Integer, Long>> strongestMatches = slotToMatch.values()
+          .stream()
+          .sorted(Map.Entry.<Integer, Long>comparingByValue(Comparator.reverseOrder()))
+          .limit(_numTargetInstancesPerReplicaGroup)
+          .collect(Collectors.toList());
+      slotToMatch.clear();
+      claimedOffsets.clear();
+      int nextSlot = 0;
+      for (Map.Entry<Integer, Long> kept : strongestMatches) {
+        slotToMatch.put(nextSlot++, kept);
+        claimedOffsets.add(kept.getKey());
+      }
+    }
+
+    // Step 3: fill any slot that is still empty with an unused pre-configured offset.
+    int shortfall = _numTargetInstancesPerReplicaGroup - slotToMatch.size();
+    if (shortfall > 0) {
+      List<Integer> spareOffsets = new ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+      for (int offset = 0; offset < _numPreConfiguredInstancesPerReplicaGroup; offset++) {
+        spareOffsets.add(offset);
+      }
+      for (Map.Entry<Integer, Long> match : slotToMatch.values()) {
+        // getKey() is a boxed Integer, so this is remove(Object): it drops the offset value, not an index.
+        spareOffsets.remove(match.getKey());
+      }
+      Collections.shuffle(spareOffsets, new Random(Math.abs(_tableNameWithType.hashCode())));
+      List<Integer> fillers = spareOffsets.subList(0, shortfall);
+      fillers.sort(Comparator.naturalOrder());
+
+      int nextFiller = 0;
+      for (int slot = 0; slot < _numTargetInstancesPerReplicaGroup; slot++) {
+        if (!slotToMatch.containsKey(slot)) {
+          Integer offset = fillers.get(nextFiller++);
+          slotToMatch.put(slot, new AbstractMap.SimpleEntry<>(offset, 0L));
+          claimedOffsets.add(offset);
+        }
+      }
+    }
+
+    // Step 4: materialize the target replica groups from the selected mirrored server sets.
+    List<List<String>> resultReplicaGroups = new ArrayList<>(_numTargetReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+      resultReplicaGroups.add(new ArrayList<>(_numTargetInstancesPerReplicaGroup));
+    }
+    for (int slot = 0; slot < _numTargetInstancesPerReplicaGroup; slot++) {
+      List<String> mirrorSet = _preConfiguredMirroredServerLists.get(slotToMatch.get(slot).getKey());
+      for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+        resultReplicaGroups.get(replicaGroupId).add(mirrorSet.get(replicaGroupId));
+      }
+    }
+    for (int replicaGroupId = 0; replicaGroupId < _numTargetReplicaGroups; replicaGroupId++) {
+      instancePartitions.setInstances(0, replicaGroupId, resultReplicaGroups.get(replicaGroupId));
+    }
+  }
+
+  /**
+   * Transposes the existing instance partitions into mirrored server lists.
+   *
+   * <p>Entry {@code k} appended to {@code _existingMirroredServerLists} holds the k-th instance of every existing
+   * replica group, in replica-group order.
+   */
+  private void createListAndLookupTablesFromExistingInstancePartitions() {
+    // Snapshot each existing replica group once before transposing.
+    List<List<String>> replicaGroups = new ArrayList<>(_numExistingReplicaGroups);
+    for (int replicaGroupId = 0; replicaGroupId < _numExistingReplicaGroups; replicaGroupId++) {
+      replicaGroups.add(_existingInstancePartitions.getInstances(0, replicaGroupId));
+    }
+
+    for (int offset = 0; offset < _numExistingInstancesPerReplicaGroup; offset++) {
+      List<String> mirrorSet = new ArrayList<>();
+      for (List<String> replicaGroup : replicaGroups) {
+        mirrorSet.add(replicaGroup.get(offset));
+      }
+      _existingMirroredServerLists.add(mirrorSet);
+    }
+  }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index a27c4ffbfb..a052794ae2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -557,31 +557,52 @@ public class TableRebalancer {
if (InstanceAssignmentConfigUtils.allowInstanceAssignment(tableConfig,
instancePartitionsType)) {
boolean hasPreConfiguredInstancePartitions =
TableConfigUtils.hasPreConfiguredInstancePartitions(tableConfig,
instancePartitionsType);
- if (hasPreConfiguredInstancePartitions) {
- String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
- InstancePartitions instancePartitions =
-
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
- referenceInstancePartitionsName, instancePartitionsName);
- boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ boolean isPreConfigurationBasedAssignment =
+
InstanceAssignmentConfigUtils.isMirrorServerSetAssignment(tableConfig,
instancePartitionsType);
+ InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions;
+ boolean instancePartitionsUnchanged;
+ if (!hasPreConfiguredInstancePartitions) {
+ LOGGER.info("Reassigning {} instances for table: {}",
instancePartitionsType, tableNameWithType);
+ // Assign instances with existing instance partition to null if
bootstrap mode is enabled, so that the
+ // instance partition map can be fully recalculated.
+ instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
+
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
+ bootstrap ? null : existingInstancePartitions);
+ instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
if (!dryRun && !instancePartitionsUnchanged) {
- LOGGER.info("Persisting instance partitions: {} (referencing {})",
instancePartitions,
- referenceInstancePartitionsName);
+ LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
}
- return Pair.of(instancePartitions, instancePartitionsUnchanged);
- }
- LOGGER.info("Reassigning {} instances for table: {}",
instancePartitionsType, tableNameWithType);
- InstanceAssignmentDriver instanceAssignmentDriver = new
InstanceAssignmentDriver(tableConfig);
- // Assign instances with existing instance partition to null if
bootstrap mode is enabled, so that the instance
- // partition map can be fully recalculated.
- InstancePartitions instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
-
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
- bootstrap ? null : existingInstancePartitions);
- boolean instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
- if (!dryRun && !instancePartitionsUnchanged) {
- LOGGER.info("Persisting instance partitions: {} to ZK",
instancePartitions);
-
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
instancePartitions);
+ } else {
+ String referenceInstancePartitionsName =
tableConfig.getInstancePartitionsMap().get(instancePartitionsType);
+ if (isPreConfigurationBasedAssignment) {
+ InstancePartitions preConfiguredInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+ referenceInstancePartitionsName, instancePartitionsName);
+ instancePartitions =
instanceAssignmentDriver.assignInstances(instancePartitionsType,
+
_helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(),
true),
+ bootstrap ? null : existingInstancePartitions,
preConfiguredInstancePartitions);
+ instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
+ LOGGER.info("Persisting instance partitions: {} (based on {})",
instancePartitions,
+ preConfiguredInstancePartitions);
+
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+ instancePartitions);
+ }
+ } else {
+ instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitionsWithRename(_helixManager.getHelixPropertyStore(),
+ referenceInstancePartitionsName, instancePartitionsName);
+ instancePartitionsUnchanged =
instancePartitions.equals(existingInstancePartitions);
+ if (!dryRun && !instancePartitionsUnchanged) {
+ LOGGER.info("Persisting instance partitions: {} (referencing
{})", instancePartitions,
+ referenceInstancePartitionsName);
+
InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(),
+ instancePartitions);
+ }
+ }
}
return Pair.of(instancePartitions, instancePartitionsUnchanged);
} else {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 4335d80b14..b25a529e10 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -18,12 +18,20 @@
*/
package org.apache.pinot.controller.helix.core.assignment.instance;
+import java.io.FileNotFoundException;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
+import java.util.Set;
import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.utils.config.InstanceUtils;
@@ -42,9 +50,7 @@ import
org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
public class InstanceAssignmentTest {
@@ -54,6 +60,7 @@ public class InstanceAssignmentTest {
private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_";
private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12";
+ public static final Logger LOGGER =
LogManager.getLogger(InstanceAssignmentTest.class);
@Test
public void testDefaultOfflineReplicaGroup() {
@@ -329,6 +336,957 @@ public class InstanceAssignmentTest {
Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 11,
SERVER_INSTANCE_ID_PREFIX + 0));
}
+  /**
+   * Runs the randomized mirror-server-set assignment loop for ten million iterations.
+   *
+   * <p>This method carries no {@code @Test} annotation here, so it only runs when invoked explicitly.
+   *
+   * @throws FileNotFoundException propagated from {@link #testMirrorServerSetBasedRandomInner(int)}
+   */
+  public void testMirrorServerSetBasedRandom()
+      throws FileNotFoundException {
+    final int iterations = 10000000;
+    testMirrorServerSetBasedRandomInner(iterations);
+  }
+
+  /**
+   * Randomized driver for the mirror-server-set partition selector: each iteration builds random pre-configured
+   * and existing instance partitions, runs the assignment, and checks the shape of the result.
+   *
+   * <p>Everything printed during the run goes to {@code output.txt}. The file stream is closed and
+   * {@code System.out} is restored when the method exits, whether it returns normally or an assertion or the
+   * driver throws, so stdout is not left redirected for whatever runs afterwards.
+   *
+   * @param loopCount number of random scenarios to run
+   * @throws FileNotFoundException if {@code output.txt} cannot be opened for writing
+   */
+  public void testMirrorServerSetBasedRandomInner(int loopCount) throws FileNotFoundException {
+    PrintStream originalOut = System.out;
+    try (PrintStream fileOut = new PrintStream("output.txt")) {
+      System.setOut(fileOut);
+      try {
+        for (int iter = 0; iter < loopCount; iter++) {
+          runRandomMirrorServerSetIteration(iter);
+        }
+      } finally {
+        // Restore stdout before try-with-resources closes the file stream.
+        System.setOut(originalOut);
+      }
+    }
+  }
+
+  /** Builds one random scenario, runs the assignment driver on it and prints inputs and result to stdout. */
+  private void runRandomMirrorServerSetIteration(int iter) {
+    System.out.printf("_____________________________ITERATION:%d________________________________%n", iter);
+    Random random1 = new Random();
+    int numTargetReplicaGroups = random1.nextInt(7) + 1;
+    int numExistingReplicaGroups = random1.nextInt(7) + 1;
+    int numPreConfiguredInstancesPerReplicaGroup = random1.nextInt(10) + 5;
+    int numTargetInstancesPerReplicaGroup =
+        Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5);
+    int numExistingInstancesPerReplicaGroup =
+        Math.max(random1.nextInt(numPreConfiguredInstancesPerReplicaGroup), 5);
+    int numPools = random1.nextInt(10) + 1;
+
+    int numPartitions = 0;
+    int numInstancesPerPartition = 0;
+    List<InstanceConfig> instanceConfigs = new ArrayList<>();
+
+    int preConfiguredOffsetStart = random1.nextInt(10);
+    for (int i = 0; i < 1000; i++) {
+      int pool = i % numPools;
+      InstanceConfig instanceConfig = new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+      instanceConfig.addTag(OFFLINE_TAG);
+      instanceConfig.getRecord()
+          .setMapField(InstanceUtils.POOL_KEY, Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+      instanceConfigs.add(instanceConfig);
+    }
+    InstanceTagPoolConfig tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+    InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+        new InstanceReplicaGroupPartitionConfig(true, 0, numTargetReplicaGroups, numTargetInstancesPerReplicaGroup,
+            numPartitions, numInstancesPerPartition, false, null);
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+        .setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+            new InstanceAssignmentConfig(tagPoolConfig, null, replicaPartitionConfig,
+                InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+        .setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE, "preConfigured"))
+        .build();
+    InstanceAssignmentDriver driver = new InstanceAssignmentDriver(tableConfig);
+    InstancePartitions preConfigured = new InstancePartitions("preConfigured");
+    InstancePartitions existing = new InstancePartitions("existing");
+
+    List<String> preconfiguredInstances = new LinkedList<>();
+    List<String> existingInstances = new LinkedList<>();
+
+    Set<Integer> preConfiguredUsed = new HashSet<>();
+    Set<Integer> existingUsed = new HashSet<>();
+
+    // Draw distinct instance ids from a range 1.5x larger than needed, so the retry loops always terminate.
+    int preConfiguredIdBound = (int) (1.5 * numTargetReplicaGroups * numPreConfiguredInstancesPerReplicaGroup);
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      for (int j = 0; j < numPreConfiguredInstancesPerReplicaGroup; j++) {
+        int instance = random1.nextInt(preConfiguredIdBound);
+        while (preConfiguredUsed.contains(instance)) {
+          instance = random1.nextInt(preConfiguredIdBound);
+        }
+        preConfiguredUsed.add(instance);
+        preconfiguredInstances.add(SERVER_INSTANCE_ID_PREFIX + (instance + preConfiguredOffsetStart));
+      }
+    }
+
+    int existingIdBound = (int) (1.5 * numExistingReplicaGroups * numExistingInstancesPerReplicaGroup);
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      for (int j = 0; j < numExistingInstancesPerReplicaGroup; j++) {
+        int instance = random1.nextInt(existingIdBound);
+        while (existingUsed.contains(instance)) {
+          instance = random1.nextInt(existingIdBound);
+        }
+        existingUsed.add(instance);
+        existingInstances.add(SERVER_INSTANCE_ID_PREFIX + instance);
+      }
+    }
+
+    Collections.shuffle(preconfiguredInstances);
+    Collections.shuffle(existingInstances);
+
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      preConfigured.setInstances(0, i, preconfiguredInstances.subList(i * numPreConfiguredInstancesPerReplicaGroup,
+          (i + 1) * numPreConfiguredInstancesPerReplicaGroup));
+    }
+
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      existing.setInstances(0, i, existingInstances.subList(i * numExistingInstancesPerReplicaGroup,
+          (i + 1) * numExistingInstancesPerReplicaGroup));
+    }
+
+    System.out.println("Done initializing preconfigured and existing instances");
+    System.out.println("numTargetReplicaGroups " + numTargetReplicaGroups);
+    System.out.println("numPreConfiguredInstancesPerReplicaGroup " + numPreConfiguredInstancesPerReplicaGroup);
+    System.out.println("numTargetInstancesPerReplicaGroup " + numTargetInstancesPerReplicaGroup);
+
+    System.out.println("numExistingReplicaGroups " + numExistingReplicaGroups);
+    System.out.println("numExistingInstancesPerReplicaGroup " + numExistingInstancesPerReplicaGroup);
+    System.out.println("");
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      System.out.println("Preconfigured instances for replica group " + i + " : " + preConfigured.getInstances(0, i));
+    }
+    System.out.println("");
+    for (int i = 0; i < numExistingReplicaGroups; i++) {
+      System.out.println("Existing instances for replica group " + i + " : " + existing.getInstances(0, i));
+    }
+    System.out.println("");
+    InstancePartitions instancePartitions =
+        driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs, existing, preConfigured);
+    assertEquals(instancePartitions.getNumReplicaGroups(), numTargetReplicaGroups);
+    assertEquals(instancePartitions.getNumPartitions(), 1);
+
+    for (int i = 0; i < numTargetReplicaGroups; i++) {
+      System.out.println("Assigned instances for replica group " + i + " : " + instancePartitions.getInstances(0, i));
+    }
+  }
+
+ @Test
+ public void testMirrorServerSetBased() {
+ LogManager.getLogger(MirrorServerSetInstancePartitionSelector.class)
+ .setLevel(Level.INFO);
+
+ // Test initial assignment 3 replica groups, 7 instances per rg.
+ int numPartitions = 0;
+ int numInstancesPerPartition = 0;
+ int numInstances = 21;
+ int numPools = 5;
+ int numReplicaGroups = 3;
+ int numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < 100; i++) {
+ int pool = i % numPools;
+ InstanceConfig instanceConfig =
+ new InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(OFFLINE_TAG);
+ instanceConfig.getRecord()
+ .setMapField(InstanceUtils.POOL_KEY,
Collections.singletonMap(OFFLINE_TAG, Integer.toString(pool)));
+ instanceConfigs.add(instanceConfig);
+ }
+ InstanceTagPoolConfig tagPoolConfig = new
InstanceTagPoolConfig(OFFLINE_TAG, true, numPools, null);
+ InstanceReplicaGroupPartitionConfig replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12,
SERVER_INSTANCE_ID_PREFIX + 15,
+ SERVER_INSTANCE_ID_PREFIX + 18));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13,
SERVER_INSTANCE_ID_PREFIX + 16,
+ SERVER_INSTANCE_ID_PREFIX + 19));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14,
SERVER_INSTANCE_ID_PREFIX + 17,
+ SERVER_INSTANCE_ID_PREFIX + 20));
+
+ InstancePartitions instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, null, preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 12 13 14
+ * Host 15 16 17
+ * Host 18 19 20
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 12 13 14
+ * Host 15 16 17
+ * Host 18 19 20
+ */
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 15,
+ SERVER_INSTANCE_ID_PREFIX + 18));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 16,
+ SERVER_INSTANCE_ID_PREFIX + 19));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 14,
+ SERVER_INSTANCE_ID_PREFIX + 17,
+ SERVER_INSTANCE_ID_PREFIX + 20));
+
+ // Test instance shuffling/uplifting from 3*5 to 3*7
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 21;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12,
SERVER_INSTANCE_ID_PREFIX + 15,
+ SERVER_INSTANCE_ID_PREFIX + 18));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13,
SERVER_INSTANCE_ID_PREFIX + 16,
+ SERVER_INSTANCE_ID_PREFIX + 19));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14,
SERVER_INSTANCE_ID_PREFIX + 17,
+ SERVER_INSTANCE_ID_PREFIX + 20));
+
+ InstancePartitions existingInstancePartitions = new
InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE, instanceConfigs,
+ existingInstancePartitions, preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * uplift from 15 instances in 3 replicas to 21 instance in 3 replicas
+ * 21 instances in 4 pools
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 12 13 14
+ * Host 15 16 17
+ * Host 18 19 20
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 6 2
+ * Host 12 7 14
+ * Host 1 4 5
+ * Host 3 13 8
+ * Host 9 10 11
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 12 13 14
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 15 16 17
+ * Host 18 19 20
+ */
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 15,
+ SERVER_INSTANCE_ID_PREFIX + 18));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 16,
+ SERVER_INSTANCE_ID_PREFIX + 19));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 14,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 17,
+ SERVER_INSTANCE_ID_PREFIX + 20));
+
+ // Test instance replacement from 3*6 to 3*5
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 15;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 24,
+ SERVER_INSTANCE_ID_PREFIX + 27, SERVER_INSTANCE_ID_PREFIX + 30));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19,
SERVER_INSTANCE_ID_PREFIX + 22, SERVER_INSTANCE_ID_PREFIX + 25,
+ SERVER_INSTANCE_ID_PREFIX + 28, SERVER_INSTANCE_ID_PREFIX + 31));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20,
SERVER_INSTANCE_ID_PREFIX + 23, SERVER_INSTANCE_ID_PREFIX + 26,
+ SERVER_INSTANCE_ID_PREFIX + 29, SERVER_INSTANCE_ID_PREFIX + 32));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15,
SERVER_INSTANCE_ID_PREFIX + 18));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16,
SERVER_INSTANCE_ID_PREFIX + 19));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17,
SERVER_INSTANCE_ID_PREFIX + 20));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * From 18 instances in 3 replicas to 15 instances in 3 replicas
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 18 19 20
+ * Host 21 22 23
+ * Host 24 25 26
+ * Host 27 28 29
+ * Host 30 31 32
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 15 16 17
+ * Host 18 19 20
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 18 19 20
+ * Host 21 22 23
+ * Host 24 25 26
+ * Host 27 28 29
+ * Host 30 31 32
+ *
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+ SERVER_INSTANCE_ID_PREFIX + 21,
+ SERVER_INSTANCE_ID_PREFIX + 24,
+ SERVER_INSTANCE_ID_PREFIX + 27,
+ SERVER_INSTANCE_ID_PREFIX + 30));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 19,
+ SERVER_INSTANCE_ID_PREFIX + 22,
+ SERVER_INSTANCE_ID_PREFIX + 25,
+ SERVER_INSTANCE_ID_PREFIX + 28,
+ SERVER_INSTANCE_ID_PREFIX + 31));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 23,
+ SERVER_INSTANCE_ID_PREFIX + 26,
+ SERVER_INSTANCE_ID_PREFIX + 29,
+ SERVER_INSTANCE_ID_PREFIX + 32));
+
+ // Test instance shuffling/uplifting from 3*5 to 3*6, with some instance
replacement
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 18;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 15,
SERVER_INSTANCE_ID_PREFIX + 18));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 16,
SERVER_INSTANCE_ID_PREFIX + 19));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17,
SERVER_INSTANCE_ID_PREFIX + 20));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * uplift from 15 instances in 3 replicas to 18 instances in 3 replicas
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 3 4 5
+ * Host 6 7 8
+ * Host 9 10 11
+ * Host 15 16 17
+ * Host 18 19 20
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 6 2
+ * Host 12 7 14
+ * Host 1 4 5
+ * Host 3 13 8
+ * Host 9 10 11
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 0 1 2
+ * Host 6 7 8
+ * Host 3 4 5
+ * Host 15 16 17
+ * Host 9 10 11
+ * Host 18 19 20
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 15,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 18));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 16,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 19));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 17,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 20));
+
+ // Test instance shuffling/uplifting from 3*5 to 4*6
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 24;
+ numReplicaGroups = 4;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 3, SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9, SERVER_INSTANCE_ID_PREFIX + 12,
SERVER_INSTANCE_ID_PREFIX + 15));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13,
SERVER_INSTANCE_ID_PREFIX + 16));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14,
SERVER_INSTANCE_ID_PREFIX + 17));
+ preConfigured.setInstances(0, 3,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 22,
SERVER_INSTANCE_ID_PREFIX + 23));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0, SERVER_INSTANCE_ID_PREFIX
+ 12, SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX + 9));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 6, SERVER_INSTANCE_ID_PREFIX
+ 7, SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 13, SERVER_INSTANCE_ID_PREFIX + 10));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 14, SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8, SERVER_INSTANCE_ID_PREFIX + 11));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * Test instance shuffling/uplifting from 3*5 to 4*6
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3 RG4
+ * Host 0 1 2 18
+ * Host 3 4 5 19
+ * Host 6 7 8 20
+ * Host 9 10 11 21
+ * Host 12 13 14 22
+ * Host 15 16 17 23
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 0 6 2
+ * Host 12 7 14
+ * Host 1 4 5
+ * Host 3 13 8
+ * Host 9 10 11
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3 RG4
+ * Host 0 1 2 18
+ * Host 12 13 14 22
+ * Host 3 4 5 19
+ * Host 6 7 8 20
+ * Host 9 10 11 21
+ * Host 15 16 17 23
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 15));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 16));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 14,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 17));
+ assertEquals(instancePartitions.getInstances(0, 3),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+ SERVER_INSTANCE_ID_PREFIX + 22,
+ SERVER_INSTANCE_ID_PREFIX + 19,
+ SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 21,
+ SERVER_INSTANCE_ID_PREFIX + 23));
+
+ // Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling
of instances
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 12;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 14));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 22, SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 17));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
SERVER_INSTANCE_ID_PREFIX + 19, SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 21, SERVER_INSTANCE_ID_PREFIX + 23));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 0,
+ SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 15));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 16));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 14,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 17));
+ existingInstancePartitions.setInstances(0, 3,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+ SERVER_INSTANCE_ID_PREFIX + 22,
+ SERVER_INSTANCE_ID_PREFIX + 19,
+ SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 21,
+ SERVER_INSTANCE_ID_PREFIX + 23));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * Test instance shuffling/downlifting from 4 * 6 to 3 * 4 with shuffling
of instances
+ * Pre-configured partitioning:
+ * RG2 RG3 RG4
+ * Host 1 2 18
+ * Host 4 22 19
+ * Host 7 13 20
+ * Host 10 11 21
+ * Host 14 17 23
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3 RG4
+ * Host 0 1 2 18
+ * Host 12 13 14 22
+ * Host 3 4 5 19
+ * Host 6 7 8 20
+ * Host 9 10 11 21
+ * Host 15 16 17 23
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 1 2 18
+ * Host 10 11 21
+ * Host 7 13 20
+ * Host 14 17 23
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 14));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 13,
+ SERVER_INSTANCE_ID_PREFIX + 17));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 18,
+ SERVER_INSTANCE_ID_PREFIX + 21,
+ SERVER_INSTANCE_ID_PREFIX + 20,
+ SERVER_INSTANCE_ID_PREFIX + 23));
+
+
+ // upscale 3*3 to 3*5
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 15;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 3));
+
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 6));
+
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 9));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+ /*
+ * Test upscaling from 3*3 to 3*5
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ * Host 10 11 12
+ * Host 13 14 15
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 4 7
+ * Host 2 5 8
+ * Host 3 6 9
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ * Host 10 11 12
+ * Host 13 14 15
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1,
+ SERVER_INSTANCE_ID_PREFIX + 4,
+ SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10,
+ SERVER_INSTANCE_ID_PREFIX + 13));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2,
+ SERVER_INSTANCE_ID_PREFIX + 5,
+ SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11,
+ SERVER_INSTANCE_ID_PREFIX + 14));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3,
+ SERVER_INSTANCE_ID_PREFIX + 6,
+ SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 12,
+ SERVER_INSTANCE_ID_PREFIX + 15));
+
+ // downscale 3*5 to 3*3
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 9;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7,
+ SERVER_INSTANCE_ID_PREFIX + 10, SERVER_INSTANCE_ID_PREFIX + 13));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8,
+ SERVER_INSTANCE_ID_PREFIX + 11, SERVER_INSTANCE_ID_PREFIX + 14));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9,
+ SERVER_INSTANCE_ID_PREFIX + 12, SERVER_INSTANCE_ID_PREFIX + 15));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+
+ /*
+ * Test downscaling from 3*5 to 3*3
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ * Host 10 11 12
+ * Host 13 14 15
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ */
+
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ // replace instance 5 with instance 11
+ numPartitions = 0;
+ numInstancesPerPartition = 0;
+ numInstances = 9;
+ numReplicaGroups = 3;
+ numInstancesPerReplicaGroup = numInstances / numReplicaGroups;
+ tagPoolConfig = new InstanceTagPoolConfig(OFFLINE_TAG, true, numPools,
null);
+ replicaPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicaGroups,
numInstancesPerReplicaGroup, numPartitions,
+ numInstancesPerPartition, false, null);
+
+ tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Collections.singletonMap(InstancePartitionsType.OFFLINE.toString(),
+ new InstanceAssignmentConfig(tagPoolConfig, null,
replicaPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR.toString())))
+
.setInstancePartitionsMap(Collections.singletonMap(InstancePartitionsType.OFFLINE,
"preConfigured")).build();
+ driver = new InstanceAssignmentDriver(tableConfig);
+
+ preConfigured = new InstancePartitions("preConfigured");
+ preConfigured.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+ preConfigured.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 11, SERVER_INSTANCE_ID_PREFIX + 8));
+ preConfigured.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ existingInstancePartitions = new InstancePartitions("existing");
+ existingInstancePartitions.setInstances(0, 0,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+ existingInstancePartitions.setInstances(0, 1,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 5, SERVER_INSTANCE_ID_PREFIX + 8));
+ existingInstancePartitions.setInstances(0, 2,
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+
+ instancePartitions =
+ driver.assignInstances(InstancePartitionsType.OFFLINE,
instanceConfigs, existingInstancePartitions,
+ preConfigured);
+
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicaGroups);
+ assertEquals(instancePartitions.getNumPartitions(), 1);
+
+ /*
+ * Test replacing instance 5 with instance 11
+ * Pre-configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 11 6
+ * Host 7 8 9
+ *
+ * Existing configured partitioning:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 5 6
+ * Host 7 8 9
+ *
+ * Final assignment for this table:
+ * RG1 RG2 RG3
+ * Host 1 2 3
+ * Host 4 11 6
+ * Host 7 8 9
+ */
+
+ // Verifying the final configuration after the instance replacement
+ assertEquals(instancePartitions.getInstances(0, 0),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 1, SERVER_INSTANCE_ID_PREFIX
+ 4, SERVER_INSTANCE_ID_PREFIX + 7));
+ assertEquals(instancePartitions.getInstances(0, 1),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 2, SERVER_INSTANCE_ID_PREFIX
+ 11, SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(0, 2),
+ Arrays.asList(SERVER_INSTANCE_ID_PREFIX + 3, SERVER_INSTANCE_ID_PREFIX
+ 6, SERVER_INSTANCE_ID_PREFIX + 9));
+ }
+
@Test
public void testPoolBased() {
// 10 instances in 2 pools, each with 5 instances
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index db6941bf1f..8a22aaf4ea 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -85,6 +85,8 @@ public class Actions {
public static final String UPDATE_USER = "UpdateUser";
public static final String UPDATE_ZNODE = "UpdateZnode";
public static final String UPLOAD_SEGMENT = "UploadSegment";
+ public static final String GET_INSTANCE_PARTITIONS =
"GetInstancePartitions";
+ public static final String UPDATE_INSTANCE_PARTITIONS =
"UpdateInstancePartitions";
}
// Action names for table
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 73cf7f27a1..b7d8cadc4a 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -771,11 +771,25 @@ public final class TableConfigUtils {
tableConfig.getInstanceAssignmentConfigMap())) {
return;
}
- for (InstancePartitionsType instancePartitionsType :
tableConfig.getInstancePartitionsMap().keySet()) {
- Preconditions.checkState(
-
!tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()),
- String.format("Both InstanceAssignmentConfigMap and
InstancePartitionsMap set for %s",
- instancePartitionsType));
+
+ for (InstancePartitionsType instancePartitionsType :
InstancePartitionsType.values()) {
+ if
(tableConfig.getInstanceAssignmentConfigMap().containsKey(instancePartitionsType.toString()))
{
+ InstanceAssignmentConfig instanceAssignmentConfig =
+
tableConfig.getInstanceAssignmentConfigMap().get(instancePartitionsType.toString());
+ if (instanceAssignmentConfig.getPartitionSelector()
+ ==
InstanceAssignmentConfig.PartitionSelector.MIRROR_SERVER_SET_PARTITION_SELECTOR)
{
+ Preconditions.checkState(
+
tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType),
+ String.format("Both InstanceAssignmentConfigMap and
InstancePartitionsMap needed for %s, as "
+ + "MIRROR_SERVER_SET_PARTITION_SELECTOR is used",
+ instancePartitionsType));
+ } else {
+ Preconditions.checkState(
+
!tableConfig.getInstancePartitionsMap().containsKey(instancePartitionsType),
+ String.format("Both InstanceAssignmentConfigMap and
InstancePartitionsMap set for %s",
+ instancePartitionsType));
+ }
+ }
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index 186f545cea..391ba4812d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -82,6 +82,7 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
}
public enum PartitionSelector {
- FD_AWARE_INSTANCE_PARTITION_SELECTOR,
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR
+ FD_AWARE_INSTANCE_PARTITION_SELECTOR,
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
+ MIRROR_SERVER_SET_PARTITION_SELECTOR
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]