yashmayya commented on code in PR #16341:
URL: https://github.com/apache/pinot/pull/16341#discussion_r2255908123


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java:
##########
@@ -352,35 +351,49 @@ private RebalancePreCheckerResult checkRebalanceConfig(RebalanceConfig rebalance
     List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
 
     int numReplicas = Integer.MAX_VALUE;
-    if (rebalanceConfig.isDowntime() || PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+    String peerSegmentDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
+    if (rebalanceConfig.isDowntime() || peerSegmentDownloadScheme != null) {
       for (String segment : segmentsToMove) {
         numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
       }
     }
 
+    // For non-peer download enabled tables, warn if downtime is enabled but numReplicas > 1. Should only use
+    // downtime=true for such tables if downtime is indeed acceptable whereas for numReplicas = 1, rebalance cannot
+    // be done without downtime
     if (rebalanceConfig.isDowntime()) {
       if (!segmentsToMove.isEmpty() && numReplicas > 1) {
         pass = false;
         warnings.add("Number of replicas (" + numReplicas + ") is greater than 1, downtime is not recommended.");
       }
     }
 
-    // It was revealed a risk of data loss for pauseless tables during rebalance, when downtime=true or
-    // minAvailableReplicas=0 -- If a segment is being moved and has not yet uploaded to deep store, premature
-    // deletion could cause irrecoverable data loss. This pre-check added as a workaround to warn the potential risk.
-    // TODO: Get to the root cause of the issue and revisit this pre-check.
-    if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+    // Peer download enabled tables may have data loss during rebalance, when downtime=true or minAvailableReplicas=0.
+    // The scenario plays out as follows:
+    // 1. If the newly built consuming segment is cannot be uploaded to deep store, it may set up the download URI

Review Comment:
   ```suggestion
       // 1. If the newly built consuming segment cannot be uploaded to deep store, it may set up the download URI
   ```
   nit



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +416,29 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
               tierToInstancePartitionsMap, rebalanceConfig);
         }
       }
+
+      // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+      // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+      // for a table. Skip this step if allowPeerDownloadDataLoss = true
+      if (peerSegmentDownloadScheme != null && !allowPeerDownloadDataLoss) {
+        // Setting minAvailableReplicas to 0 since that is what's equivalent to downtime=true
+        DataLossRiskAssessor dataLossRiskAssessor = new PeerDownloadTableDataLossRiskAssessor(tableNameWithType,
+            tableConfig, 0, _helixManager, _pinotLLCRealtimeSegmentManager);

Review Comment:
   nit: seems a bit odd to pass in `minAvailableReplicas` into the constructor 
for `PeerDownloadTableDataLossRiskAssessor` just to validate that it's 0 when 
we're already checking that before even instantiating.
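   A minimal sketch of the shape this nit seems to point at, assuming the caller (not the constructor) owns the `minAvailableReplicas == 0` decision, so the constructor only keeps the peer-download invariant. `PeerDownloadAssessorSketch` and `createIfNeeded` are hypothetical stand-ins; the real class takes `TableConfig`, `HelixManager` and `PinotLLCRealtimeSegmentManager`, reduced here to a nullable scheme string so the snippet compiles on its own.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for PeerDownloadTableDataLossRiskAssessor without the minAvailableReplicas argument. */
final class PeerDownloadAssessorSketch {
  private final String _tableNameWithType;
  private final String _peerSegmentDownloadScheme;

  PeerDownloadAssessorSketch(String tableNameWithType, String peerSegmentDownloadScheme) {
    _tableNameWithType = Objects.requireNonNull(tableNameWithType, "tableNameWithType");
    // The only invariant left in the constructor: the table must be peer-download enabled.
    // Throws IllegalStateException, the same exception type Guava's Preconditions.checkState uses.
    if (peerSegmentDownloadScheme == null) {
      throw new IllegalStateException("Peer download is not enabled for table: " + tableNameWithType);
    }
    _peerSegmentDownloadScheme = peerSegmentDownloadScheme;
  }

  String getTableNameWithType() {
    return _tableNameWithType;
  }

  String getPeerSegmentDownloadScheme() {
    return _peerSegmentDownloadScheme;
  }

  /** Call-site sketch: the caller decides whether an assessor is needed, so no replica count is passed in. */
  static Optional<PeerDownloadAssessorSketch> createIfNeeded(String tableNameWithType,
      String peerSegmentDownloadScheme, boolean allowPeerDownloadDataLoss) {
    if (peerSegmentDownloadScheme == null || allowPeerDownloadDataLoss) {
      return Optional.empty();  // not peer-download enabled, or the user accepted the risk
    }
    return Optional.of(new PeerDownloadAssessorSketch(tableNameWithType, peerSegmentDownloadScheme));
  }
}
```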



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    /**
+     * Assess the risk of data loss for the given segment.
+     *
+     * @param segmentName Name of the segment to assess
+     * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+     *         message describing the risk (if any).
+     */
+    Pair<Boolean, String> assessDataLossRisk(String segmentName);
+  }
+
+  /**
+   * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+   * minAvailableReplicas > 0
+   */
+  @VisibleForTesting
+  static class NoOpRiskAssessor implements DataLossRiskAssessor {
+    NoOpRiskAssessor() {
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      return Pair.of(false, "");
+    }
+  }
+
+  /**
+   * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+   */
+  @VisibleForTesting
+  static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPauselessEnabled;
+
+    @VisibleForTesting
+    PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+        int minAvailableReplicas, HelixManager helixManager,
+        PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+      Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+          && minAvailableReplicas == 0);
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+          .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        return Pair.of(false, "");
+      }

Review Comment:
   What scenarios would this occur in? Is it safe to assume no data loss is 
possible if the segment ZK metadata is missing?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -2552,7 +2552,7 @@ public void repairSegmentsInErrorStateForPauselessConsumption(TableConfig tableC
     }
   }
 
-  private boolean allowRepairOfErrorSegments(boolean repairErrorSegmentsForPartialUpsertOrDedup,
+  public boolean allowRepairOfErrorSegments(boolean repairErrorSegmentsForPartialUpsertOrDedup,

Review Comment:
   A small Javadoc would be useful here, considering it's now a public method.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -411,6 +416,29 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
               tierToInstancePartitionsMap, rebalanceConfig);
         }
       }
+
+      // If peer-download is enabled, verify that for all segments with changes in assignment, it is safe to rebalance
+      // Create the DataLossRiskAssessor which is used to check for data loss scenarios if peer-download is enabled
+      // for a table. Skip this step if allowPeerDownloadDataLoss = true
+      if (peerSegmentDownloadScheme != null && !allowPeerDownloadDataLoss) {
+        // Setting minAvailableReplicas to 0 since that is what's equivalent to downtime=true
+        DataLossRiskAssessor dataLossRiskAssessor = new PeerDownloadTableDataLossRiskAssessor(tableNameWithType,
+            tableConfig, 0, _helixManager, _pinotLLCRealtimeSegmentManager);
+        for (Map.Entry<String, Map<String, String>> segmentToAssignment : currentAssignment.entrySet()) {
+          String segmentName = segmentToAssignment.getKey();
+          Map<String, String> assignment = segmentToAssignment.getValue();
+          Pair<Boolean, String> dataLossResult = dataLossRiskAssessor.assessDataLossRisk(segmentName);

Review Comment:
   Can this be moved after the check for assignment inequality to reduce 
segment ZK metadata access?
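   A minimal sketch of the reordering, assuming "assignment inequality" means the segment's current instance-state map differs from its target map. `ReorderedRiskCheckSketch`, `findFirstRisk` and the `Function`-based assessor (returning `null` for "no risk") are hypothetical simplifications of `DataLossRiskAssessor`; what the caller does with a non-null message (e.g. failing the rebalance) is left out.

```java
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

final class ReorderedRiskCheckSketch {
  private ReorderedRiskCheckSketch() {
  }

  /**
   * Returns the first data-loss risk message among segments whose assignment changes, or null if there is none.
   * The assessor (which in the PR reads segment ZK metadata) is only invoked for segments that actually move.
   */
  static String findFirstRisk(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment, Function<String, String> assessor) {
    for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) {
      String segmentName = entry.getKey();
      // Cheap in-memory comparison first: unchanged segments never trigger a metadata read.
      if (Objects.equals(entry.getValue(), targetAssignment.get(segmentName))) {
        continue;
      }
      String riskMessage = assessor.apply(segmentName);  // null means "no risk"
      if (riskMessage != null) {
        return riskMessage;
      }
    }
    return null;
  }
}
```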



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    /**
+     * Assess the risk of data loss for the given segment.
+     *
+     * @param segmentName Name of the segment to assess
+     * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+     *         message describing the risk (if any).
+     */
+    Pair<Boolean, String> assessDataLossRisk(String segmentName);
+  }
+
+  /**
+   * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+   * minAvailableReplicas > 0
+   */
+  @VisibleForTesting
+  static class NoOpRiskAssessor implements DataLossRiskAssessor {
+    NoOpRiskAssessor() {
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      return Pair.of(false, "");
+    }
+  }
+
+  /**
+   * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+   */
+  @VisibleForTesting
+  static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPauselessEnabled;
+
+    @VisibleForTesting
+    PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+        int minAvailableReplicas, HelixManager helixManager,
+        PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+      Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+          && minAvailableReplicas == 0);
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+          .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        return Pair.of(false, "");
+      }
+
+      // If the segment state is COMPLETED and the download URL is empty, there is a data loss risk
+      if (segmentZKMetadata.getStatus().isCompleted() && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(
+          segmentZKMetadata.getDownloadUrl())) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));
+      }
+
+      // If the segment is not yet completed, then the following scenarios are possible:
+      // - Non-upsert / non-dedup table:
+      //     - data loss scenarios are not possible. Either the segment will restart consumption or the
+      //       RealtimeSegmentValidationManager will kick in to fix up the segment if pauseless is enabled
+      // - Upsert / dedup table:
+      //     - For non-pauseless tables, it is safe to move the segment without data loss concerns
+      //     - For pauseless tables, if the segment is still in CONSUMING state, moving it is safe, but if it is in
+      //       COMMITTING state then there is a risk of data loss on segment build failures as well since the
+      //       RealtimeSegmentValidationManager does not automatically try to fix up these segments. To be safe it is
+      //       best to return that there is a risk of data loss for pauseless enabled tables for segments in COMMITTING
+      //       state
+      if (_isPauselessEnabled && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
+          && !_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, _tableConfig)) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));

Review Comment:
   Shouldn't this scenario have a different message from the other one?
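   A sketch of what distinct messages could look like, with one reason per scenario described in the PR's own code comments (a COMPLETED segment whose download URL is the peer-download placeholder, and a COMMITTING segment on a pauseless table whose error segments are not auto-repaired). `RiskMessageSketch`, `Reason` and the message wording are hypothetical, not the PR's text.

```java
import java.util.Objects;

/** Hypothetical sketch: one reason, and one message, per data-loss scenario. */
final class RiskMessageSketch {
  enum Reason {
    COMPLETED_WITHOUT_DEEP_STORE_COPY,
    PAUSELESS_COMMITTING_NOT_REPAIRABLE
  }

  private RiskMessageSketch() {
  }

  static String message(Reason reason, String segmentName) {
    Objects.requireNonNull(reason, "reason");
    switch (reason) {
      case COMPLETED_WITHOUT_DEEP_STORE_COPY:
        // First check in assessDataLossRisk: COMPLETED, but only the peer-download placeholder URL is recorded.
        return "Segment " + segmentName + " is COMPLETED but has no deep store copy; moving it with "
            + "downtime=true or minAvailableReplicas=0 may lose data. Ensure the deep store has a copy and retry.";
      case PAUSELESS_COMMITTING_NOT_REPAIRABLE:
        // Second check: pauseless table, COMMITTING segment, and error segments are not repaired automatically.
        return "Segment " + segmentName + " is COMMITTING on a pauseless table whose error segments are not "
            + "repaired automatically; a segment build failure after moving it may lose data. Retry once it "
            + "has completed.";
      default:
        throw new IllegalArgumentException("Unknown reason: " + reason);
    }
  }
}
```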



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    /**
+     * Assess the risk of data loss for the given segment.
+     *
+     * @param segmentName Name of the segment to assess
+     * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+     *         message describing the risk (if any).
+     */
+    Pair<Boolean, String> assessDataLossRisk(String segmentName);
+  }
+
+  /**
+   * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+   * minAvailableReplicas > 0
+   */
+  @VisibleForTesting
+  static class NoOpRiskAssessor implements DataLossRiskAssessor {
+    NoOpRiskAssessor() {
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      return Pair.of(false, "");
+    }
+  }
+
+  /**
+   * To be used for peer-download enabled tables with downtime=true or minAvailableReplicas=0
+   */
+  @VisibleForTesting
+  static class PeerDownloadTableDataLossRiskAssessor implements DataLossRiskAssessor {
+    private final String _tableNameWithType;
+    private final TableConfig _tableConfig;
+    private final HelixManager _helixManager;
+    private final PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+    private final boolean _isPauselessEnabled;
+
+    @VisibleForTesting
+    PeerDownloadTableDataLossRiskAssessor(String tableNameWithType, TableConfig tableConfig,
+        int minAvailableReplicas, HelixManager helixManager,
+        PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager) {
+      // Should only be created for peer-download enabled tables with minAvailableReplicas = 0
+      Preconditions.checkState(tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null
+          && minAvailableReplicas == 0);
+      _tableNameWithType = tableNameWithType;
+      _tableConfig = tableConfig;
+      _helixManager = helixManager;
+      _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
+      _isPauselessEnabled = PauselessConsumptionUtils.isPauselessEnabled(tableConfig);
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      SegmentZKMetadata segmentZKMetadata = ZKMetadataProvider
+          .getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        return Pair.of(false, "");
+      }
+
+      // If the segment state is COMPLETED and the download URL is empty, there is a data loss risk
+      if (segmentZKMetadata.getStatus().isCompleted() && CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(
+          segmentZKMetadata.getDownloadUrl())) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));
+      }
+
+      // If the segment is not yet completed, then the following scenarios are possible:
+      // - Non-upsert / non-dedup table:
+      //     - data loss scenarios are not possible. Either the segment will restart consumption or the
+      //       RealtimeSegmentValidationManager will kick in to fix up the segment if pauseless is enabled
+      // - Upsert / dedup table:
+      //     - For non-pauseless tables, it is safe to move the segment without data loss concerns
+      //     - For pauseless tables, if the segment is still in CONSUMING state, moving it is safe, but if it is in
+      //       COMMITTING state then there is a risk of data loss on segment build failures as well since the
+      //       RealtimeSegmentValidationManager does not automatically try to fix up these segments. To be safe it is
+      //       best to return that there is a risk of data loss for pauseless enabled tables for segments in COMMITTING
+      //       state
+      if (_isPauselessEnabled && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.COMMITTING
+          && !_pinotLLCRealtimeSegmentManager.allowRepairOfErrorSegments(false, _tableConfig)) {
+        return Pair.of(true, generateDataLossRiskMessage(segmentName));
+      }
+      return Pair.of(false, "");
+    }
+
+    private static String generateDataLossRiskMessage(String segmentName) {
+      return "Moving segment " + segmentName + " as part of rebalance is risky for peer-download "
+          + "enabled tables, ensure the deep store has a copy of the segment and if upsert / dedup enabled "
+          + "that it is completed and try again. It is recommended to forceCommit and pause ingestion prior to "

Review Comment:
   Why `forceCommit and pause ingestion`? Wouldn't just pausing ingestion be 
sufficient?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1813,9 +1880,103 @@ public int fetch(String segmentName) {
     }
   }
 
+  @VisibleForTesting
+  @FunctionalInterface
+  interface DataLossRiskAssessor {
+    /**
+     * Assess the risk of data loss for the given segment.
+     *
+     * @param segmentName Name of the segment to assess
+     * @return A pair where the first element indicates if there is a risk of data loss, and the second element is a
+     *         message describing the risk (if any).
+     */
+    Pair<Boolean, String> assessDataLossRisk(String segmentName);
+  }
+
+  /**
+   * To be used for non-peer download enabled tables or peer-download enabled tables rebalanced with
+   * minAvailableReplicas > 0
+   */
+  @VisibleForTesting
+  static class NoOpRiskAssessor implements DataLossRiskAssessor {
+    NoOpRiskAssessor() {
+    }
+
+    @Override
+    public Pair<Boolean, String> assessDataLossRisk(String segmentName) {
+      return Pair.of(false, "");

Review Comment:
   nit: can we use `null` instead of `""` in all the cases where the left side 
of the pair is `false`? 
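   A minimal sketch of the `null`-message convention, assuming the `Pair` in the PR is commons-lang3's (its `Pair.of` accepts a null right element). To stay dependency-free, the sketch uses `AbstractMap.SimpleImmutableEntry` as the pair; `Map.entry` would not work here because it rejects null values. `NullMessageSketch` and its helpers are hypothetical.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical sketch: a null message (instead of "") whenever the risk flag is false. */
final class NullMessageSketch {
  private NullMessageSketch() {
  }

  /** Stand-in for Pair.of(false, null); SimpleImmutableEntry, unlike Map.entry, accepts a null value. */
  static Map.Entry<Boolean, String> noRisk() {
    return new AbstractMap.SimpleImmutableEntry<>(false, null);
  }

  static Map.Entry<Boolean, String> risk(String message) {
    return new AbstractMap.SimpleImmutableEntry<>(true, message);
  }

  /** Call-site sketch: the message is only read when the risk flag is set. */
  static String describe(Map.Entry<Boolean, String> result) {
    return result.getKey() ? result.getValue() : "no data loss risk";
  }
}
```

   With this convention a caller can never mistake an empty string for a real (blank) message, since "no risk" is represented only by the flag.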



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -715,6 +715,13 @@ public RebalanceResult rebalance(
       @DefaultValue("false") @QueryParam("bootstrap") boolean bootstrap,
       @ApiParam(value = "Whether to allow downtime for the rebalance") @DefaultValue("false") @QueryParam("downtime")
       boolean downtime,
+      @ApiParam(value = "This flag only applies to peer-download enabled tables undergoing downtime=true or "
+          + "minAvailableReplicas=0 rebalance (both of which can result in possible data loss scenarios). If enabled, "
+          + "this flag will allow the rebalance to continue even in cases where data loss scenarios have been "
+          + "detected, otherwise the rebalance will be failed and user action will be required to rebalance again. "
+          + "This flag should be used with caution and only used in scenarios where data loss is acceptable")

Review Comment:
   Is the data loss in this scenario irrecoverable? If yes, should we even be allowing such an operation? Is there a
   longer-term plan for addressing this? Do we need more documentation around this scenario?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -670,6 +669,92 @@ public void testRebalance()
     }
   }
 
+  @Test
+  public void testRebalancePeerDownloadDataLoss()
+      throws Exception {
+    for (int batchSizePerServer : Arrays.asList(RebalanceConfig.DISABLE_BATCH_SIZE_PER_SERVER, 1, 2)) {
+      addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX, true);
+
+      ExecutorService executorService = Executors.newFixedThreadPool(10);
+      DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
+      preChecker.init(_helixResourceManager, executorService, 1);
+      TableRebalancer tableRebalancer =
+          new TableRebalancer(_helixManager, null, null, preChecker, _tableSizeReader, null);
+      TableConfig tableConfig =
+          new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+              .setNumReplicas(1)
+              .setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+              .build();
+      // Create the table
+      addDummySchema(RAW_TABLE_NAME);
+      _helixResourceManager.addTable(tableConfig);
+
+      // Add the segments with peer download uri (i.e. simulate segments without deep store uri)
+      int numSegments = 10;
+      for (int i = 0; i < numSegments; i++) {
+        _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+            SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME, SEGMENT_NAME_PREFIX + i),
+            CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
+      }
+      Map<String, Map<String, String>> oldSegmentAssignment =

Review Comment:
   Unused?



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##########
@@ -566,6 +572,7 @@ private TableRebalancer.SingleSegmentAssignment getNextSingleSegmentAssignment(
       numSegmentsToOffloadMap.merge(targetInstance, -1, Integer::sum);
     }
     Map<Pair<Set<String>, Set<String>>, Set<String>> assignmentMap = new HashMap<>();
+    // Just use a dummy segmentName for now. If needed, can modify this function to take the segmentName as input

Review Comment:
   Is this comment supposed to be somewhere else? I don't see how it's relevant here.


