somandal commented on code in PR #15617:
URL: https://github.com/apache/pinot/pull/15617#discussion_r2059622741
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1434,67 +1444,209 @@ private static void handleErrorInstance(String
tableNameWithType, String segment
}
}
+ /**
+ * Uses the default LOGGER
+ */
+ @VisibleForTesting
+ static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher) {
+ return getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
+ lowDiskMode, batchSizePerServer, segmentPartitionIdMap,
partitionIdFetcher, LOGGER);
+ }
+
/**
* Returns the next assignment for the table based on the current assignment
and the target assignment with regard to
* the minimum available replicas requirement. For strict replica-group
mode, track the available instances for all
* the segments with the same instances in the next assignment, and ensure
the minimum available replicas requirement
* is met. If adding the assignment for a segment breaks the requirement,
use the current assignment for the segment.
*/
- @VisibleForTesting
- static Map<String, Map<String, String>> getNextAssignment(Map<String,
Map<String, String>> currentAssignment,
+ private static Map<String, Map<String, String>>
getNextAssignment(Map<String, Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, int
minAvailableReplicas, boolean enableStrictReplicaGroup,
- boolean lowDiskMode) {
+ boolean lowDiskMode, int batchSizePerServer,
Object2IntOpenHashMap<String> segmentPartitionIdMap,
+ PartitionIdFetcher partitionIdFetcher, Logger tableRebalanceLogger) {
return enableStrictReplicaGroup ?
getNextStrictReplicaGroupAssignment(currentAssignment, targetAssignment,
- minAvailableReplicas, lowDiskMode)
+ minAvailableReplicas, lowDiskMode, batchSizePerServer,
segmentPartitionIdMap, partitionIdFetcher,
+ tableRebalanceLogger)
: getNextNonStrictReplicaGroupAssignment(currentAssignment,
targetAssignment, minAvailableReplicas,
- lowDiskMode);
+ lowDiskMode, batchSizePerServer);
}
private static Map<String, Map<String, String>>
getNextStrictReplicaGroupAssignment(
Map<String, Map<String, String>> currentAssignment, Map<String,
Map<String, String>> targetAssignment,
- int minAvailableReplicas, boolean lowDiskMode) {
+ int minAvailableReplicas, boolean lowDiskMode, int batchSizePerServer,
+ Object2IntOpenHashMap<String> segmentPartitionIdMap, PartitionIdFetcher
partitionIdFetcher,
+ Logger tableRebalanceLogger) {
Map<String, Map<String, String>> nextAssignment = new TreeMap<>();
Map<String, Integer> numSegmentsToOffloadMap =
getNumSegmentsToOffloadMap(currentAssignment, targetAssignment);
+ Map<Integer, Map<String, Map<String, String>>>
partitionIdToCurrentAssignmentMap;
+ if (batchSizePerServer == RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER) {
+ // Don't calculate the partition id to current assignment mapping if
batching is disabled since
+ // we want to update the next assignment based on all partitions in this
case
+ partitionIdToCurrentAssignmentMap = new TreeMap<>();
+ partitionIdToCurrentAssignmentMap.put(0, currentAssignment);
+ } else {
+ partitionIdToCurrentAssignmentMap =
+ getPartitionIdToCurrentAssignmentMap(currentAssignment,
segmentPartitionIdMap, partitionIdFetcher);
+ }
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new
HashMap<>();
Map<Set<String>, Set<String>> availableInstancesMap = new HashMap<>();
- for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
- String segmentName = entry.getKey();
- Map<String, String> currentInstanceStateMap = entry.getValue();
- Map<String, String> targetInstanceStateMap =
targetAssignment.get(segmentName);
- SingleSegmentAssignment assignment =
- getNextSingleSegmentAssignment(currentInstanceStateMap,
targetInstanceStateMap, minAvailableReplicas,
- lowDiskMode, numSegmentsToOffloadMap, assignmentMap);
- Set<String> assignedInstances = assignment._instanceStateMap.keySet();
- Set<String> availableInstances = assignment._availableInstances;
- availableInstancesMap.compute(assignedInstances, (k,
currentAvailableInstances) -> {
- if (currentAvailableInstances == null) {
- // First segment assigned to these instances, use the new assignment
and update the available instances
- nextAssignment.put(segmentName, assignment._instanceStateMap);
- updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap,
currentInstanceStateMap.keySet(), k);
- return availableInstances;
- } else {
- // There are other segments assigned to the same instances, check
the available instances to see if adding the
- // new assignment can still hold the minimum available replicas
requirement
- availableInstances.retainAll(currentAvailableInstances);
- if (availableInstances.size() >= minAvailableReplicas) {
- // New assignment can be added
- nextAssignment.put(segmentName, assignment._instanceStateMap);
- updateNumSegmentsToOffloadMap(numSegmentsToOffloadMap,
currentInstanceStateMap.keySet(), k);
- return availableInstances;
- } else {
- // New assignment cannot be added, use the current instance state
map
- nextAssignment.put(segmentName, currentInstanceStateMap);
- return currentAvailableInstances;
+
+ Map<String, Integer> serverToNumSegmentsAddedSoFar = new HashMap<>();
+ for (Map<String, Map<String, String>> curAssignment :
partitionIdToCurrentAssignmentMap.values()) {
+ Map.Entry<String, Map<String, String>> firstEntry =
curAssignment.entrySet().iterator().next();
+ // All partitions should be assigned to the same set of servers so it is
enough to check for whether any server
Review Comment:
Discussed offline: we cannot assume that `StrictRealtimeSegmentAssignment`
will always be used when strict replica group routing is enabled. This means we
cannot safely partition the segments based on partitionId, because the segments
of a given partitionId might be spread across different servers.
Because of this, we decided to add special handling for this case until we fix
it later by enforcing that `StrictRealtimeSegmentAssignment` is always used when
strict replica group routing is enabled. I added a separate function to deal
with the case where strict replica group routing is enabled but
`RealtimeSegmentAssignment` is used. It still tries to move a full partition
together, but since the currently assigned instances can differ between the
segments of a partition, it looks at all of the unique currently assigned
instances to check whether any of them violates the batch size setting for the
servers.
The long-term fix for this is to:
- Enforce that `StrictRealtimeSegmentAssignment` is used whenever strict
replica group routing is enabled via config, rather than relying on the check
done today, which only enables it for upsert tables.
- Add support for tiered storage in `StrictRealtimeSegmentAssignment` with
correct semantics so that features like dedup can be supported with tiered
storage. Dedup should also ideally use `StrictRealtimeSegmentAssignment`.
- Clean up the new function in `TableRebalancer` that allows strict replica
group routing with `RealtimeSegmentAssignment`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]