yashmayya commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2255908123
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -352,35 +351,49 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
int numReplicas = Integer.MAX_VALUE;
- if (rebalanceConfig.isDowntime() || PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+ String peerSegmentDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+ if (rebalanceConfig.isDowntime() || peerSegmentDownloadScheme != null) {
for (String segment : segmentsToMove) {
numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
}
}
+ // For non-peer download enabled tables, warn if downtime is enabled but numReplicas > 1. Should only use
+ // downtime=true for such tables if downtime is indeed acceptable whereas for numReplicas = 1, rebalance cannot
+ // be done without downtime
if (rebalanceConfig.isDowntime()) {
if (!segmentsToMove.isEmpty() && numReplicas > 1) {
pass = false;
warnings.add("Number of replicas (" + numReplicas + ") is greater than
1, downtime is not recommended.");
}
}
- // It was revealed a risk of data loss for pauseless tables during rebalance, when downtime=true or
- // minAvailableReplicas=0 -- If a segment is being moved and has not yet uploaded to deep store, premature
- // deletion could cause irrecoverable data loss. This pre-check added as a workaround to warn the potential risk.
- // TODO: Get to the root cause of the issue and revisit this pre-check.
- if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+ // Peer download enabled tables may have data loss during rebalance, when downtime=true or minAvailableReplicas=0.
+ // The scenario plays out as follows:
+ // 1. If the newly built consuming segment is cannot be uploaded to deep store, it may set up the download URI
Review Comment:
```suggestion
// 1. If the newly built consuming segment cannot be uploaded to deep store, it may set up the download URI
```
nit
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +416,29 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, rebalanceConfig);
}
}
+
+ // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+ // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+ // for a table. Skip this step if allowPeerDownloadDataLoss = true
+ if (peerSegmentDownloadScheme != null && !allowPeerDownloadDataLoss) {
+ // Setting minAvailableReplicas to 0 since that is what's equivalent to downtime=true
+ DataLossRiskAssessor dataLossRiskAssessor = new PeerDownloadTableDataLossRiskAssessor(tableNameWithType,
+ tableConfig, 0, _helixManager, _pinotLLCRealtimeSegmentManager);
Review Comment:
nit: seems a bit odd to pass in `minAvailableReplicas` into the constructor
for `PeerDownloadTableDataLossRiskAssessor` just to validate that it's 0 when
we're already checking that before even instantiating.
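For illustration, the refactor this nit points at could look like the following sketch, where the zero-replica guard moves to the call site and the constructor drops the parameter entirely. All names and types here are simplified stand-ins, not the actual Pinot classes.

```java
// Hypothetical sketch of the nit: validate minAvailableReplicas at the call
// site and drop the constructor parameter. Types are simplified stand-ins,
// not the real Pinot classes.
class PeerDownloadAssessorSketch {
  static class Assessor {
    final String tableNameWithType;

    // No minAvailableReplicas parameter: this class is only constructed once
    // the caller has already established that minAvailableReplicas == 0.
    Assessor(String tableNameWithType) {
      this.tableNameWithType = tableNameWithType;
    }
  }

  // The call-site guard replaces the in-constructor Preconditions check
  static Assessor create(String tableNameWithType, int minAvailableReplicas) {
    if (minAvailableReplicas != 0) {
      throw new IllegalStateException("Assessor only applies when minAvailableReplicas == 0");
    }
    return new Assessor(tableNameWithType);
  }

  public static void main(String[] args) {
    System.out.println(create("myTable_REALTIME", 0).tableNameWithType);
  }
}
```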
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ /**
+ * Assess the risk of data loss for the given segment.
+ *
+ * @param segmentName Name of the segment to assess
+ * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+ *   message describing the risk (if any).
+ */
+ Pair<Boolean, String> assessDataLossRisk(String segmentName);
+ }
+
+ /**
+ * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+ * minAvailableReplicas > 0
+ */
+ @VisibleForTesting
+ static class NoOpRiskAssessor implements DataLossRiskAssessor {
+ NoOpRiskAssessor() {
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ return Pair.of(false, "");
+ }
+ }
+
+ /**
+ * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+ */
+ @VisibleForTesting
+ static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final TableConfig _tableConfig;
+ private final HelixManager _helixManager;
+ private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _isPauselessEnabled;
+
+ @VisibleForTesting
+ PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+ int minAvailableReplicas, HelixManager helixManager,
+ PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+ Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+ && minAvailableReplicas == 0);
+ _tableNameWithType = tableNameWithType;
+ _tableConfig = tableConfig;
+ _helixManager = helixManager;
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+ .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ return Pair.of(false, "");
+ }
Review Comment:
What scenarios would this occur in? Is it safe to assume no data loss is
possible if the segment ZK metadata is missing?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2552,7 +2552,7 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
}
}
- private boolean allowRepairOfErrorSegments(boolean repairErrorSegmentsForPartialUpsertOrDedup,
+ public boolean allowRepairOfErrorSegments(boolean repairErrorSegmentsForPartialUpsertOrDedup,
Review Comment:
Small Javadoc would be useful here considering it's now a public method.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +416,29 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
tierToInstancePartitionsMap, rebalanceConfig);
}
}
+
+ // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+ // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+ // for a table. Skip this step if allowPeerDownloadDataLoss = true
+ if (peerSegmentDownloadScheme != null && !allowPeerDownloadDataLoss) {
+ // Setting minAvailableReplicas to 0 since that is what's equivalent to downtime=true
+ DataLossRiskAssessor dataLossRiskAssessor = new PeerDownloadTableDataLossRiskAssessor(tableNameWithType,
+ tableConfig, 0, _helixManager, _pinotLLCRealtimeSegmentManager);
+ for (Map.Entry<String, Map<String, String>> segmentToAssignment : currentAssignment.entrySet()) {
+ String segmentName = segmentToAssignment.getKey();
+ Map<String, String> assignment = segmentToAssignment.getValue();
+ Pair<Boolean, String> dataLossResult = dataLossRiskAssessor.assessDataLossRisk(segmentName);
Review Comment:
Can this be moved after the check for assignment inequality to reduce
segment ZK metadata access?
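The reordering the reviewer suggests, comparing current and target assignments before touching segment ZK metadata, can be sketched as follows. All types are deliberately simplified stand-ins (flat maps instead of Pinot's nested assignment maps), and the counter exists only to make the saved metadata reads visible.

```java
import java.util.Map;

// Hypothetical sketch of the suggested reordering: do the cheap
// assignment-equality check first so unchanged segments never trigger a
// segment ZK metadata fetch. Types and names are illustrative stand-ins.
class ReorderSketch {
  static int zkReads = 0;

  // Stand-in for the ZK-backed risk assessor; counts metadata reads so the
  // effect of the reordering is observable.
  static void assessDataLossRisk(String segmentName) {
    zkReads++;
  }

  public static void main(String[] args) {
    Map<String, String> current = Map.of("seg0", "hostA", "seg1", "hostB");
    Map<String, String> target = Map.of("seg0", "hostA", "seg1", "hostC");

    // Only segments whose assignment actually changes are assessed
    for (Map.Entry<String, String> e : current.entrySet()) {
      if (!e.getValue().equals(target.get(e.getKey()))) {
        assessDataLossRisk(e.getKey());
      }
    }
    System.out.println(zkReads);
  }
}
```

Here only `seg1` changes hosts, so a single metadata read happens instead of one per segment.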
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ /**
+ * Assess the risk of data loss for the given segment.
+ *
+ * @param segmentName Name of the segment to assess
+ * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+ *   message describing the risk (if any).
+ */
+ Pair<Boolean, String> assessDataLossRisk(String segmentName);
+ }
+
+ /**
+ * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+ * minAvailableReplicas > 0
+ */
+ @VisibleForTesting
+ static class NoOpRiskAssessor implements DataLossRiskAssessor {
+ NoOpRiskAssessor() {
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ return Pair.of(false, "");
+ }
+ }
+
+ /**
+ * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+ */
+ @VisibleForTesting
+ static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final TableConfig _tableConfig;
+ private final HelixManager _helixManager;
+ private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _isPauselessEnabled;
+
+ @VisibleForTesting
+ PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+ int minAvailableReplicas, HelixManager helixManager,
+ PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+ Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+ && minAvailableReplicas == 0);
+ _tableNameWithType = tableNameWithType;
+ _tableConfig = tableConfig;
+ _helixManager = helixManager;
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+ .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ return Pair.of(false, "");
+ }
+
+ // If the segment state is COMPLETED and the download URL is empty, there is a data loss risk
+ if (segmentZKMetadata.getStatus().isCompleted() && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(
+ segmentZKMetadata.getDownloadUrl())) {
+ return Pair.of(true, generateDataLossRiskMessage(segmentName));
+ }
+
+ // If the segment is not yet completed, then the following scenarios are possible:
+ // - Non-upsert / non-dedup table:
+ //   - data loss scenarios are not possible. Either the segment will restart consumption or the
+ //     RealtimeSegmentValidationManager will kick in to fix up the segment if pauseless is enabled
+ // - Upsert / dedup table:
+ //   - For non-pauseless tables, it is safe to move the segment without data loss concerns
+ //   - For pauseless tables, if the segment is still in CONSUMING state, moving it is safe, but if it is in
+ //     COMMITTING state then there is a risk of data loss on segment build failures as well since the
+ //     RealtimeSegmentValidationManager does not automatically try to fix up these segments. To be safe it is
+ //     best to return that there is a risk of data loss for pauseless enabled tables for segments in COMMITTING
+ //     state
+ if (_isPauselessEnabled && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
+ && !_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, _tableConfig)) {
+ return Pair.of(true, generateDataLossRiskMessage(segmentName));
Review Comment:
Shouldn't this scenario have a different message than the other one?
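One way to differentiate the two messages would be to parameterize the message generator with the specific risk reason. This is a hypothetical sketch; the enum, its constants, and the wording are illustrative, not the actual Pinot code.

```java
// Hypothetical sketch: thread a risk reason through the message generator so
// the COMPLETED-without-deep-store-copy and COMMITTING-pauseless scenarios
// produce distinct messages. Names here are illustrative stand-ins.
class RiskMessageSketch {
  enum Reason {
    MISSING_DEEP_STORE_COPY("the segment is COMPLETED but has no deep store copy"),
    COMMITTING_PAUSELESS_SEGMENT("the segment is COMMITTING on a pauseless table and cannot be auto-repaired");

    final String detail;

    Reason(String detail) {
      this.detail = detail;
    }
  }

  static String generateDataLossRiskMessage(String segmentName, Reason reason) {
    return "Moving segment " + segmentName + " as part of rebalance is risky for peer-download "
        + "enabled tables: " + reason.detail + ". Resolve the risk and try again.";
  }

  public static void main(String[] args) {
    System.out.println(generateDataLossRiskMessage("seg__0__1", Reason.MISSING_DEEP_STORE_COPY));
    System.out.println(generateDataLossRiskMessage("seg__0__1", Reason.COMMITTING_PAUSELESS_SEGMENT));
  }
}
```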
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ /**
+ * Assess the risk of data loss for the given segment.
+ *
+ * @param segmentName Name of the segment to assess
+ * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+ *   message describing the risk (if any).
+ */
+ Pair<Boolean, String> assessDataLossRisk(String segmentName);
+ }
+
+ /**
+ * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+ * minAvailableReplicas > 0
+ */
+ @VisibleForTesting
+ static class NoOpRiskAssessor implements DataLossRiskAssessor {
+ NoOpRiskAssessor() {
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ return Pair.of(false, "");
+ }
+ }
+
+ /**
+ * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+ */
+ @VisibleForTesting
+ static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+ private final String _tableNameWithType;
+ private final TableConfig _tableConfig;
+ private final HelixManager _helixManager;
+ private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+ private final boolean _isPauselessEnabled;
+
+ @VisibleForTesting
+ PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+ int minAvailableReplicas, HelixManager helixManager,
+ PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+ // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+ Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+ && minAvailableReplicas == 0);
+ _tableNameWithType = tableNameWithType;
+ _tableConfig = tableConfig;
+ _helixManager = helixManager;
+ _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+ _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+ .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ return Pair.of(false, "");
+ }
+
+ // If the segment state is COMPLETED and the download URL is empty, there is a data loss risk
+ if (segmentZKMetadata.getStatus().isCompleted() && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(
+ segmentZKMetadata.getDownloadUrl())) {
+ return Pair.of(true, generateDataLossRiskMessage(segmentName));
+ }
+
+ // If the segment is not yet completed, then the following scenarios are possible:
+ // - Non-upsert / non-dedup table:
+ //   - data loss scenarios are not possible. Either the segment will restart consumption or the
+ //     RealtimeSegmentValidationManager will kick in to fix up the segment if pauseless is enabled
+ // - Upsert / dedup table:
+ //   - For non-pauseless tables, it is safe to move the segment without data loss concerns
+ //   - For pauseless tables, if the segment is still in CONSUMING state, moving it is safe, but if it is in
+ //     COMMITTING state then there is a risk of data loss on segment build failures as well since the
+ //     RealtimeSegmentValidationManager does not automatically try to fix up these segments. To be safe it is
+ //     best to return that there is a risk of data loss for pauseless enabled tables for segments in COMMITTING
+ //     state
+ if (_isPauselessEnabled && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
+ && !_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, _tableConfig)) {
+ return Pair.of(true, generateDataLossRiskMessage(segmentName));
+ }
+ return Pair.of(false, "");
+ }
+
+ private static String generateDataLossRiskMessage(String segmentName) {
+ return "Moving segment " + segmentName + " as part of rebalance is risky
for peer-download "
+ + "enabled tables, ensure the deep store has a copy of the segment
and if upsert / dedup enabled "
+ + "that it is completed and try again. It is recommended to
forceCommit and pause ingestion prior to "
Review Comment:
Why `forceCommit and pause ingestion`? Wouldn't just pausing ingestion be
sufficient?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
}
}
+ @VisibleForTesting
+ @FunctionalInterface
+ interface DataLossRiskAssessor {
+ /**
+ * Assess the risk of data loss for the given segment.
+ *
+ * @param segmentName Name of the segment to assess
+ * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+ *   message describing the risk (if any).
+ */
+ Pair<Boolean, String> assessDataLossRisk(String segmentName);
+ }
+
+ /**
+ * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+ * minAvailableReplicas > 0
+ */
+ @VisibleForTesting
+ static class NoOpRiskAssessor implements DataLossRiskAssessor {
+ NoOpRiskAssessor() {
+ }
+
+ @Override
+ public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+ return Pair.of(false, "");
Review Comment:
nit: can we use `null` instead of `""` in all the cases where the left side
of the pair is `false`?
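With that change, the non-risky branches would carry a `null` message instead of an empty string. A minimal stand-in behaves like the following; the local record replaces `org.apache.commons.lang3.tuple.Pair` purely to keep the sketch self-contained.

```java
// Minimal stand-in illustrating the nit: return null rather than "" when
// there is no risk message. RiskResult is a stand-in for the commons-lang3
// Pair used in the actual code.
class NullMessageSketch {
  record RiskResult(boolean risky, String message) {
  }

  static RiskResult noRisk() {
    // null signals "no message" more explicitly than an empty string
    return new RiskResult(false, null);
  }

  static RiskResult risk(String message) {
    return new RiskResult(true, message);
  }

  public static void main(String[] args) {
    System.out.println(noRisk());
  }
}
```

Callers would then check the boolean first and only read the message on the risky path, so the `null` is never dereferenced blindly.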
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -715,6 +715,13 @@ public RebalanceResult rebalance(
@DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
@ApiParam(value = "Whether to allow downtime for the rebalance")
@DefaultValue("false") @QueryParam("downtime")
boolean downtime,
+ @ApiParam(value = "This flag only applies to peer-download enabled
tables undergoing downtime=true or "
+ + "minAvailableReplicas=0 rebalance (both of which can result in
possible data loss scenarios). If enabled, "
+ + "this flag will allow the rebalance to continue even in cases
where data loss scenarios have been "
+ + "detected, otherwise the rebalance will be failed and user action
will be required to rebalance again. "
+ + "This flag should be used with caution and only used in scenarios
where data loss is acceptable")
Review Comment:
Is the data loss in this scenario irrecoverable? If yes, should we even be
allowing such an operation? Is there a longer term plan for addressing this? Do
we need more documentation around this scenario?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -670,6 +669,92 @@ public void testRebalance()
}
}
+ @Test
+ public void testRebalancePeerDownloadDataLoss()
+ throws Exception {
+ for (int batchSizePerServer : Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 1, 2)) {
+ addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX, true);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
+ DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+ preChecker.init(_helixResourceManager, executorService, 1);
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader, null);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(1)
+ .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments with peer download uri (i.e. simulate segments without deep store uri)
+ int numSegments = 10;
+ for (int i = 0; i < numSegments; i++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+ SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i),
+ CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+ }
+ Map<String, Map<String, String>> oldSegmentAssignment =
Review Comment:
Unused?
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##########
@@ -566,6 +572,7 @@ private TableRebalancer.SingleSegmentAssignment getNextSingleSegmentAssignment(
numSegmentsToOffloadMap.merge(targetInstance, -1, Integer::sum);
}
Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
+ // Just use a dummy segmentName for now. If needed, can modify this function to take the segmentName as input
Review Comment:
Is this comment supposed to be somewhere else? I don't see how it's relevant
here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]