yashmayya commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2085993960
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String
tableNameWithType, String segment
}
}
+ /**
+ * Uses the default LOGGER
+ */
+ @VisibleForTesting
+ static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher) {
+ return getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, LOGGER);
+ }
+
/**
* Returns the next assignment for the table based on the current assignment
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement,
use the current assignment for the segment.
+ * <p>
+ * For strict replica group routing only (where the segment assignment is
not StrictRealtimeSegmentAssignment)
+ * if batching is enabled, the instances assigned for the same partitionId
can be different for different segments.
+ * For strict replica group routing with StrictRealtimeSegmentAssignment on
the other hand, the assignment for a given
+ * partitionId will be the same across all segments. We can treat both cases
similarly by creating a mapping from
+ * partitionId -> unique set of instance assignments -> currentAssignment.
With StrictRealtimeSegmentAssignment,
+ * this map will have a single entry for 'unique set of instance
assignments'.
+ * <p>
+ * TODO: Ideally if strict replica group routing is enabled then
StrictRealtimeSegmentAssignment should be used, but
+ * this is not enforced in the code today. Once enforcement is added,
then the routing side and assignment side
+ * will be equivalent and all segments belonging to a given
partitionId will be assigned to the same set of
+ * instances. Special handling to check each group of assigned
instances can be removed in that case. The
+ * strict replica group routing can also be utilized for OFFLINE
tables, thus StrictRealtimeSegmentAssignment
+ * also needs to be made more generic for the OFFLINE case.
*/
- @VisibleForTesting
- static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ private static Map<String, Map<String, String>>
getNextAssignment(Map<String, Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
- boolean lowDiskMode) {
- return enableStrictReplicaGroup ?
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
- minAvailableReplicas, lowDiskMode)
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+ return enableStrictReplicaGroup
+ ? getNextStrictReplicaGroupAssignment(currentAssignment,
targetAssignment, minAvailableReplicas, lowDiskMode,
+ batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher,
tableRebalanceLogger)
: getNextNonStrictReplicaGroupAssignment(currentAssignment,
targetAssignment, minAvailableReplicas,
- lowDiskMode);
+ lowDiskMode, batchSizePerServer);
}
private static Map<String, Map<String, String>>
getNextStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
- int minAvailableReplicas, boolean lowDiskMode) {
+ int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+ Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher
partitionIdFetcher,
+ Logger tableRebalanceLogger) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<String, Integer> numSegmentsToOffloadMap =
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+ partitionIdToAssignedInstancesToCurrentAssignmentMap;
+ if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+ // Don't calculate the partition id to assigned instances to current
assignment mapping if batching is disabled
+ // since we want to update the next assignment based on all partitions
in this case. Use partitionId as 0
+ // and a dummy set for the assigned instances.
+ partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+ partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new
HashMap<>());
+
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""),
currentAssignment);
+ } else {
+ partitionIdToAssignedInstancesToCurrentAssignmentMap =
+
getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment,
segmentPartitionIdMap,
+ partitionIdFetcher);
+ }
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new
HashMap<>();
Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
- for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> currentInstanceStateMap = entry.getValue();
- Map<String, String> targetInstanceStateMap =
targetAssignment.get(segmentName);
- SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap,
targetInstanceStateMap, minAvailableReplicas,
- lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
- Set<String> assignedInstances = assignment._instanceStateMap.keySet();
- Set<String> availableInstances = assignment._availableInstances;
- availableInstancesMap.compute(assignedInstances, (k,
currentAvailableInstances) -> {
- if (currentAvailableInstances == null) {
- // First segment assigned to these instances, use the new assignment
and update the available instances
- nextAssignment.put(segmentName, assignment._instanceStateMap);
- updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap,
currentInstanceStateMap.keySet(), k);
- return availableInstances;
- } else {
- // There are other segments assigned to the same instances, check
the available instances to see if adding the
- // new assignment can still hold the minimum available replicas
requirement
- availableInstances.retainAll(currentAvailableInstances);
- if (availableInstances.size() >= minAvailableReplicas) {
- // New assignment can be added
+
+ Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+ for (Map<Set<String>, Map<String, Map<String, String>>>
assignedInstancesToCurrentAssignment
+ : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
+ boolean anyServerExhaustedBatchSize = false;
+ if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER)
{
+ // Check if the servers of the first assignment for each unique set of
assigned instances has any space left
+ // to move this partition. If so, let's mark the partitions as to be
moved, otherwise we mark the partition
+ // as a whole as not moveable.
+ for (Map<String, Map<String, String>> curAssignment :
assignedInstancesToCurrentAssignment.values()) {
+ Map.Entry<String, Map<String, String>> firstEntry =
curAssignment.entrySet().iterator().next();
+ // It is enough to check for whether any server for one segment is
above the limit or not since all segments
+ // in curAssignment will have the same assigned instances list
+ Map<String, String> firstEntryInstanceStateMap =
firstEntry.getValue();
Review Comment:
> let me know if you have other ideas though

What I was thinking was this: if the number of segments in the group is
greater than the configured batch size, we update the next assignment in this
step itself. However, if the number of segments is less than the configured
batch size, we could hold off on updating the next assignment for these
segments in this step and do so in a subsequent step instead. Basically, we
could differentiate between two cases: one where there isn't enough server
capacity left (w.r.t. batch size) due to prior segment / partition movements in
this same step, and another where the batch size itself isn't sufficient to
move the group at once (we don't have a choice in this case if we want the
rebalance to progress). Let me know if that makes sense or if I'm missing
something.

> Yes we might move a much larger number of segments than the batch size
> will allow, but we'll move at most 1 extra partitionId worth of segments.
> Imagine a scenario where we have a small batch size of say 2, but each
> partition has 10 segments. Without this, we'd never be able to move anything.
> With this mechanism of choosing, we'd only move that 1 partition, and for the
> 2nd partition (if moving to one of the selected servers in the last partition),
> we'll keep the assignment as current rather than moving

The case I was thinking of was something like this: let's say the batch size
is 5 and we have two partitions. For the first partition, we move 4 segments to
a particular server in a step, and the second partition also needs to move 4
segments to that same server. With the current logic, we'll move both in a
single step (8 segments), exceeding the batch size, even though they could have
been moved in two separate steps while honoring the batch size configuration.
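
To make the two cases concrete, here is a minimal, self-contained sketch. It is not the PR's code: it models a single destination server and a list of group sizes, and the class and method names are hypothetical. The "untouched server" condition in the second method is one possible reading of the suggestion above, not something the PR implements.

```java
/**
 * Hypothetical sketch comparing two batch-size checks for one destination server.
 * Each element of groupSizes is the number of segments a partition (group) would
 * add to that server; groups are processed in order within a single rebalance step.
 */
final class BatchSizeCheckSketch {

  /** Check as described for the current logic: move a group while the server is below the batch size. */
  static int segmentsMovedWithCurrentCheck(int[] groupSizes, int batchSize) {
    int addedSoFar = 0;
    for (int groupSize : groupSizes) {
      if (addedSoFar < batchSize) {
        // The whole group moves together, so the total can overshoot batchSize.
        addedSoFar += groupSize;
      }
    }
    return addedSoFar;
  }

  /**
   * Assumed variant of the suggestion: move a group if it fits in the remaining
   * capacity, or if it is larger than the batch size and the server has received
   * nothing yet in this step (otherwise the rebalance could never progress).
   */
  static int segmentsMovedWithSuggestedCheck(int[] groupSizes, int batchSize) {
    int addedSoFar = 0;
    for (int groupSize : groupSizes) {
      boolean fitsInRemainingCapacity = addedSoFar + groupSize <= batchSize;
      boolean oversizedGroupOnUntouchedServer = addedSoFar == 0 && groupSize > batchSize;
      if (fitsInRemainingCapacity || oversizedGroupOnUntouchedServer) {
        addedSoFar += groupSize;
      }
    }
    return addedSoFar;
  }

  public static void main(String[] args) {
    // Batch size 5, two partitions that each add 4 segments to the same server.
    System.out.println(segmentsMovedWithCurrentCheck(new int[] {4, 4}, 5));   // prints 8
    System.out.println(segmentsMovedWithSuggestedCheck(new int[] {4, 4}, 5)); // prints 4
    // Batch size 2, partitions of 10 segments: one partition still has to move.
    System.out.println(segmentsMovedWithCurrentCheck(new int[] {10, 10}, 2));   // prints 10
    System.out.println(segmentsMovedWithSuggestedCheck(new int[] {10, 10}, 2)); // prints 10
  }
}
```

In this model both checks behave the same when a single group exceeds the batch size, and differ only when an earlier group in the same step has already used up most of the server's capacity.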
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1524,67 +1535,276 @@ private static void handleErrorInstance(String
tableNameWithType, String segment
}
}
+ /**
+ * Uses the default LOGGER
+ */
+ @VisibleForTesting
+ static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher) {
+ return getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, LOGGER);
+ }
+
/**
* Returns the next assignment for the table based on the current assignment
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement,
use the current assignment for the segment.
+ * <p>
+ * For strict replica group routing only (where the segment assignment is
not StrictRealtimeSegmentAssignment)
+ * if batching is enabled, the instances assigned for the same partitionId
can be different for different segments.
+ * For strict replica group routing with StrictRealtimeSegmentAssignment on
the other hand, the assignment for a given
+ * partitionId will be the same across all segments. We can treat both cases
similarly by creating a mapping from
+ * partitionId -> unique set of instance assignments -> currentAssignment.
With StrictRealtimeSegmentAssignment,
+ * this map will have a single entry for 'unique set of instance
assignments'.
+ * <p>
+ * TODO: Ideally if strict replica group routing is enabled then
StrictRealtimeSegmentAssignment should be used, but
+ * this is not enforced in the code today. Once enforcement is added,
then the routing side and assignment side
+ * will be equivalent and all segments belonging to a given
partitionId will be assigned to the same set of
+ * instances. Special handling to check each group of assigned
instances can be removed in that case.
*/
- @VisibleForTesting
- static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ private static Map<String, Map<String, String>>
getNextAssignment(Map<String, Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
- boolean lowDiskMode) {
- return enableStrictReplicaGroup ?
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
- minAvailableReplicas, lowDiskMode)
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
+ return enableStrictReplicaGroup
+ ? getNextStrictReplicaGroupAssignment(currentAssignment,
targetAssignment, minAvailableReplicas, lowDiskMode,
+ batchSizePerServer, segmentPartitionIdMap, partitionIdFetcher,
tableRebalanceLogger)
: getNextNonStrictReplicaGroupAssignment(currentAssignment,
targetAssignment, minAvailableReplicas,
- lowDiskMode);
+ lowDiskMode, batchSizePerServer);
}
private static Map<String, Map<String, String>>
getNextStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
- int minAvailableReplicas, boolean lowDiskMode) {
+ int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+ Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher
partitionIdFetcher,
+ Logger tableRebalanceLogger) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<String, Integer> numSegmentsToOffloadMap =
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ Map<Integer, Map<Set<String>, Map<String, Map<String, String>>>>
+ partitionIdToAssignedInstancesToCurrentAssignmentMap;
+ if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+ // Don't calculate the partition id to assigned instances to current
assignment mapping if batching is disabled
+ // since we want to update the next assignment based on all partitions
in this case. Use partitionId as 0
+ // and a dummy set for the assigned instances.
+ partitionIdToAssignedInstancesToCurrentAssignmentMap = new TreeMap<>();
+ partitionIdToAssignedInstancesToCurrentAssignmentMap.put(0, new
HashMap<>());
+
partitionIdToAssignedInstancesToCurrentAssignmentMap.get(0).put(Set.of(""),
currentAssignment);
+ } else {
+ partitionIdToAssignedInstancesToCurrentAssignmentMap =
+
getPartitionIdToAssignedInstancesToCurrentAssignmentMap(currentAssignment,
segmentPartitionIdMap,
+ partitionIdFetcher);
+ }
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new
HashMap<>();
Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
- for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> currentInstanceStateMap = entry.getValue();
- Map<String, String> targetInstanceStateMap =
targetAssignment.get(segmentName);
- SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap,
targetInstanceStateMap, minAvailableReplicas,
- lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
- Set<String> assignedInstances = assignment._instanceStateMap.keySet();
- Set<String> availableInstances = assignment._availableInstances;
- availableInstancesMap.compute(assignedInstances, (k,
currentAvailableInstances) -> {
- if (currentAvailableInstances == null) {
- // First segment assigned to these instances, use the new assignment
and update the available instances
- nextAssignment.put(segmentName, assignment._instanceStateMap);
- updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap,
currentInstanceStateMap.keySet(), k);
- return availableInstances;
- } else {
- // There are other segments assigned to the same instances, check
the available instances to see if adding the
- // new assignment can still hold the minimum available replicas
requirement
- availableInstances.retainAll(currentAvailableInstances);
- if (availableInstances.size() >= minAvailableReplicas) {
- // New assignment can be added
+
+ Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+ for (Map<Set<String>, Map<String, Map<String, String>>>
assignedInstancesToCurrentAssignment
+ : partitionIdToAssignedInstancesToCurrentAssignmentMap.values()) {
+ boolean anyServerExhaustedBatchSize = false;
+ if (batchSizePerServer != RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER)
{
+ // Check if the servers of the first assignment for each unique set of
assigned instances has any space left
+ // to move this partition. If so, let's mark the partitions as to be
moved, otherwise we mark the partition
+ // as a whole as not moveable.
+ for (Map<String, Map<String, String>> curAssignment :
assignedInstancesToCurrentAssignment.values()) {
+ Map.Entry<String, Map<String, String>> firstEntry =
curAssignment.entrySet().iterator().next();
+ // All segments should be assigned to the same set of servers so it
is enough to check for whether any server
+ // for one segment is above the limit or not
+ Map<String, String> firstEntryInstanceStateMap =
firstEntry.getValue();
+ SingleSegmentAssignment firstAssignment =
+ getNextSingleSegmentAssignment(firstEntryInstanceStateMap,
targetAssignment.get(firstEntry.getKey()),
+ minAvailableReplicas, lowDiskMode, numSegmentsToOffloadMap,
assignmentMap);
+ Set<String> serversAdded =
getServersAddedInSingleSegmentAssignment(firstEntryInstanceStateMap,
+ firstAssignment._instanceStateMap);
+ for (String server : serversAdded) {
+ if (serverToNumSegmentsAddedSoFar.getOrDefault(server, 0) >=
batchSizePerServer) {
+ anyServerExhaustedBatchSize = true;
+ break;
+ }
+ }
+ if (anyServerExhaustedBatchSize) {
+ break;
+ }
+ }
+ }
+ for (Map<String, Map<String, String>> curAssignment :
assignedInstancesToCurrentAssignment.values()) {
+ getNextAssignmentForPartitionIdStrictReplicaGroup(curAssignment,
targetAssignment, nextAssignment,
+ anyServerExhaustedBatchSize, minAvailableReplicas, lowDiskMode,
numSegmentsToOffloadMap, assignmentMap,
+ availableInstancesMap, serverToNumSegmentsAddedSoFar);
+ }
+ }
+
+ checkIfAnyServersAssignedMoreSegmentsThanBatchSize(batchSizePerServer,
serverToNumSegmentsAddedSoFar,
+ tableRebalanceLogger);
+ return nextAssignment;
+ }
+
+ private static void
getNextAssignmentForPartitionIdStrictReplicaGroup(Map<String, Map<String,
String>> curAssignment,
+ Map<String, Map<String, String>> targetAssignment, Map<String,
Map<String, String>> nextAssignment,
+ boolean anyServerExhaustedBatchSize, int minAvailableReplicas, boolean
lowDiskMode,
+ Map<String, Integer> numSegmentsToOffloadMap, Map<Pair<Set<String>,
Set<String>>, Set<String>> assignmentMap,
+ Map<Set<String>, Set<String>> availableInstancesMap, Map<String,
Integer> serverToNumSegmentsAddedSoFar) {
+ if (anyServerExhaustedBatchSize) {
+ // Exhausted the batch size for at least 1 server, just copy over the
remaining segments as is
+ for (Map.Entry<String, Map<String, String>> entry :
curAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> currentInstanceStateMap = entry.getValue();
+ nextAssignment.put(segmentName, currentInstanceStateMap);
+ }
+ } else {
+ // Process all the partitionIds even if segmentsAddedSoFar becomes
larger than batchSizePerServer
+ // Can only do bestEfforts w.r.t. StrictReplicaGroup since a whole
partition must be moved together for
+ // maintaining consistency
+ for (Map.Entry<String, Map<String, String>> entry :
curAssignment.entrySet()) {
Review Comment:
> We set anyServerExhaustedBatchSize based on checking all assigned
> instances (there are two loops over
> partitionIdToAssignedInstancesToCurrentAssignmentMap.values()). So either the
> whole partition moves or it doesn't, even though yes we iterate over
> partitionIdToAssignedInstancesToCurrentAssignmentMap.values()

Ah sorry, I missed that `anyServerExhaustedBatchSize` applies to the whole
partition even when there are multiple unique sets of instances for the
partition.

I agree that we could make it a bit more granular instead, so that each set of
segments assigned to the same instances within a partition moves atomically on
its own (instead of all or nothing for the entire partition). However, strict
replica group routing was implemented primarily for upserts (and that's also
the most common use case for strict RG now AFAIK), where a partition will be
assigned only to a single instance (per replica), so I'm fine with the current
algorithm as well.
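
The difference between the all-or-nothing check and the more granular alternative can be sketched as follows. This is an illustrative model only, not the PR's code: each group stands for one unique set of assigned instances within a partition and is reduced to a single destination server name, and all class, method, and parameter names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: which instance-set groups of one partition may move in this step. */
final class PartitionGranularitySketch {

  /** All or nothing: if any group's destination server is at the batch size, no group moves. */
  static List<Integer> movableGroupsAllOrNothing(String[] destinationServerPerGroup,
      Map<String, Integer> segmentsAddedSoFar, int batchSize) {
    List<Integer> movable = new ArrayList<>();
    for (String server : destinationServerPerGroup) {
      if (segmentsAddedSoFar.getOrDefault(server, 0) >= batchSize) {
        return movable; // empty: the whole partition keeps its current assignment
      }
    }
    for (int i = 0; i < destinationServerPerGroup.length; i++) {
      movable.add(i);
    }
    return movable;
  }

  /** Granular alternative (assumed): each group is checked against its own destination server. */
  static List<Integer> movableGroupsPerInstanceSet(String[] destinationServerPerGroup,
      Map<String, Integer> segmentsAddedSoFar, int batchSize) {
    List<Integer> movable = new ArrayList<>();
    for (int i = 0; i < destinationServerPerGroup.length; i++) {
      if (segmentsAddedSoFar.getOrDefault(destinationServerPerGroup[i], 0) < batchSize) {
        movable.add(i);
      }
    }
    return movable;
  }

  public static void main(String[] args) {
    String[] destinations = {"server1", "server2"};
    Map<String, Integer> addedSoFar = Map.of("server1", 5); // server1 already at the batch size
    System.out.println(movableGroupsAllOrNothing(destinations, addedSoFar, 5));   // prints []
    System.out.println(movableGroupsPerInstanceSet(destinations, addedSoFar, 5)); // prints [1]
  }
}
```

When every partition maps to a single set of instances, as in the upsert case mentioned above, both methods return the same result, which is why the extra granularity matters little there.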