Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2886009514


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Groups partition metadata for a single stream/topic.
+ *
+ * <p>This replaces the flat {@code List<PartitionGroupMetadata>} pattern where partitions from all streams were mixed
+ * together and required partition ID padding to identify stream membership.
+ *
+ * <p>The {@link PartitionGroupMetadata} items within this container use Pinot-encoded partition IDs
+ * (i.e., {@code streamIndex * 10000 + streamPartitionId}) to maintain backward compatibility with segment names
+ * stored in ZooKeeper.
+ */
+public class StreamMetadata {
+
+  private final StreamConfig _streamConfig;
+  private final int _streamConfigIndex;
+  private final int _partitionCount;
+  private final List<PartitionGroupMetadata> _partitionGroupMetadataList;
+
+  public StreamMetadata(StreamConfig streamConfig, int streamConfigIndex, int partitionCount,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
+    _streamConfig = streamConfig;
+    _streamConfigIndex = streamConfigIndex;
+    _partitionCount = partitionCount;
+    _partitionGroupMetadataList = partitionGroupMetadataList;
+  }

Review Comment:
   `StreamMetadata` is intended to be a value container, but it stores the 
caller-provided `partitionGroupMetadataList` reference directly. Even though 
`getPartitionGroupMetadataList()` returns an unmodifiable view, external 
mutations of the original list would still be reflected. Consider defensively 
copying (e.g., `List.copyOf(...)`) in the constructor (and optionally 
validating non-null) to make the object truly immutable and avoid subtle bugs.
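A minimal sketch of what the suggested constructor could look like. `StreamConfigStub`, `PartitionGroupMetadataStub`, and `StreamMetadataSketch` are hypothetical stand-ins so the snippet compiles on its own; they are not the actual Pinot classes. It assumes Java 10+ for `List.copyOf`.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for Pinot's StreamConfig, declared only so the sketch compiles.
final class StreamConfigStub {
  final String _topicName;

  StreamConfigStub(String topicName) {
    _topicName = topicName;
  }
}

// Hypothetical stand-in for Pinot's PartitionGroupMetadata.
final class PartitionGroupMetadataStub {
  final int _partitionGroupId;

  PartitionGroupMetadataStub(int partitionGroupId) {
    _partitionGroupId = partitionGroupId;
  }
}

// Sketch of an immutable StreamMetadata: the constructor snapshots the caller's list.
final class StreamMetadataSketch {
  private final StreamConfigStub _streamConfig;
  private final int _streamConfigIndex;
  private final int _partitionCount;
  private final List<PartitionGroupMetadataStub> _partitionGroupMetadataList;

  StreamMetadataSketch(StreamConfigStub streamConfig, int streamConfigIndex, int partitionCount,
      List<PartitionGroupMetadataStub> partitionGroupMetadataList) {
    _streamConfig = Objects.requireNonNull(streamConfig, "streamConfig");
    _streamConfigIndex = streamConfigIndex;
    _partitionCount = partitionCount;
    // List.copyOf returns an unmodifiable snapshot and throws NullPointerException
    // if the list itself or any of its elements is null.
    _partitionGroupMetadataList = List.copyOf(partitionGroupMetadataList);
  }

  StreamConfigStub getStreamConfig() {
    return _streamConfig;
  }

  int getStreamConfigIndex() {
    return _streamConfigIndex;
  }

  int getPartitionCount() {
    return _partitionCount;
  }

  // Already unmodifiable, so it can be returned without an extra wrapper.
  List<PartitionGroupMetadataStub> getPartitionGroupMetadataList() {
    return _partitionGroupMetadataList;
  }
}
```

Because the copy is taken once in the constructor, the getter no longer needs `Collections.unmodifiableList(...)`. Note that `List.copyOf` also rejects null elements, which may or may not be desirable for this container.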


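The Javadoc above describes the Pinot-encoded partition ID as `streamIndex * 10000 + streamPartitionId`. The hypothetical `PinotPartitionIdCodec` below is only a sketch of that arithmetic, assuming raw stream partition IDs stay below the 10000 padding; it is not an existing Pinot utility.

```java
// Hypothetical helper illustrating the encoding described in the StreamMetadata Javadoc:
// pinotPartitionId = streamIndex * 10000 + streamPartitionId.
final class PinotPartitionIdCodec {
  // Padding factor taken from the Javadoc formula.
  static final int PARTITION_PADDING = 10_000;

  private PinotPartitionIdCodec() {
  }

  // Encodes (streamIndex, streamPartitionId) into a single Pinot partition ID.
  static int encode(int streamIndex, int streamPartitionId) {
    if (streamIndex < 0) {
      throw new IllegalArgumentException("streamIndex must be non-negative: " + streamIndex);
    }
    // A raw partition ID >= the padding would collide with the next stream's ID range.
    if (streamPartitionId < 0 || streamPartitionId >= PARTITION_PADDING) {
      throw new IllegalArgumentException("streamPartitionId out of range: " + streamPartitionId);
    }
    return Math.addExact(Math.multiplyExact(streamIndex, PARTITION_PADDING), streamPartitionId);
  }

  // Recovers the index of the stream config the partition belongs to.
  static int streamIndexOf(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING;
  }

  // Recovers the stream's own (raw) partition ID.
  static int streamPartitionIdOf(int pinotPartitionId) {
    return pinotPartitionId % PARTITION_PADDING;
  }
}
```

For example, `encode(2, 7)` yields 20007, which decodes back to stream index 2 and partition 7. For the first stream (index 0) the encoded ID equals the raw partition ID, so `encode(0, 5)` is 5.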

##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -851,11 +857,16 @@ public void testRepairs() {
      * End of shard cases
      */
     // 1 reached end of shard.
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
+    List<StreamMetadata> streamMetadataListWithout1 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout1.remove(1);
-    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1;
+    // Remove partition 1 from the first stream's metadata
+    StreamMetadata origSm = streamMetadataListWithout1.get(0);
+    List<PartitionGroupMetadata> filteredPgList = new ArrayList<>(origSm.getPartitionGroupMetadataList());
+    filteredPgList.remove(1);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(origSm.getStreamConfig(), origSm.getStreamConfigIndex(),
+            filteredPgList.size(), filteredPgList));

Review Comment:
   Same as earlier in this file: `partitionCount` is set to 
`filteredPgList.size()` after removing an entry. For realism, the test should 
keep `partitionCount` equal to the stream's total partition count returned by 
`fetchPartitionCount()`, not the size of the filtered metadata list, so it 
exercises the code paths where `partitionCount > 
partitionGroupMetadataList.size()`.
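One way to apply this in both test cases is a small helper that drops an entry but carries over the original partition count. The sketch below uses a hypothetical, reduced `SimpleStreamMetadata` (IDs instead of `PartitionGroupMetadata`) and a hypothetical `StreamMetadataTestUtils.withoutEntryAt`; in the real test the equivalent call would pass `origSm.getPartitionCount()` instead of `filteredPgList.size()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced stand-in for StreamMetadata with only the fields the helper needs.
final class SimpleStreamMetadata {
  final int _streamConfigIndex;
  final int _partitionCount;
  final List<Integer> _partitionGroupIds;

  SimpleStreamMetadata(int streamConfigIndex, int partitionCount, List<Integer> partitionGroupIds) {
    _streamConfigIndex = streamConfigIndex;
    _partitionCount = partitionCount;
    _partitionGroupIds = List.copyOf(partitionGroupIds);
  }
}

// Hypothetical test utility.
final class StreamMetadataTestUtils {
  private StreamMetadataTestUtils() {
  }

  // Drops the entry at listIndex but keeps the stream's total partition count,
  // mirroring production where partitionCount can exceed the metadata list size.
  static SimpleStreamMetadata withoutEntryAt(SimpleStreamMetadata original, int listIndex) {
    List<Integer> filtered = new ArrayList<>(original._partitionGroupIds);
    filtered.remove(listIndex); // int argument selects remove(int index), not remove(Object)
    return new SimpleStreamMetadata(original._streamConfigIndex, original._partitionCount, filtered);
  }
}
```

After filtering, `partitionCount` stays at the fetched value (4 in a four-partition stream) while the list shrinks to 3 entries, which is exactly the `partitionCount > partitionGroupMetadataList.size()` case the comment wants exercised.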



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -51,23 +51,34 @@ public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
     _pausedTopicIndices = pausedTopicIndices;
   }
 
+  public List<StreamMetadata> getStreamMetadataList() {
+    return _streamMetadataList;
+  }

Review Comment:
   `getStreamMetadataList()` returns the internal mutable 
`_streamMetadataList`. Since this is a new public API, it would be safer to 
return an unmodifiable view or a defensive copy to prevent callers from 
accidentally mutating the fetcher's internal state (which can affect subsequent 
retries/calls).
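The two options differ in what callers observe. The sketch below assumes, hypothetically, that the fetcher reuses one internal list and clears and refills it on each attempt; `FetcherSketch` is a stand-in, with `String` in place of `StreamMetadata`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for PartitionGroupMetadataFetcher; String stands in for StreamMetadata.
final class FetcherSketch {
  private final List<String> _streamMetadataList = new ArrayList<>();

  // Simulates one fetch attempt: a retry clears and refills the internal list (assumed behavior).
  void fetch(List<String> fetched) {
    _streamMetadataList.clear();
    _streamMetadataList.addAll(fetched);
  }

  // Option 1: read-only view. Callers cannot mutate it, but they see later refills.
  List<String> getStreamMetadataListView() {
    return Collections.unmodifiableList(_streamMetadataList);
  }

  // Option 2: immutable snapshot. Callers keep the result of the fetch they observed.
  List<String> getStreamMetadataListSnapshot() {
    return List.copyOf(_streamMetadataList);
  }
}
```

The view is cheaper and protects internal state from callers, but it still changes underneath them if the fetcher is retried. The snapshot costs a copy and fully decouples the caller from later fetch attempts.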



##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -296,12 +297,17 @@ public void testCommitSegment() {
       // Expected
     }
 
-    // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
+    // committing segment's partitionGroupId no longer in the newStreamMetadataList
+    List<StreamMetadata> streamMetadataListWithout0 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout0.remove(0);
-    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0;
+    // Remove partition 0 from the first stream's metadata
+    StreamMetadata originalSm = streamMetadataListWithout0.get(0);
+    List<PartitionGroupMetadata> filteredList = new ArrayList<>(originalSm.getPartitionGroupMetadataList());
+    filteredList.remove(0);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(originalSm.getStreamConfig(), originalSm.getStreamConfigIndex(),
+            filteredList.size(), filteredList));

Review Comment:
   In this test setup, `StreamMetadata`'s `partitionCount` is set to 
`filteredList.size()`, but the production contract is that `partitionCount` 
comes from `fetchPartitionCount()` and can be greater than the size of 
`getPartitionGroupMetadataList()` (e.g., when only a subset is assigned / 
end-of-life partitions are omitted). To better reflect real behavior and avoid 
masking issues, keep `partitionCount` equal to the original stream partition 
count even after filtering the metadata list.
   ```suggestion
               originalSm.getPartitionCount(), filteredList));
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java:
##########
@@ -28,12 +28,20 @@
  */
 public class PartitionGroupMetadata {
 
+  private static final int DEFAULT_SEQUENCE_NUMBER = 0;
+
   private final int _partitionGroupId;
   private final StreamPartitionMsgOffset _startOffset;
+  private final int _sequenceNumber;
 
   public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset) {
+    this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
+  }
+
+  public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset, int sequenceNumber) {
     _partitionGroupId = partitionGroupId;
     _startOffset = startOffset;
+    _sequenceNumber = sequenceNumber;
   }

Review Comment:
   The class-level documentation still describes `PartitionGroupMetadata` as 
containing only partitionGroupId and startOffset, but a `sequenceNumber` field 
is now part of the public API. Please update the Javadoc to include the 
sequence number semantics (e.g., initial sequence vs. subsequent segments) so 
callers know what `getSequenceNumber()` represents.
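A possible shape for the updated Javadoc, written as a compilable sketch. The semantics stated here are an assumption: that `sequenceNumber` is the LLC segment-name sequence number, starting at the default 0 for a partition group's first segment and incrementing by one for each later segment. `StartOffsetStub` and `PartitionGroupMetadataSketch` are hypothetical stand-ins, not the Pinot classes.

```java
// Hypothetical stand-in for StreamPartitionMsgOffset, declared so the sketch compiles alone.
interface StartOffsetStub {
}

/**
 * Metadata for a partition group: its ID, the offset from which consumption starts, and the
 * sequence number of the segment to be created for it.
 *
 * <p>Assumed semantics: the sequence number matches the one in the LLC segment name. The first
 * segment of a partition group uses {@link #DEFAULT_SEQUENCE_NUMBER}, and each later segment uses
 * the previous segment's sequence number plus one.
 */
final class PartitionGroupMetadataSketch {
  /** Sequence number used when the caller does not supply one, i.e. for the first segment. */
  static final int DEFAULT_SEQUENCE_NUMBER = 0;

  private final int _partitionGroupId;
  private final StartOffsetStub _startOffset;
  private final int _sequenceNumber;

  PartitionGroupMetadataSketch(int partitionGroupId, StartOffsetStub startOffset) {
    this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
  }

  PartitionGroupMetadataSketch(int partitionGroupId, StartOffsetStub startOffset, int sequenceNumber) {
    _partitionGroupId = partitionGroupId;
    _startOffset = startOffset;
    _sequenceNumber = sequenceNumber;
  }

  int getPartitionGroupId() {
    return _partitionGroupId;
  }

  StartOffsetStub getStartOffset() {
    return _startOffset;
  }

  /** Returns the sequence number of the segment this metadata describes; see the class Javadoc. */
  int getSequenceNumber() {
    return _sequenceNumber;
  }
}
```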



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
