Jackie-Jiang commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r855564628
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -37,13 +46,15 @@
public class InstanceReplicaGroupPartitionSelector {
private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceReplicaGroupPartitionSelector.class);
- private final InstanceReplicaGroupPartitionConfig
_replicaGroupPartitionConfig;
- private final String _tableNameWithType;
+ protected final InstanceReplicaGroupPartitionConfig
_replicaGroupPartitionConfig;
Review Comment:
(minor) Keep them `private`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
Review Comment:
(minor) rename to be more specific, and use `TreeMap` because the map size
will be small
```suggestion
Map<Integer, Set<String>> replicaGroupIdToExistingInstancesMap = new
TreeMap<>();
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ Set<String> candidateInstances =
+ poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new
LinkedHashSet<>());
+ List<InstanceConfig> instanceConfigsInPool =
poolToInstanceConfigsMap.get(pool);
+ instanceConfigsInPool.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k ->
new HashSet<>())
+ .addAll(existingInstances);
+ }
+ }
+
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
Review Comment:
The following 2 loops can be simplified (and made more efficient) if we loop over
pools first, then loop over the replica-groups within each pool.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
numInstancesToSelect = numInstanceConfigs;
}
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ List<String> instancesToSelect;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(0, 0);
+ Set<String> candidateInstances = new LinkedHashSet<>();
+ instanceConfigs.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesToSelect,
candidateInstances, existingInstances);
+ } else {
+ // Select instances sequentially.
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ }
}
instancesToSelect.sort(null);
LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect,
_tableNameWithType);
// Set the instances as partition 0 replica 0
instancePartitions.setInstances(0, 0, instancesToSelect);
}
}
+
+ /**
+ * Select instances with minimum movement.
+ * This algorithm can solve the following scenarios:
+ * * swap an instance
+ * * add/remove replica groups
+ * * increase/decrease number of instances per replica group
+ * TODO: handle the scenarios that selected pools are changed.
+ * @param numInstancesToSelect number of instances to select
+ * @param candidateInstances candidate instances to be selected
+ * @param existingInstances list of existing instances
+ */
+ protected List<String> getInstancesWithMinimumMovement(int
numInstancesToSelect, Set<String> candidateInstances,
+ List<String> existingInstances) {
+ // Initialize the list with empty positions to fill.
+ List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(null);
+ }
+ Deque<String> newlyAddedInstances = new LinkedList<>();
+
+ // Find out the existing instances that are still alive.
+ Set<String> existingInstancesStillAlive = new HashSet<>();
+ for (String existingInstance : existingInstances) {
+ if (candidateInstances.contains(existingInstance)) {
+ existingInstancesStillAlive.add(existingInstance);
+ }
+ }
+
+ // Find out the newly added instances.
+ for (String candidateInstance : candidateInstances) {
+ if (!existingInstancesStillAlive.contains(candidateInstance)) {
+ newlyAddedInstances.add(candidateInstance);
+ }
+ }
+
+ for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect;
i++) {
+ String existingInstance = existingInstances.get(i);
+ if (candidateInstances.contains(existingInstance)) {
+ // If the instance still exists, add it to the instance list.
+ instancesToSelect.set(i, existingInstance);
+ existingInstancesStillAlive.remove(existingInstance);
+ // Add the existing instance to the tail so that it won't be firstly
chosen for the next partition.
+ candidateInstances.remove(existingInstance);
+ candidateInstances.add(existingInstance);
+ } else if (!newlyAddedInstances.isEmpty()) {
+ // If the instance no longer exists, pick a new instance to fill its
vacant position.
+ String newInstance = newlyAddedInstances.pollFirst();
+ instancesToSelect.set(i, newInstance);
+ }
+ }
Review Comment:
I feel the following 3 loops can be simplified
```suggestion
int numExistingInstances = existingInstances.size();
for (int i = 0; i < numInstancesToSelect; i++) {
String existingInstance = i < numExistingInstances ?
existingInstances.get(i) : null;
String selectedInstance;
if (existingInstance != null &&
candidateInstances.contains(existingInstance)) {
selectedInstance = existingInstance;
} else {
selectInstance = newlyAddedInstances.poll();
}
// Add the selected instance to the tail so that it won't be firstly
chosen for the next partition.
candidateInstances.remove(selectedInstance);
candidateInstances.add(selectedInstance);
}
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
numInstancesToSelect = numInstanceConfigs;
}
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ List<String> instancesToSelect;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(0, 0);
+ Set<String> candidateInstances = new LinkedHashSet<>();
+ instanceConfigs.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesToSelect,
candidateInstances, existingInstances);
+ } else {
+ // Select instances sequentially.
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ }
}
instancesToSelect.sort(null);
LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect,
_tableNameWithType);
// Set the instances as partition 0 replica 0
instancePartitions.setInstances(0, 0, instancesToSelect);
}
}
+
+ /**
+ * Select instances with minimum movement.
+ * This algorithm can solve the following scenarios:
+ * * swap an instance
+ * * add/remove replica groups
+ * * increase/decrease number of instances per replica group
+ * TODO: handle the scenarios that selected pools are changed.
+ * @param numInstancesToSelect number of instances to select
+ * @param candidateInstances candidate instances to be selected
+ * @param existingInstances list of existing instances
+ */
+ protected List<String> getInstancesWithMinimumMovement(int
numInstancesToSelect, Set<String> candidateInstances,
Review Comment:
(minor) Make it `private static`, and declare the parameter as `LinkedHashSet` instead of the general `Set`,
because the algorithm relies on the insertion order to work correctly.
```suggestion
private static List<String> getInstancesWithMinimumMovement(int
numInstancesToSelect, LinkedHashSet<String> candidateInstances,
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +133,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ Set<String> candidateInstances =
+ poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new
LinkedHashSet<>());
+ List<InstanceConfig> instanceConfigsInPool =
poolToInstanceConfigsMap.get(pool);
+ instanceConfigsInPool.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k ->
new HashSet<>())
+ .addAll(existingInstances);
+ }
+ }
+
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ // Filter out instances that belong to other replica groups which
should not be the candidate.
+ Set<String> candidateInstances = new
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+ for (int otherReplicaGroupId = 0;
+ otherReplicaGroupId < existingNumReplicaGroups &&
otherReplicaGroupId < numReplicaGroups;
+ otherReplicaGroupId++) {
+ if (replicaGroupId != otherReplicaGroupId) {
+
candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+ }
+ }
+ Set<String> chosenCandidateInstances = new HashSet<>();
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ List<String> instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesPerPartition,
candidateInstances, existingInstances);
+ chosenCandidateInstances.addAll(instancesToSelect);
+ instancePartitions.setInstances(partitionId, replicaGroupId,
instancesToSelect);
+ }
+ // Remove instances that are already been chosen.
+
poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+ }
+
+ // If the new number of replica groups is greater than the existing
number of replica groups.
+ for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId <
numReplicaGroups; replicaGroupId++) {
+ int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ Set<String> candidateInstances = new
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+ Set<String> chosenCandidateInstances = new HashSet<>();
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ if (existingInstances == null) {
Review Comment:
Isn't this always `null` here, since these replica-group IDs do not exist in the existing instance partitions?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
numInstancesToSelect = numInstanceConfigs;
}
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ List<String> instancesToSelect;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(0, 0);
+ Set<String> candidateInstances = new LinkedHashSet<>();
+ instanceConfigs.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesToSelect,
candidateInstances, existingInstances);
+ } else {
+ // Select instances sequentially.
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ }
}
instancesToSelect.sort(null);
LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect,
_tableNameWithType);
// Set the instances as partition 0 replica 0
instancePartitions.setInstances(0, 0, instancesToSelect);
}
}
+
+ /**
+ * Select instances with minimum movement.
+ * This algorithm can solve the following scenarios:
Review Comment:
I still feel this algorithm cannot guarantee an even distribution of the
partitions in certain scenarios, because it doesn't track the partitions already
assigned to each instance. To guarantee an even distribution, we
should assign in 2 steps: first, assign the existing instances for all
partitions; then, fill the vacant positions based on the partitions already
assigned to each instance.
That said, the current algorithm should work well in most scenarios, so we should probably
add a TODO for the above concern.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -180,14 +266,110 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
numInstancesToSelect = numInstanceConfigs;
}
- List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
- for (int i = 0; i < numInstancesToSelect; i++) {
- instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ List<String> instancesToSelect;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(0, 0);
+ Set<String> candidateInstances = new LinkedHashSet<>();
+ instanceConfigs.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+ instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesToSelect,
candidateInstances, existingInstances);
+ } else {
+ // Select instances sequentially.
+ instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(instanceConfigs.get(i).getInstanceName());
+ }
}
instancesToSelect.sort(null);
LOGGER.info("Selecting instances: {} for table: {}", instancesToSelect,
_tableNameWithType);
// Set the instances as partition 0 replica 0
instancePartitions.setInstances(0, 0, instancesToSelect);
}
}
+
+ /**
+ * Select instances with minimum movement.
+ * This algorithm can solve the following scenarios:
+ * * swap an instance
+ * * add/remove replica groups
+ * * increase/decrease number of instances per replica group
+ * TODO: handle the scenarios that selected pools are changed.
+ * @param numInstancesToSelect number of instances to select
+ * @param candidateInstances candidate instances to be selected
+ * @param existingInstances list of existing instances
+ */
+ protected List<String> getInstancesWithMinimumMovement(int
numInstancesToSelect, Set<String> candidateInstances,
+ List<String> existingInstances) {
+ // Initialize the list with empty positions to fill.
+ List<String> instancesToSelect = new ArrayList<>(numInstancesToSelect);
+ for (int i = 0; i < numInstancesToSelect; i++) {
+ instancesToSelect.add(null);
+ }
+ Deque<String> newlyAddedInstances = new LinkedList<>();
+
+ // Find out the existing instances that are still alive.
+ Set<String> existingInstancesStillAlive = new HashSet<>();
+ for (String existingInstance : existingInstances) {
+ if (candidateInstances.contains(existingInstance)) {
+ existingInstancesStillAlive.add(existingInstance);
+ }
+ }
+
+ // Find out the newly added instances.
+ for (String candidateInstance : candidateInstances) {
+ if (!existingInstancesStillAlive.contains(candidateInstance)) {
+ newlyAddedInstances.add(candidateInstance);
+ }
+ }
+
+ for (int i = 0; i < existingInstances.size() && i < numInstancesToSelect;
i++) {
+ String existingInstance = existingInstances.get(i);
+ if (candidateInstances.contains(existingInstance)) {
+ // If the instance still exists, add it to the instance list.
+ instancesToSelect.set(i, existingInstance);
+ existingInstancesStillAlive.remove(existingInstance);
+ // Add the existing instance to the tail so that it won't be firstly
chosen for the next partition.
+ candidateInstances.remove(existingInstance);
+ candidateInstances.add(existingInstance);
+ } else if (!newlyAddedInstances.isEmpty()) {
+ // If the instance no longer exists, pick a new instance to fill its
vacant position.
+ String newInstance = newlyAddedInstances.pollFirst();
+ instancesToSelect.set(i, newInstance);
+ }
+ }
+
+ // If the new number of instances per replica group is greater than the
previous one.
Review Comment:
```suggestion
// If the new number of instances per partition is greater than the
previous one.
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ Set<String> candidateInstances =
+ poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new
LinkedHashSet<>());
+ List<InstanceConfig> instanceConfigsInPool =
poolToInstanceConfigsMap.get(pool);
+ instanceConfigsInPool.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k ->
new HashSet<>())
+ .addAll(existingInstances);
+ }
+ }
+
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ // Filter out instances that belong to other replica groups which
should not be the candidate.
+ Set<String> candidateInstances = new
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+ for (int otherReplicaGroupId = 0;
+ otherReplicaGroupId < existingNumReplicaGroups &&
otherReplicaGroupId < numReplicaGroups;
+ otherReplicaGroupId++) {
+ if (replicaGroupId != otherReplicaGroupId) {
+
candidateInstances.removeAll(replicaGroupIdToInstancesMap.get(otherReplicaGroupId));
+ }
+ }
+ Set<String> chosenCandidateInstances = new HashSet<>();
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ List<String> instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesPerPartition,
candidateInstances, existingInstances);
+ chosenCandidateInstances.addAll(instancesToSelect);
+ instancePartitions.setInstances(partitionId, replicaGroupId,
instancesToSelect);
+ }
+ // Remove instances that are already been chosen.
+
poolToCandidateInstancesMap.get(pool).removeAll(chosenCandidateInstances);
+ }
+
+ // If the new number of replica groups is greater than the existing
number of replica groups.
+ for (int replicaGroupId = existingNumReplicaGroups; replicaGroupId <
numReplicaGroups; replicaGroupId++) {
+ int pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ Set<String> candidateInstances = new
LinkedHashSet<>(poolToCandidateInstancesMap.get(pool));
+
+ Set<String> chosenCandidateInstances = new HashSet<>();
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ if (existingInstances == null) {
+ existingInstances = Collections.emptyList();
+ }
+ List<String> instancesToSelect =
+ getInstancesWithMinimumMovement(numInstancesPerPartition,
candidateInstances, existingInstances);
Review Comment:
After reading the doc, the algorithm is much clearer. Shall we add some
comments before each of the 3 for loops, explaining what
each step of the algorithm is trying to achieve?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
Review Comment:
I see. I suggest adding `int numCommonReplicaGroups =
Math.min(numReplicaGroups, existingNumReplicaGroups);` and using it as the loop's end
condition to make the intent clearer.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]