jackjlli commented on code in PR #8483:
URL: https://github.com/apache/pinot/pull/8483#discussion_r854730114
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -140,25 +132,118 @@ public void selectInstances(Map<Integer,
List<InstanceConfig>> poolToInstanceCon
LOGGER.info("Selecting {} partitions, {} instances per partition within
a replica-group for table: {}",
numPartitions, numInstancesPerPartition, _tableNameWithType);
- // Assign consecutive instances within a replica-group to each partition.
- // E.g. (within a replica-group, 5 instances, 3 partitions, 3 instances
per partition)
- // [i0, i1, i2, i3, i4]
- // p0 p0 p0 p1 p1
- // p1 p2 p2 p2
- for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
- int instanceIdInReplicaGroup = 0;
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- List<String> instancesInPartition = new
ArrayList<>(numInstancesPerPartition);
- for (int instanceIdInPartition = 0; instanceIdInPartition <
numInstancesPerPartition;
- instanceIdInPartition++) {
-
instancesInPartition.add(replicaGroupIdToInstancesMap[replicaGroupId][instanceIdInReplicaGroup]);
- instanceIdInReplicaGroup = (instanceIdInReplicaGroup + 1) %
numInstancesPerReplicaGroup;
+ if (_replicaGroupPartitionConfig.isMinimizeDataMovement() &&
_existingInstancePartitions != null) {
+ // Minimize data movement.
+ int existingNumPartitions =
_existingInstancePartitions.getNumPartitions();
+ int existingNumReplicaGroups =
_existingInstancePartitions.getNumReplicaGroups();
+
+ Map<Integer, Set<String>> poolToCandidateInstancesMap = new
TreeMap<>();
+ Map<Integer, Set<String>> replicaGroupIdToInstancesMap = new
HashMap<>();
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
+ Integer pool = replicaGroupIdToPoolMap.get(replicaGroupId);
+ if (pool == null) {
+ // Skip the replica group if it's no longer needed.
+ continue;
+ }
+ Set<String> candidateInstances =
+ poolToCandidateInstancesMap.computeIfAbsent(pool, k -> new
LinkedHashSet<>());
+ List<InstanceConfig> instanceConfigsInPool =
poolToInstanceConfigsMap.get(pool);
+ instanceConfigsInPool.forEach(k ->
candidateInstances.add(k.getInstanceName()));
+
+ for (int partitionId = 0; partitionId < existingNumPartitions;
partitionId++) {
+ List<String> existingInstances =
_existingInstancePartitions.getInstances(partitionId, replicaGroupId);
+ replicaGroupIdToInstancesMap.computeIfAbsent(replicaGroupId, k ->
new HashSet<>())
+ .addAll(existingInstances);
+ }
+ }
+
+ for (int replicaGroupId = 0; replicaGroupId <
existingNumReplicaGroups; replicaGroupId++) {
Review Comment:
The 1st loop is to find out the existing RGs and their instances. The 2nd
loop is to filter out those instances which are already used in other RGs. In
order to filter all the instances that belong to other RGs, the 1st loop is
needed. That's why we need 2 loops separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]