This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 562854b6883 Optimise fetchPartitionGroupIdToSmallestOffset in RealtimeSegmentManager (#17712)
562854b6883 is described below

commit 562854b6883d39705fe2efa1483d93bb01eebd25
Author: NOOB <[email protected]>
AuthorDate: Sat Mar 7 02:32:49 2026 +0530

    Optimise fetchPartitionGroupIdToSmallestOffset in RealtimeSegmentManager (#17712)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 56 +++++++++++++++--
 .../PinotLLCRealtimeSegmentManagerTest.java        | 71 ++++++++++++++++++++++
 2 files changed, 123 insertions(+), 4 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9ed39b67c46..120ab8841bd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -1849,7 +1849,8 @@ public class PinotLLCRealtimeSegmentManager {
 
           // Smallest offset is fetched from stream once and cached in 
partitionIdToSmallestOffset.
           if (partitionIdToSmallestOffset == null) {
-            partitionIdToSmallestOffset = 
fetchPartitionGroupIdToSmallestOffset(streamConfigs, idealState);
+            partitionIdToSmallestOffset =
+                fetchPartitionGroupIdToSmallestOffset(streamConfigs, 
idealState, latestSegmentZKMetadataMap);
           }
 
           // Do not create new CONSUMING segment when the stream partition has 
reached end of life.
@@ -1950,11 +1951,13 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private Map<Integer, StreamPartitionMsgOffset> 
fetchPartitionGroupIdToSmallestOffset(List<StreamConfig> streamConfigs,
-      IdealState idealState) {
+      IdealState idealState, Map<Integer, SegmentZKMetadata> 
latestSegmentZKMetadataMap) {
+    // Reuse the latest-segment ZK metadata map: O(#partitions) instead of an O(#segments) IdealState scan
+    List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
+        
buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap, 
streamConfigs);
+
     Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = 
new HashMap<>();
     for (StreamConfig streamConfig : streamConfigs) {
-      List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
-          getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
       OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
       streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
 
@@ -1976,6 +1979,51 @@ public class PinotLLCRealtimeSegmentManager {
     return partitionGroupIdToSmallestOffset;
   }
 
+  /**
+   * Builds one {@link PartitionGroupConsumptionStatus} per entry of the pre-computed latest segment ZK metadata map,
+   * avoiding the O(#segments) IdealState scan that {@link #getPartitionGroupConsumptionStatusList} performs.
+   */
+  @VisibleForTesting
+  List<PartitionGroupConsumptionStatus> buildPartitionGroupConsumptionStatusFromZKMetadata(
+      Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap, List<StreamConfig> streamConfigs) {
+    List<PartitionGroupConsumptionStatus> result = new ArrayList<>(latestSegmentZKMetadataMap.size());
+    int numStreams = streamConfigs.size();
+    if (numStreams == 1) {  // single stream: no stream index / stream partition id decoding needed
+      StreamPartitionMsgOffsetFactory offsetFactory =
+          StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
+      for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
+        int partitionGroupId = entry.getKey();
+        SegmentZKMetadata zkMetadata = entry.getValue();
+        LLCSegmentName llcSegmentName = new LLCSegmentName(zkMetadata.getSegmentName());
+        result.add(new PartitionGroupConsumptionStatus(partitionGroupId, llcSegmentName.getSequenceNumber(),
+            offsetFactory.create(zkMetadata.getStartOffset()),
+            zkMetadata.getEndOffset() != null ? offsetFactory.create(zkMetadata.getEndOffset()) : null,
+            zkMetadata.getStatus().toString()));
+      }
+    } else {  // multiple streams: decode stream index and stream partition id from the Pinot partition id
+      StreamPartitionMsgOffsetFactory[] offsetFactories = new StreamPartitionMsgOffsetFactory[numStreams];
+      for (Map.Entry<Integer, SegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
+        int partitionGroupId = entry.getKey();
+        int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId);
+        int streamPartitionId = IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+        SegmentZKMetadata zkMetadata = entry.getValue();
+        LLCSegmentName llcSegmentName = new LLCSegmentName(zkMetadata.getSegmentName());
+        StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index];
+        if (offsetFactory == null) {  // create each stream's offset factory at most once
+          offsetFactory =
+              StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory();
+          offsetFactories[index] = offsetFactory;
+        }
+        result.add(new PartitionGroupConsumptionStatus(partitionGroupId, streamPartitionId,
+            llcSegmentName.getSequenceNumber(),
+            offsetFactory.create(zkMetadata.getStartOffset()),
+            zkMetadata.getEndOffset() != null ? offsetFactory.create(zkMetadata.getEndOffset()) : null,
+            zkMetadata.getStatus().toString()));
+      }
+    }
+    return result;
+  }
+
   private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria 
offsetCriteria, int partitionGroupId,
       Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
       Map<Integer, StreamPartitionMsgOffset> 
partitionGroupIdToSmallestStreamOffset, String tableName,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 13a15c045e4..e43fba113e2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -28,6 +28,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -1863,6 +1864,76 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertEquals(partitionIds.size(), 2);
   }
 
+  /**
+   * Verifies that {@code buildPartitionGroupConsumptionStatusFromZKMetadata} produces the same results as
+   * {@code getPartitionGroupConsumptionStatusList} for the common case where IdealState and ZK metadata are in sync.
+   * This validates that the optimization in {@code fetchPartitionGroupIdToSmallestOffset} (reusing the pre-computed
+   * latestSegmentZKMetadataMap instead of rescanning the entire IdealState) does not change behavior.
+   */
+  @Test
+  public void testBuildPartitionGroupConsumptionStatusFromZKMetadataMatchesOriginal() {
+    // Set up a table with 2 replicas, 5 instances, 4 partitions
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    setUpNewTable(segmentManager, 2, 5, 4);
+
+    // Commit segments for partitions 0 and 1 to get a mix of ONLINE (DONE) and CONSUMING (IN_PROGRESS) segments
+    for (int partitionGroupId = 0; partitionGroupId < 2; partitionGroupId++) {
+      String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionGroupId, 0, CURRENT_TIME_MS).getSegmentName();
+      CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName,
+          new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L);
+      committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+    }
+
+    // Hand-built equivalent of getLatestSegmentZKMetadataMap: keep the highest-sequence segment per partition
+    Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = new HashMap<>();
+    for (Map.Entry<String, SegmentZKMetadata> entry : segmentManager._segmentZKMetadataMap.entrySet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(entry.getKey());
+      int partitionId = llcSegmentName.getPartitionGroupId();
+      latestSegmentZKMetadataMap.merge(partitionId, entry.getValue(),
+          (existing, candidate) -> {
+            int existingSeq = new LLCSegmentName(existing.getSegmentName()).getSequenceNumber();
+            int candidateSeq = new LLCSegmentName(candidate.getSegmentName()).getSequenceNumber();
+            return candidateSeq > existingSeq ? candidate : existing;
+          });
+    }
+
+    // Compute the status list via the IdealState scan and via the ZK metadata map
+    List<PartitionGroupConsumptionStatus> fromIdealState =
+        segmentManager.getPartitionGroupConsumptionStatusList(segmentManager._idealState,
+            segmentManager._streamConfigs);
+    List<PartitionGroupConsumptionStatus> fromZKMetadata =
+        segmentManager.buildPartitionGroupConsumptionStatusFromZKMetadata(latestSegmentZKMetadataMap,
+            segmentManager._streamConfigs);
+
+    // Sort both by partition group id for comparison
+    fromIdealState.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));
+    fromZKMetadata.sort(Comparator.comparingInt(PartitionGroupConsumptionStatus::getPartitionGroupId));
+
+    // Verify same number of partitions
+    assertEquals(fromIdealState.size(), fromZKMetadata.size(),
+        "Both methods should return the same number of partitions");
+
+    // Verify each partition has identical consumption status
+    for (int i = 0; i < fromIdealState.size(); i++) {
+      PartitionGroupConsumptionStatus isStatus = fromIdealState.get(i);
+      PartitionGroupConsumptionStatus zkStatus = fromZKMetadata.get(i);
+
+      assertEquals(zkStatus.getPartitionGroupId(), isStatus.getPartitionGroupId(),
+          "Partition group id mismatch at index " + i);
+      assertEquals(zkStatus.getSequenceNumber(), isStatus.getSequenceNumber(),
+          "Sequence number mismatch for partition " + isStatus.getPartitionGroupId());
+      assertEquals(zkStatus.getStartOffset().toString(), isStatus.getStartOffset().toString(),
+          "Start offset mismatch for partition " + isStatus.getPartitionGroupId());
+      String zkEnd = zkStatus.getEndOffset() != null ? zkStatus.getEndOffset().toString() : null;
+      String isEnd = isStatus.getEndOffset() != null ? isStatus.getEndOffset().toString() : null;
+      assertEquals(zkEnd, isEnd,
+          "End offset mismatch for partition " + isStatus.getPartitionGroupId());
+      assertEquals(zkStatus.getStatus(), isStatus.getStatus(),
+          "Status mismatch for partition " + isStatus.getPartitionGroupId());
+    }
+  }
+
   @Test
   public void testReduceSegmentSizeAndReset() {
     // Set up a new table with 2 replicas, 5 instances, 4 partitions


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to