Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2882412773


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -122,25 +135,27 @@ private Boolean fetchMultipleStreams()
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
           StreamConsumerFactory.getUniqueClientId(clientId))) {
-        _newPartitionGroupMetadataList.addAll(
+        List<PartitionGroupMetadata> partitionGroupMetadataList =
             streamMetadataProvider.computePartitionGroupMetadata(clientId,
                     streamConfig, topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000,
                     _forceGetOffsetFromStream)
                 .stream()
                 .map(metadata -> new PartitionGroupMetadata(
                     
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
-                        index), metadata.getStartOffset()))
-                .collect(Collectors.toList()));
+                        index), metadata.getStartOffset(), 
metadata.getSequenceNumber()))
+                .collect(Collectors.toList());
+        _streamMetadataList.add(
+            new StreamMetadata(streamConfig, index, 
partitionGroupMetadataList.size(), partitionGroupMetadataList));

Review Comment:
   Same issue as single-stream: `partitionCount` is set to 
`partitionGroupMetadataList.size()` when constructing `StreamMetadata`. If 
`computePartitionGroupMetadata(...)` returns only a subset of partitions, 
`partitionCount` will be wrong and can affect downstream logic (e.g., 
sizing/assignment that uses `StreamMetadata::getPartitionCount`). Consider 
fetching the actual partition count separately (e.g., 
`streamMetadataProvider.fetchPartitionCount(...)`) and storing that in 
`StreamMetadata`.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -51,23 +51,33 @@ public PartitionGroupMetadataFetcher(List<StreamConfig> 
streamConfigs,
     _pausedTopicIndices = pausedTopicIndices;
   }
 
+  public List<StreamMetadata> getStreamMetadataList() {
+    return _streamMetadataList;
+  }
+
+  /**
+   * @deprecated after 1.5.0 release. Use {@link #getStreamMetadataList()} 
instead.
+   */
+  @Deprecated
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
-    return _newPartitionGroupMetadataList;
+    return _streamMetadataList.stream()
+        .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
+        .collect(Collectors.toList());
   }
 
   public Exception getException() {
     return _exception;
   }
 
   /**
-   * Callable to fetch the {@link PartitionGroupMetadata} list, from the 
stream.
+   * Callable to fetch the {@link StreamMetadata} list from the streams.
    * The stream requires the list of {@link PartitionGroupConsumptionStatus} 
to compute the new
    * {@link PartitionGroupMetadata}
    */
   @Override
   public Boolean call()
       throws Exception {

Review Comment:
   `call()` clears `_streamMetadataList` but does not reset `_exception`. When 
this fetcher is used with `RetryPolicy.attempt(...)`, a transient failure can 
set `_exception`, a later successful retry can return `true`, and 
`getException()` will still return the previous failure. Consider clearing 
`_exception` at the start of `call()` (and/or when a fetch succeeds) so 
`getException()` reflects the latest execution.
   ```suggestion
         throws Exception {
       _exception = null;
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -81,18 +91,21 @@ private Boolean fetchSingleStream()
     StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
         StreamConsumerFactory.getUniqueClientId(clientId))) {
-      
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
 streamConfig,
-          _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, 
_forceGetOffsetFromStream));
+      List<PartitionGroupMetadata> partitionGroupMetadataList =
+          streamMetadataProvider.computePartitionGroupMetadata(clientId, 
streamConfig,
+              _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, 
_forceGetOffsetFromStream);
+      _streamMetadataList.add(
+          new StreamMetadata(streamConfig, 0, 
partitionGroupMetadataList.size(), partitionGroupMetadataList));

Review Comment:
   `StreamMetadata` is documented as carrying the total partition count, which 
can differ from the returned `PartitionGroupMetadata` list size. Here 
`partitionCount` is set to `partitionGroupMetadataList.size()`, which will be 
incorrect for providers that skip partitions (e.g., end-of-life shards). 
Consider deriving `partitionCount` from 
`StreamMetadataProvider.fetchPartitionCount(...)` (or otherwise ensuring it 
represents the total partition count) instead of using the metadata list size.
   ```suggestion
         int partitionCount = 
streamMetadataProvider.fetchPartitionCount(clientId, streamConfig);
         _streamMetadataList.add(
             new StreamMetadata(streamConfig, 0, partitionCount, 
partitionGroupMetadataList));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to