somandal commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2257160577
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +416,29 @@ private RebalanceResult doRebalance(TableConfig
tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, rebalanceConfig);
}
}
+
+ // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+ // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+ // for a table. Skip this step if allowPeerDownloadDataLoss = true
+ if (peerSegmentDownloadScheme != null && !allowPeerDownloadDataLoss) {
+ // Setting minAvailableReplicas to 0 since that is what's equivalent to downtime=true
+ DataLossRiskAssessor dataLossRiskAssessor = new
PeerDownloadTableDataLossRiskAssessor(tableNameWithType,
+ tableConfig, 0, _helixManager, _pinotLLCRealtimeSegmentManager);
+ for (Map.Entry<String, Map<String, String>> segmentToAssignment :
currentAssignment.entrySet()) {
+ String segmentName = segmentToAssignment.getKey();
+ Map<String, String> assignment = segmentToAssignment.getValue();
+ Pair<Boolean, String> dataLossResult =
dataLossRiskAssessor.assessDataLossRisk(segmentName);
Review Comment:
good catch! done
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -670,6 +669,92 @@ public void testRebalance()
}
}
+ @Test
+ public void testRebalancePeerDownloadDataLoss()
+ throws Exception {
+ for (int batchSizePerServer :
Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 1, 2)) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX,
true);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, executorService, 1);
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader, null);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(1)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments with peer download uri (i.e. simulate segments without deep store uri)
+ int numSegments = 10;
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME,
SEGMENT_NAME_PREFIX + i),
+ CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
Review Comment:
removed
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##########
@@ -566,6 +572,7 @@ private TableRebalancer.SingleSegmentAssignment
getNextSingleSegmentAssignment(
numSegmentsToOffloadMap.merge(targetInstance, -1, Integer::sum);
}
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new
HashMap<>();
+ // Just use a dummy segmentName for now. If needed, can modify this function to take the segmentName as input
Review Comment:
removed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]