Jackie-Jiang commented on code in PR #17706:
URL: https://github.com/apache/pinot/pull/17706#discussion_r2819085356


##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java:
##########
@@ -217,8 +217,8 @@ boolean shouldEnsureConsuming(String tableNameWithType) {
 
   private void runSegmentLevelValidation(TableConfig tableConfig) {
     String realtimeTableName = tableConfig.getTableName();
-
-    List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+    List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+    _pinotHelixResourceManager.forEachSegmentsZKMetadata(realtimeTableName, 
segmentsZKMetadata::add);

Review Comment:
   This does not reduce the memory footprint: every SegmentZKMetadata is still added to an in-memory list.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -161,32 +162,58 @@ public int getMaxAttemptsPerTask(String minionTag) {
    * @return the list of segment zk metadata for available segments in the 
table.
    */
   public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String 
tableNameWithType) {
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
+  }
+
+  public List<SegmentZKMetadata> 
getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
     IdealState idealState = 
_clusterInfoAccessor.getIdealState(tableNameWithType);
+    if (idealState == null) {
+      return new ArrayList<>();
+    }
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, 
segmentZKMetadata -> {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+      return segmentZKMetadata.getStatus().isCompleted() // skip consuming 
segments
+          && instanceStateMap != null && 
!instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
+      // The last check is for an edge case where
+      //   1. SegmentZKMetadata was updated to DONE in segment commit 
protocol, but
+      //   2. IdealState for the segment was not updated to ONLINE due to some 
issue in the controller.
+      // We avoid picking up such segments to allow 
RealtimeSegmentValidationManager to fix them.
+    });
+  }
+
+  private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String 
tableNameWithType,
+      Predicate<SegmentZKMetadata> segmentMetadataFilter) {
+    IdealState idealState = 
_clusterInfoAccessor.getIdealState(tableNameWithType);
+    if (idealState == null) {
+      return new ArrayList<>();
+    }
     Set<String> segmentsForTable = idealState.getPartitionSet();
-    List<SegmentZKMetadata> segmentZKMetadataList = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    if (segmentsForTable == null || segmentsForTable.isEmpty()) {
+      return new ArrayList<>();
+    }
+
     List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
-    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
+    boolean[] callbackInvoked = new boolean[1];

Review Comment:
   Why do we need to track `callbackInvoked`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java:
##########
@@ -161,32 +162,58 @@ public int getMaxAttemptsPerTask(String minionTag) {
    * @return the list of segment zk metadata for available segments in the 
table.
    */
   public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String 
tableNameWithType) {
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
+  }
+
+  public List<SegmentZKMetadata> 
getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
     IdealState idealState = 
_clusterInfoAccessor.getIdealState(tableNameWithType);
+    if (idealState == null) {
+      return new ArrayList<>();
+    }
+    return getSegmentsZKMetadataInIdealState(tableNameWithType, 
segmentZKMetadata -> {
+      String segmentName = segmentZKMetadata.getSegmentName();
+      Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(segmentName);
+      return segmentZKMetadata.getStatus().isCompleted() // skip consuming 
segments
+          && instanceStateMap != null && 
!instanceStateMap.containsValue(SegmentStateModel.CONSUMING);
+      // The last check is for an edge case where
+      //   1. SegmentZKMetadata was updated to DONE in segment commit 
protocol, but
+      //   2. IdealState for the segment was not updated to ONLINE due to some 
issue in the controller.
+      // We avoid picking up such segments to allow 
RealtimeSegmentValidationManager to fix them.
+    });
+  }
+
+  private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String 
tableNameWithType,
+      Predicate<SegmentZKMetadata> segmentMetadataFilter) {
+    IdealState idealState = 
_clusterInfoAccessor.getIdealState(tableNameWithType);
+    if (idealState == null) {
+      return new ArrayList<>();
+    }
     Set<String> segmentsForTable = idealState.getPartitionSet();
-    List<SegmentZKMetadata> segmentZKMetadataList = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+    if (segmentsForTable == null || segmentsForTable.isEmpty()) {
+      return new ArrayList<>();
+    }
+
     List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
-    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
-      if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
+    boolean[] callbackInvoked = new boolean[1];
+    _clusterInfoAccessor.forEachSegmentsZKMetadata(tableNameWithType, 
segmentZKMetadata -> {
+      callbackInvoked[0] = true;
+      if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())
+          && (segmentMetadataFilter == null || 
segmentMetadataFilter.test(segmentZKMetadata))) {
         selectedSegmentZKMetadataList.add(segmentZKMetadata);

Review Comment:
   This does not reduce the memory footprint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to