somandal commented on code in PR #15175:
URL: https://github.com/apache/pinot/pull/15175#discussion_r1994283360
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails,
currentAssignment);
+
+ for (Map.Entry<String, Set<String>> entry :
newServersToSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+ if (diskUsage.getTotalSpaceBytes() < 0) {
+ return "Disk usage info not enabled. Try to set
controller.enable.resource.utilization.check=true";
+ }
+
+ Set<String> segmentSet = entry.getValue();
+
+ Set<String> newSegmentSet = new HashSet<>(segmentSet);
+ Set<String> existingSegmentSet = new HashSet<>();
+ Set<String> intersection = new HashSet<>();
+ if (existingServersToSegmentMap.containsKey(server)) {
+ Set<String> segmentSetForServer =
existingServersToSegmentMap.get(server);
+ existingSegmentSet.addAll(segmentSetForServer);
+ intersection.addAll(segmentSetForServer);
+ intersection.retainAll(newSegmentSet);
+ }
+ newSegmentSet.removeAll(intersection);
+ Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+ removedSegmentSet.removeAll(intersection);
+
+ long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+ long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;
+
+ long diskUtilizationFootprint = diskUsage.getUsedSpaceBytes() +
diskUtilizationGain;
+ double diskUtilizationFootprintRate =
Review Comment:
nit: rename `diskUtilizationFootprintRate` to
`diskUtilizationFootprintPercentage`
##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/DiskUsageInfo.java:
##########
@@ -34,6 +34,14 @@ public class DiskUsageInfo {
private final long _usedSpaceBytes;
private final long _lastUpdatedTimeInEpochMs;
+ public DiskUsageInfo(String instanceId) {
Review Comment:
nit: add `@JsonProperty("instanceId")` annotation?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -572,28 +575,28 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
}
}
- private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
- long tableSizePerReplicaInBytes = -1;
+ private TableSizeReader.TableSubTypeSizeDetails fetchTableSizeDetails(String
tableNameWithType) {
if (_tableSizeReader == null) {
LOGGER.warn("tableSizeReader is null, cannot calculate table size for
table {}!", tableNameWithType);
- return tableSizePerReplicaInBytes;
+ return null;
}
LOGGER.info("Fetching the table size for rebalance summary for table: {}",
tableNameWithType);
try {
// TODO: Consider making the timeoutMs for fetching table size via table
rebalancer configurable
- TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
- _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
- tableSizePerReplicaInBytes =
tableSizeDetails._reportedSizePerReplicaInBytes;
+ return _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
} catch (InvalidConfigException e) {
LOGGER.error("Caught exception while trying to fetch table size details
for table: {}", tableNameWithType, e);
}
Review Comment:
can you add the exit log back (want it for debugging):
```
LOGGER.info("Fetched the table size details for table: {}",
tableNameWithType);
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -513,6 +533,106 @@ public void testRebalance()
assertNull(rebalanceResult.getPreChecksResult());
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ for (int i = 0; i < numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+ }
+ executorService.shutdown();
+ }
+
+ @Test
+ public void testRebalancePreCheckerDiskUtil()
+ throws Exception {
+ int numServers = 3;
+ // Mock disk usage
+ Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ DiskUsageInfo diskUsageInfo1 =
+ new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+ }
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, executorService, 0.5);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
+ _helixResourceManager.getTableSizeReader());
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i), null);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+ // Add 3 more servers
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
(numServers + i);
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ DiskUsageInfo diskUsageInfo =
+ new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo);
+ }
+
+ ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+ // Rebalance in dry-run mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setPreChecks(true);
+
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+ assertNotNull(preCheckResult);
+
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+ // Sending request to servers should fail for all, so needsPreprocess
should be set to "error" to indicate that a
Review Comment:
is this a copy paste error? you don't seem to be checking for reload
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java:
##########
@@ -22,10 +22,33 @@
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
public interface RebalancePreChecker {
- void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
ExecutorService executorService);
- Map<String, String> check(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig);
+ void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
ExecutorService executorService,
+ double diskUtilizationThreshold);
+
+ class TableFacts {
+ public String _rebalanceJobId;
+ public String _tableNameWithType;
+ public TableConfig _tableConfig;
+ public Map<String, Map<String, String>> _currentAssignment;
+ public Map<String, Map<String, String>> _targetAssignment;
+ public TableSizeReader.TableSubTypeSizeDetails _tableSubTypeSizeDetails;
+
+ public TableFacts(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig,
Review Comment:
nit: are any of these nullable? if so, add `@Nullable` annotation for those
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -42,19 +47,28 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
public static final String IS_MINIMIZE_DATA_MOVEMENT =
"isMinimizeDataMovement";
+ public static final String DISK_UTILIZATION = "diskUtilization";
+
+ private static double _diskUtilizationThreshold;
protected PinotHelixResourceManager _pinotHelixResourceManager;
protected ExecutorService _executorService;
@Override
- public void init(PinotHelixResourceManager pinotHelixResourceManager,
@Nullable ExecutorService executorService) {
+ public void init(PinotHelixResourceManager pinotHelixResourceManager,
@Nullable ExecutorService executorService,
+ double diskUtilizationThreshold) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_executorService = executorService;
+ _diskUtilizationThreshold = diskUtilizationThreshold;
}
@Override
- public Map<String, String> check(String rebalanceJobId, String
tableNameWithType, TableConfig tableConfig) {
- LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ public Map<String, String> check(TableFacts tableFacts) {
+ LOGGER.info("Start pre-checks. Table fact: {}", tableFacts.toString());
Review Comment:
what does `tableFacts.toString()` look like? I don't see that you've
implemented that so want to make sure this log looks fine and not too long
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
Review Comment:
nit: not your mistake, but `segmentKey` should be renamed to `serverKey` or
`instanceName`. I'll be correcting this in other parts of the code as part of
other PRs, but it'll be good if you can fix it here 😅
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
Review Comment:
nit: not your mistake, but `segmentKey` should be renamed to `serverKey` or
`instanceName`. I'll be correcting this in other parts of the code as part of
other PRs, but it'll be good if you can fix it here 😅
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -299,18 +292,28 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
+ "rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
- tierToInstancePartitionsMap, null, preChecksResult, null);
+ tierToInstancePartitionsMap, null, null, null);
}
boolean segmentAssignmentUnchanged =
currentAssignment.equals(targetAssignment);
LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {},
tierInstancePartitionsUnchanged: {}, "
+ "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId,
instancePartitionsUnchanged,
tierInstancePartitionsUnchanged, segmentAssignmentUnchanged,
tableNameWithType);
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails =
fetchTableSizeDetails(tableNameWithType);
+
+ Map<String, String> preChecksResult = null;
+ if (preChecks && _rebalancePreChecker != null) {
+ // TODO: consider making an error or warning log when pre-checks are
enabled but the pre-checker is not set
Review Comment:
nit: if you add the recommended log for `_rebalancePreChecker` being null,
then remove this TODO
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails,
currentAssignment);
+
+ for (Map.Entry<String, Set<String>> entry :
newServersToSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+ if (diskUsage.getTotalSpaceBytes() < 0) {
+ return "Disk usage info not enabled. Try to set
controller.enable.resource.utilization.check=true";
+ }
+
+ Set<String> segmentSet = entry.getValue();
+
+ Set<String> newSegmentSet = new HashSet<>(segmentSet);
+ Set<String> existingSegmentSet = new HashSet<>();
+ Set<String> intersection = new HashSet<>();
+ if (existingServersToSegmentMap.containsKey(server)) {
+ Set<String> segmentSetForServer =
existingServersToSegmentMap.get(server);
+ existingSegmentSet.addAll(segmentSetForServer);
+ intersection.addAll(segmentSetForServer);
+ intersection.retainAll(newSegmentSet);
+ }
+ newSegmentSet.removeAll(intersection);
+ Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+ removedSegmentSet.removeAll(intersection);
+
+ long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+ long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;
+
+ long diskUtilizationFootprint = diskUsage.getUsedSpaceBytes() +
diskUtilizationGain;
+ double diskUtilizationFootprintRate =
+ (double) diskUtilizationFootprint / diskUsage.getTotalSpaceBytes();
+
+ if (diskUtilizationFootprintRate >= threshold) {
+ isDiskUtilSafe = false;
+ message.append(sep)
+ .append(server)
+ .append(String.format(" (%d%%)", (short)
(diskUtilizationFootprintRate * 100)));
+ sep = ", ";
+ }
+ }
+ return isDiskUtilSafe ? "Within threshold" : message.toString();
Review Comment:
nit: let's add the threshold value here as well (in case we decide to make
the threshold configurable via `RebalanceConfig` in the future)
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
Review Comment:
nit: what is `sep`? can we use a less confusing name?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java:
##########
@@ -227,6 +227,8 @@ public Map<String, Object>
getDefaultControllerConfiguration() {
properties.put(ControllerConf.DISABLE_GROOVY, false);
properties.put(ControllerConf.CONSOLE_SWAGGER_ENABLE, false);
properties.put(CommonConstants.CONFIG_OF_TIMEZONE, "UTC");
+ // Disable resource util check in test
Review Comment:
why are we disabling this? can you update the comment to explain?
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -878,9 +878,10 @@ private void checkRebalancePreCheckStatus(RebalanceResult
rebalanceResult, Rebal
assertEquals(rebalanceResult.getStatus(), expectedStatus);
Map<String, String> preChecksResult = rebalanceResult.getPreChecksResult();
assertNotNull(preChecksResult);
- assertEquals(preChecksResult.size(), 2);
+ assertEquals(preChecksResult.size(), 3);
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+
assertTrue(preChecksResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
Review Comment:
Can you also add an additional check to validate the returned value for this
check? is it possible to add disk utilization related tests here?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
+ Map<String, Map<String, String>> targetAssignment,
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold) {
+ boolean isDiskUtilSafe = true;
+ StringBuilder message = new StringBuilder("UNSAFE. Servers with unsafe
disk util footprint: ");
+ String sep = "";
+ Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
+ for (String segmentKey : entrySet.getValue().keySet()) {
+ newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ }
+ }
+
+ long avgSegmentSize = getAverageSegmentSize(tableSubTypeSizeDetails,
currentAssignment);
+
+ for (Map.Entry<String, Set<String>> entry :
newServersToSegmentMap.entrySet()) {
+ String server = entry.getKey();
+ DiskUsageInfo diskUsage = getDiskUsageInfoOfInstance(server);
+
+ if (diskUsage.getTotalSpaceBytes() < 0) {
+ return "Disk usage info not enabled. Try to set
controller.enable.resource.utilization.check=true";
+ }
+
+ Set<String> segmentSet = entry.getValue();
+
+ Set<String> newSegmentSet = new HashSet<>(segmentSet);
+ Set<String> existingSegmentSet = new HashSet<>();
+ Set<String> intersection = new HashSet<>();
+ if (existingServersToSegmentMap.containsKey(server)) {
+ Set<String> segmentSetForServer =
existingServersToSegmentMap.get(server);
+ existingSegmentSet.addAll(segmentSetForServer);
+ intersection.addAll(segmentSetForServer);
+ intersection.retainAll(newSegmentSet);
+ }
+ newSegmentSet.removeAll(intersection);
+ Set<String> removedSegmentSet = new HashSet<>(existingSegmentSet);
+ removedSegmentSet.removeAll(intersection);
+
+ long diskUtilizationGain = newSegmentSet.size() * avgSegmentSize;
+ long diskUtilizationLoss = removedSegmentSet.size() * avgSegmentSize;
Review Comment:
looks like you don't do anything with `diskUtilizationLoss` here.
Can we have 2 disk utilization pre-checks:
- Worst case disk utilization during rebalance:
`diskUsage.getUsedSpaceBytes() + diskUtilizationGain`
- Actual disk utilization after rebalance, where we account for deleted data
too: `diskUsage.getUsedSpaceBytes() + diskUtilizationGain - diskUtilizationLoss`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -299,18 +292,28 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
+ "rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
- tierToInstancePartitionsMap, null, preChecksResult, null);
+ tierToInstancePartitionsMap, null, null, null);
}
boolean segmentAssignmentUnchanged =
currentAssignment.equals(targetAssignment);
LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {},
tierInstancePartitionsUnchanged: {}, "
+ "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId,
instancePartitionsUnchanged,
tierInstancePartitionsUnchanged, segmentAssignmentUnchanged,
tableNameWithType);
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails =
fetchTableSizeDetails(tableNameWithType);
+
+ Map<String, String> preChecksResult = null;
+ if (preChecks && _rebalancePreChecker != null) {
+ // TODO: consider making an error or warning log when pre-checks are
enabled but the pre-checker is not set
+ RebalancePreChecker.TableFacts tableFacts = new
RebalancePreChecker.TableFacts(rebalanceJobId, tableNameWithType,
+ tableConfig, currentAssignment, targetAssignment,
tableSubTypeSizeDetails);
+ preChecksResult = _rebalancePreChecker.check(tableFacts);
+ }
Review Comment:
nit: let's add an else for if `preChecks` are enabled but
`_rebalancePreChecker` is null so that we can debug this scenario from logs
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -572,28 +575,28 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
}
}
- private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
- long tableSizePerReplicaInBytes = -1;
+ private TableSizeReader.TableSubTypeSizeDetails fetchTableSizeDetails(String
tableNameWithType) {
if (_tableSizeReader == null) {
LOGGER.warn("tableSizeReader is null, cannot calculate table size for
table {}!", tableNameWithType);
- return tableSizePerReplicaInBytes;
+ return null;
}
LOGGER.info("Fetching the table size for rebalance summary for table: {}",
tableNameWithType);
Review Comment:
nit: reword this to remove "for rebalance summary" since it will be used for
both pre-checks and summary
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -513,6 +533,106 @@ public void testRebalance()
assertNull(rebalanceResult.getPreChecksResult());
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ for (int i = 0; i < numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + (numServers + i));
+ }
+ executorService.shutdown();
+ }
+
+ @Test
+ public void testRebalancePreCheckerDiskUtil()
+ throws Exception {
+ int numServers = 3;
+ // Mock disk usage
+ Map<String, DiskUsageInfo> diskUsageInfoMap = new HashMap<>();
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ DiskUsageInfo diskUsageInfo1 =
+ new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo1);
+ }
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, executorService, 0.5);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
+ _helixResourceManager.getTableSizeReader());
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegments = 10;
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(OFFLINE_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i), null);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+
+ // Add 3 more servers
+ int numServersToAdd = 3;
+ for (int i = 0; i < numServersToAdd; i++) {
+ String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
(numServers + i);
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ DiskUsageInfo diskUsageInfo =
+ new DiskUsageInfo(instanceId, "", 1000L, 200L,
System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo);
+ }
+
+ ResourceUtilizationInfo.setDiskUsageInfo(diskUsageInfoMap);
+
+ // Rebalance in dry-run mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setPreChecks(true);
+
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ Map<String, String> preCheckResult = rebalanceResult.getPreChecksResult();
+ assertNotNull(preCheckResult);
+
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+ // Sending request to servers should fail for all, so needsPreprocess
should be set to "error" to indicate that a
+ // manual check is needed
+
assertEquals(preCheckResult.get(DefaultRebalancePreChecker.DISK_UTILIZATION),
"Within threshold");
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ String instanceId = "preCheckerDiskUtil_" + SERVER_INSTANCE_ID_PREFIX +
i;
+ DiskUsageInfo diskUsageInfo =
+ new DiskUsageInfo(instanceId, "", 1000L, 755L,
System.currentTimeMillis());
+ diskUsageInfoMap.put(instanceId, diskUsageInfo);
+ }
+
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setPreChecks(true);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ preCheckResult = rebalanceResult.getPreChecksResult();
+ assertNotNull(preCheckResult);
+
assertTrue(preCheckResult.containsKey(DefaultRebalancePreChecker.DISK_UTILIZATION));
+ // Sending request to servers should fail for all, so needsPreprocess
should be set to "error" to indicate that a
Review Comment:
is this a copy paste error? you don't seem to be checking for reload
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -138,4 +157,80 @@ private boolean checkIsMinimizeDataMovement(String
rebalanceJobId, String tableN
return false;
}
}
+
+ private String checkDiskUtilization(String tableNameWithType, Map<String,
Map<String, String>> currentAssignment,
Review Comment:
Just calling out that the pre-check results will be changing a bit as done
in https://github.com/apache/pinot/pull/15233, you'll have to rebase and pick
up that change once it is merged
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalancePreChecker.java:
##########
@@ -22,10 +22,33 @@
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.TableConfig;
public interface RebalancePreChecker {
- void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
ExecutorService executorService);
- Map<String, String> check(String rebalanceJobId, String tableNameWithType,
TableConfig tableConfig);
+ void init(PinotHelixResourceManager pinotHelixResourceManager, @Nullable
ExecutorService executorService,
+ double diskUtilizationThreshold);
+
+ class TableFacts {
Review Comment:
nit: can we rename this to `PreCheckContext`, since this is specifically used
for pre-checks only?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -42,19 +47,28 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
public static final String NEEDS_RELOAD_STATUS = "needsReloadStatus";
public static final String IS_MINIMIZE_DATA_MOVEMENT =
"isMinimizeDataMovement";
+ public static final String DISK_UTILIZATION = "diskUtilization";
+
+ private static double _diskUtilizationThreshold;
protected PinotHelixResourceManager _pinotHelixResourceManager;
protected ExecutorService _executorService;
@Override
- public void init(PinotHelixResourceManager pinotHelixResourceManager,
@Nullable ExecutorService executorService) {
+ public void init(PinotHelixResourceManager pinotHelixResourceManager,
@Nullable ExecutorService executorService,
+ double diskUtilizationThreshold) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_executorService = executorService;
+ _diskUtilizationThreshold = diskUtilizationThreshold;
}
@Override
- public Map<String, String> check(String rebalanceJobId, String
tableNameWithType, TableConfig tableConfig) {
- LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ public Map<String, String> check(TableFacts tableFacts) {
Review Comment:
+1 - I think in the future we might want to make the threshold configurable
via `RebalanceConfig` in which case having it passed in from `check` is better.
We can still have the config based `_diskUtilizationThreshold` in init though,
as that could be the default if no `RebalanceConfig` override is provided. what
do you folks think?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]